Source code for comet.snmr.modelling.snmrModelling

"""
Part of comet/snmr/modelling

Modelling classes for core magnetic resonance (1D, 2D)
"""
# COMET - COupled Magnetic resonance Electrical resistivity Tomography
# Copyright (C) 2019  Nico Skibbe

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

# general modules to import according to standards
# import pygimli as pg
import numpy as np
import pygimli as pg
from comet.snmr import kernel as k
from comet.snmr.misc import getDataPhase
from comet.pyhed import log
from comet.pyhed.misc import NumpyMatrix, project1DModel


[docs]class SNMRModelling(pg.core.ModellingBase): """Modelling class for surface nuclear magnetic resonance (SNMR). The class is based on the ModellingBase class of pygimli and therefore contains a various amount of parameters and functions as well as some protected members to ensure a generalized interface suitable for the pygimli inversion engine. For further details about the spezifications of the modelling base, be referred to the pygimli API available from the official project website www.pygimli.org. """ def __init__(self, survey, kernel, fid=0, dtype='complex', mesh=None, num_cpu=12, update_kernel=False): """ Parameters ---------- kernel_mat: string, class instance or kernel matrix [None] Defines the kernel calculator. A given string will trigger to load a previously saved kernel (.npy for matrix or .npz for previously saved kernel class instance). An already initialized instance of the kernel class can be given as well (kernel matrix has to be calculated, the fop class will not trigger this by itself). gates: array or None [None] Array containing the gating times for calculating the forward response to the relaxation times. mesh: pg.mesh [None] Parameter discretization. Can be created automatically for 1D. Has to be set for 2D. dimension: integer [1] Defines the dimension of the calcualted kernel, and therefore the dimension in wich fop and jacobain will be calcualted. Possible are 1D [1] or 2D [2]. vector: array_like or None [None] Second part of the model space discretisation, if not already given along with the kernel_mat argument. Raises an error if neither the 'kernel_mat' nor the vector contains a discretisation. In case of 1D, the Z-vector is given, in case of 2D, the Y-vector. The Kernel class provides methods for atomatic creation of suited vectors. complex_: bool [True] If true, the forward response and jacobian will suits all conditions made by a complex or rotated amplitude inversion algorithm, otherwise the fop and jacobian will suits the conditions of an amplitude inversion algorithm. verbose: bool [False] Mostly for debugging purposes. Selecting True will trigger the console print functions of the class as well as for all subclasses. For an overview over the included classes and parameters it is recommended to use a print(SNMRModelling) call manually instead of setting verbose to True. Pseudo Usage ------------ >>> f = SNMRModelling(kernel) >>> resp = f.response('suitable model') >>> jabobian = f.createJacobian('suitable model') >>> print(f) """ super(SNMRModelling, self).__init__(False) # verbose # call init of modelling base self.para_ = 2 # water content, relaxation times self.dtype = dtype self.setSurvey(survey, fid=fid) self.setKernel(kernel) # initialize forward parameters if mesh is not None: self.setMesh(mesh) self.model = None self.num_cpu = num_cpu self.update_kernel = update_kernel def __repr__(self): out = 'SNMR forward operator, smooth' return out def __str__(self): out = 'SNMR forward operator, smooth\n' out += 'Kernel:\n{!r}'.format(self.kernel) return out @property def kshape(self): """ Kernel shape: (data, model) == (pulses, model) """ return self.kernel.shape @property def jshape(self): """ Jacobian shape: (data, model) == (pulses * gates, model * number of parameters) """ j_ddim = self.kshape[0] * len(self.fid.gates) j_mdim = self.kshape[1] * self.para_ return (j_ddim, j_mdim) @property def dimension(self): dim = self.kernel.dimension return dim @property def fid(self): """ Reference to sounding (FID) class instance in survey.""" return self.survey.fids[self.fid_index] @property def iscomplex(self): iscomplex = self.dtype == 'complex' return iscomplex @property def vector(self): if self.dimension == 1: vector = self.kernel.zvec elif self.dimension == 2: vector = self.kernel.yvec else: vector = None return vector
[docs] def setSurvey(self, survey, fid=0): self.survey = survey self.fid_index = fid
[docs] def setKernel(self, kernel): if not isinstance(kernel, k.Kernel): raise Exception('Please load kernel class in modelling class, ' 'not in forward operator.') else: self.kernel = kernel if self.dimension == 1: if self.kernel.zvec is None: self.kernel.create1DKernelMesh() mesh_1d = pg.createMesh1D(nCells=len(self.vector) - 1, nProperties=self.para_) self.setMesh(mesh_1d) # function from modelling base
[docs] def setModel(self, model): ''' Parameters ---------- model: array_like Array that contains three array_like objects. First the thickness of the different layers (number of layers - 1). The second and third array contains the water contents and relaxation times of each layer. Example ------- >>> FOP = SNMRModelling('a valid kernel class') >>> model = [[5., 10.], # thickness [m] >>> [0.1, 0.25, 0.4], # water content [1] >>> [0.1, 0.1, 1.]] # relaxation times [s] >>> FOP.setModelVec(model) ''' log.debug('snmrModelling: setModel()') if isinstance(model[0], float): # case 1: flat array, 1D + 2D self.model = np.array(model) elif isinstance(model[0], (tuple, list, np.ndarray)): # case 2: block model given, 1D only if self.dimension == 1: model_water = project1DModel(model[0], model[1], self.vector) model_relax = project1DModel(model[0], model[2], self.vector) numZ = len(model_water) self.model = np.zeros(2*numZ) self.model[:numZ] = model_water self.model[numZ:] = model_relax else: raise Exception('Warning: 2D forward operator cannot handle ' ' block model: {}'.format(model)) else: raise Exception('Expect 1D vector or block model.' 'Got object {} of shape {}.' .format(model, np.shape(model))) if len(self.model) != self.jshape[1]: # an error with some history, this stays! raise Exception('Dimension mismatch for given model vec {} and ' 'jacobian model dimension ({},).' .format(np.shape(self.model), self.jshape[1]))
[docs] def calculateKernel(self, matrix=False, interpolate=False, forceNew=False, **kwargs): if self.kernel.updatable: self.kernel.calculate( matrix=matrix, interpolate=interpolate, forceNew=False, # b field num_cpu=kwargs.pop('num_cpu', self.num_cpu), **kwargs) else: raise Exception('Kernel class not fit for recalculation.' '{}'.format(self.kernel))
[docs] def updateDataPhase(self): """ Sets data phase for complex inversion. If no model is given the starting model is used. """ data_calc_r, data_calc_i = np.split(self.response(self.model), 2) data_calc = data_calc_r + 1j * data_calc_i data_obs = self.fid.data_gated # without phase phi = getDataPhase(data_obs, data_calc) self.fid.setDataPhase(phi)
[docs] def createJacobian(self, model=None, **kwargs): """ Caculate the Jacobian Matrix of a NMR Kernel, with or without relaxation times included (model dependancy for this). kwargs are redirected to kernelClass.calculate() Example ------- >>> # complex jacobian without relaxation time >>> FOP = MRModelling('a valid kernel class') >>> FOP.createJacobian() # sets FOP.jacobian """ log.log(16, 'snmrModelling: create Jacobian.') if self.update_kernel or self.kernel.getKernel() is None: log.info('snmrModelling: recalculation of Kernel') self.calculateKernel(**kwargs) if model is None: if self.model is None: raise Exception( 'No model given. Use method *setModel* ' 'or give a model to *createJacobian* or *response*.') else: self.setModel(model) if self.dtype == 'complex': # update data phase for complex inversion before model update is # calculated log.debug('FIXME: snmrModelling: Disabled updateDataPhase!') # self.updateDataPhase() # model = [w_1, w_2, ..., w_C, T_1, T_2, ..., T_C] # model contains watercontents and t2* values for all layers w = self.model[:self.kshape[1]] T2 = self.model[self.kshape[1]:] # T2expM texp = np.exp(np.einsum('t,m-> mt', -self.fid.gates, 1./T2)) # dt2M t_T2 = np.tile((self.fid.gates[:, np.newaxis]/(T2**2)), (self.kshape[0], 1)) # kernel and e function # gM * T2expM Ghelp = np.einsum('qm,mt->qtm', self.kernel.getKernel(), texp) Ghelp = Ghelp.reshape(-1, self.kshape[1]) # t 20, q 24, m 96 -> q+t 480: [20, 20, 20,...], m 96 if self.iscomplex: log.debug('complex Jacobian') G = Ghelp H = G * t_T2 * w else: log.debug('amplitude Jacobian') # from MRSMatlab/functions/MRSInversionQT/QTIMonoJacobian.m # G G = self.amplitudeJacobian(Ghelp, w) # G * wM * dt2M Hhelp = G * t_T2 * w H = self.amplitudeJacobian(Hhelp, w) self.jmat = np.column_stack([G, H]) # put jacobian to matrix wrapper for pygimli compatibility self.J = NumpyMatrix(self.jmat) self.setJacobian(self.J) # print('jmat', np.shape(self.jmat), self.jmat[0][0]) log.debug('Jacobian {} x {}'.format(self.J.rows(), self.J.cols())) return self.J
[docs] @staticmethod def amplitudeJacobian(Mcomplex, model): # not rotated Amplitude, THIS IS DONE BY FID Class dd = Mcomplex.dot(model) absdd = 1. / np.absolute(dd) # (data, model) x (data) X (data) = (data, model) rr = np.einsum('ij,i,i->ij', Mcomplex.real, dd.real, absdd) ii = np.einsum('ij,i,i->ij', Mcomplex.imag, dd.imag, absdd) Mamplitude = rr + ii return Mamplitude
[docs] def forward(self, model, **kwargs): """ Forward response of the kernel to a specific destribution of watercontent or relaxation times. 1D -- model.shape: array.shape = 2 or 3, numLayers (watercontent only: 2, 3 with relaxation times), first entry = thickness Example 1D ---------- >>> thickness = [1, 5, 10] >>> # first layer 0...1 m >>> # second layer 1...6 m >>> # third layer 6...16 m >>> # after that homogeneous halfspace >>> watercontent = [0.2, 0.3, 0.1, 0.2] # 1 == 100% >>> # one entry more than thickness, last entry for halfspace >>> model = np.array((thickness, >>> watercontent, >>> [100, 200, 14, 100])) # relaxation times >>> measurement = mrs.response(model) """ self.setModel(model) if self.kernel.getKernel() is None: self.calculateKernel(**kwargs) wc = self.model[:self.kshape[1]] # water content t2 = self.model[self.kshape[1]:] # relaxation times et2 = np.exp(-self.fid.gates[:, np.newaxis]/t2) # 'qm,m,tm->qt' forward = np.einsum('ij,j,kj->ik', self.kernel.getKernel(), wc, et2) # log.debug('forward: {}'.format(len(forward.flatten()))) return forward.flatten() # complex array (24, 20) to complex (480,)
[docs] def response(self, model): """ Calculates the forward response of a SNMR measurement, and returns an 1D numpy array containing the real and imaginary parts of the response. One Voltage value for each pulse moment q and time gate g. :: data type: complex ([real(V_11), ..., real(V_1Q), real(V_21), ..., real(V_2Q), ..., real(V_N1), ..., real(V_NQ), imag(V_11), ..., ..., ... , ..., imag(V_NQ)]), shape: (2*N, Q) data type: not complex ([abs(V_11), ..., abs(V_1Q), abs(V_21), ..., abs(V_2Q), ..., abs(V_N1), ..., abs(V_NQ)]), shape: (N, Q) """ rspns = self.forward(model) # complex response (480,) if self.iscomplex: # (480,) complex to (960,) real return np.append(rspns.real, rspns.imag) else: # (480,) complex to (480,) real (absolute) return np.absolute(rspns) # rotated Amps handled in MRS manager
[docs]class MRS1dBlockQTModelling(pg.core.ModellingBase): """ MRS1dBlockQTModelling - pygimli modelling class for block-mono QT inversion f=MRS1dBlockQTModelling(lay, KR, KI, zvec, t, verbose = False ) """ def __init__(self, survey, fid=0, nlay=3, dtype='complex', kernel=None): """constructor with number of layers, kernel, z and t vectors""" mesh = pg.core.createMesh1DBlock(nlay, 2) # thk, wc, T2* pg.core.ModellingBase.__init__(self, mesh) self.kernel = kernel self.survey = survey self.fid_index = fid self.nlay = nlay self.dtype = dtype self.setThreadCount(1) def __repr__(self): out = 'SNMR forward operator, block' return out @property def fid(self): fid = self.survey.fids[self.fid_index] return fid @property def iscomplex(self): com = self.dtype == 'complex' return com
[docs] def forward(self, par, verbose=False, num_cpu=12): """yield model response cube as vector""" # raise Exception('this is wrong!') if self.fid.gates is None: raise Exception('No gates to calcualte with') if self.kernel.K is None: if self.kernel.updatable: self.kernel.calculate(num_cpu=num_cpu, slices=True) else: raise Exception('Cannot calculate Kernel matrix.') if self.kernel.zvec is None: raise Exception('No z discretization found.') if verbose: print('MRS1dBlockQTModelling: forward()') nl = self.nlay if len(par) == nl and np.shape(par[0]) != (): thk = par[0] wc = par[1] t2 = par[2] # print(thk, wc, t2) else: thk = par[0:nl-1] # (0, nl - 1) wc = par[nl-1:2*nl-1] # (nl - 1, 2 * nl - 1) t2 = par[2*nl-1:3*nl-1] # par(2 * nl - 1, 3 * nl - 1) zthk = np.cumsum(thk) zv = np.abs(self.kernel.getZVector(reduced=False)) lzv = len(zv) izvec = np.zeros(nl + 1, np.int32) rzvec = np.zeros(nl + 1) for i in range(nl - 1): ii = (zv < zthk[i]).argmin() izvec[i + 1] = ii if ii <= len(zv): rzvec[i + 1] = (zthk[i] - zv[ii - 1]) / (zv[ii] - zv[ii - 1]) izvec[-1] = lzv - 1 A = np.zeros((len(self.fid.pulses), len(self.fid.gates)), dtype=complex) for i in range(nl): wcvec = np.zeros(lzv - 1) wcvec[izvec[i]:izvec[i + 1]] = wc[i] if izvec[i + 1] < lzv: wcvec[izvec[i + 1] - 1] = wc[i] * rzvec[i + 1] if izvec[i] > 0: wcvec[izvec[i] - 1] = wc[i] * (1 - rzvec[i]) # print('wcvec', wcvec) amps = np.dot(self.kernel.getKernel(reduced=False), wcvec) for ii, a in enumerate(A): a += np.exp(-self.fid.gates / t2[i]) * amps[ii] return A # pulses * gates as complex valued array
[docs] def response(self, par): fwd = self.forward(par) if not self.iscomplex: return np.abs(fwd).ravel() # formerly pg as vector else: return np.append(fwd.real, fwd.imag) # formerly pg as vector
[docs]class SNMRJointModelling(pg.core.ModellingBase): """ Joint modelling operator for multiple transmitter receiver combinations """ def __init__(self, mrt=None, verbose=False): super(SNMRJointModelling, self).__init__(verbose) self.fop_list = [] self._jacobian = pg.core.BlockMatrix() self.setJacobian(self._jacobian) # after that single createJacobians is sufficient (reference) self.n_data = 0 self.mrt = mrt
[docs] def addFOP(self, *fops): if isinstance(fops[0], (list, tuple)): fops = fops[0] for fop in fops: self.fop_list.append(fop)
[docs] def setFOPs(self, fops): self.fop_list = [] self._jacobian.clear() self.n_data = 0 for fop in fops: self.addFOP(fop)
[docs] def response(self, model): resp = pg.Vector() for fop in self.fop_list: resp = pg.cat(resp, fop.response(model)) return resp
[docs] def forward(self, model): for i, fop in enumerate(self.fop_list): if i == 0: fwrd = fop.forward(model) else: fwrd = np.append(fwrd, fop.forward(model)) return fwrd
[docs] def createJacobian(self, model): log.info('SNMRJointModelling: ' 'createJacobian() ({} joined fops)' .format(len(self.fop_list))) self._jacobian = pg.core.BlockMatrix() self.n_data = 0 for fop in self.fop_list: fop.createJacobian(model) bla = self._jacobian.addMatrix(fop.jacobian()) self._jacobian.addMatrixEntry(bla, int(self.n_data), 0) self.n_data += fop.jacobian().rows() self.setJacobian(self._jacobian) log.debug('updateData deactivated for now (fix is in place)')
# if self.mrt is not None: # self.mrt.updateData() if __name__ == "__main__": pass # The End