"""
Part of comet/snmr/modelling
Modelling classes for core magnetic resonance (1D, 2D)
"""
# COMET - COupled Magnetic resonance Electrical resistivity Tomography
# Copyright (C) 2019 Nico Skibbe
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# general modules to import according to standards
# import pygimli as pg
import numpy as np
import pygimli as pg
from comet.snmr import kernel as k
from comet.snmr.misc import getDataPhase
from comet.pyhed import log
from comet.pyhed.misc import NumpyMatrix, project1DModel
[docs]class SNMRModelling(pg.core.ModellingBase):
"""Modelling class for surface nuclear magnetic resonance (SNMR).
The class is based on the ModellingBase
class of pygimli and therefore contains a various amount of parameters and
functions as well as some protected members to ensure a generalized
interface suitable for the pygimli inversion engine.
For further details about the spezifications of the modelling base, be
referred to the pygimli API available from the official project website
www.pygimli.org.
"""
def __init__(self,
survey,
kernel,
fid=0,
dtype='complex',
mesh=None,
num_cpu=12,
update_kernel=False):
"""
Parameters
----------
kernel_mat: string, class instance or kernel matrix [None]
Defines the kernel calculator. A given string will trigger
to load a previously saved kernel (.npy for matrix or .npz for
previously saved kernel class instance). An already initialized
instance of the kernel class can be given as well (kernel matrix
has to be calculated, the fop class will not trigger this by
itself).
gates: array or None [None]
Array containing the gating times for calculating the forward
response to the relaxation times.
mesh: pg.mesh [None]
Parameter discretization. Can be created automatically for 1D.
Has to be set for 2D.
dimension: integer [1]
Defines the dimension of the calcualted kernel, and therefore the
dimension in wich fop and jacobain will be calcualted.
Possible are 1D [1] or 2D [2].
vector: array_like or None [None]
Second part of the model space discretisation, if not already
given along with the kernel_mat argument. Raises an error if
neither the 'kernel_mat' nor the vector contains a discretisation.
In case of 1D, the Z-vector is given, in case of 2D, the Y-vector.
The Kernel class provides methods for atomatic creation of suited
vectors.
complex_: bool [True]
If true, the forward response and jacobian will suits all
conditions made by a complex or rotated amplitude inversion
algorithm, otherwise the fop and jacobian will suits the
conditions of an amplitude inversion algorithm.
verbose: bool [False]
Mostly for debugging purposes. Selecting True will trigger the
console print functions of the class as well as for all subclasses.
For an overview over the included classes and parameters it is
recommended to use a print(SNMRModelling) call manually instead of
setting verbose to True.
Pseudo Usage
------------
>>> f = SNMRModelling(kernel)
>>> resp = f.response('suitable model')
>>> jabobian = f.createJacobian('suitable model')
>>> print(f)
"""
super(SNMRModelling, self).__init__(False) # verbose
# call init of modelling base
self.para_ = 2 # water content, relaxation times
self.dtype = dtype
self.setSurvey(survey, fid=fid)
self.setKernel(kernel)
# initialize forward parameters
if mesh is not None:
self.setMesh(mesh)
self.model = None
self.num_cpu = num_cpu
self.update_kernel = update_kernel
def __repr__(self):
out = 'SNMR forward operator, smooth'
return out
def __str__(self):
out = 'SNMR forward operator, smooth\n'
out += 'Kernel:\n{!r}'.format(self.kernel)
return out
@property
def kshape(self):
""" Kernel shape: (data, model) == (pulses, model) """
return self.kernel.shape
@property
def jshape(self):
""" Jacobian shape: (data, model)
== (pulses * gates, model * number of parameters)
"""
j_ddim = self.kshape[0] * len(self.fid.gates)
j_mdim = self.kshape[1] * self.para_
return (j_ddim, j_mdim)
@property
def dimension(self):
dim = self.kernel.dimension
return dim
@property
def fid(self):
""" Reference to sounding (FID) class instance in survey."""
return self.survey.fids[self.fid_index]
@property
def iscomplex(self):
iscomplex = self.dtype == 'complex'
return iscomplex
@property
def vector(self):
if self.dimension == 1:
vector = self.kernel.zvec
elif self.dimension == 2:
vector = self.kernel.yvec
else:
vector = None
return vector
[docs] def setSurvey(self, survey, fid=0):
self.survey = survey
self.fid_index = fid
[docs] def setKernel(self, kernel):
if not isinstance(kernel, k.Kernel):
raise Exception('Please load kernel class in modelling class, '
'not in forward operator.')
else:
self.kernel = kernel
if self.dimension == 1:
if self.kernel.zvec is None:
self.kernel.create1DKernelMesh()
mesh_1d = pg.createMesh1D(nCells=len(self.vector) - 1,
nProperties=self.para_)
self.setMesh(mesh_1d) # function from modelling base
[docs] def setModel(self, model):
'''
Parameters
----------
model: array_like
Array that contains three array_like objects. First the thickness
of the different layers (number of layers - 1). The second and
third array contains the water contents and relaxation times of
each layer.
Example
-------
>>> FOP = SNMRModelling('a valid kernel class')
>>> model = [[5., 10.], # thickness [m]
>>> [0.1, 0.25, 0.4], # water content [1]
>>> [0.1, 0.1, 1.]] # relaxation times [s]
>>> FOP.setModelVec(model)
'''
log.debug('snmrModelling: setModel()')
if isinstance(model[0], float):
# case 1: flat array, 1D + 2D
self.model = np.array(model)
elif isinstance(model[0], (tuple, list, np.ndarray)):
# case 2: block model given, 1D only
if self.dimension == 1:
model_water = project1DModel(model[0], model[1],
self.vector)
model_relax = project1DModel(model[0], model[2],
self.vector)
numZ = len(model_water)
self.model = np.zeros(2*numZ)
self.model[:numZ] = model_water
self.model[numZ:] = model_relax
else:
raise Exception('Warning: 2D forward operator cannot handle '
' block model: {}'.format(model))
else:
raise Exception('Expect 1D vector or block model.'
'Got object {} of shape {}.'
.format(model, np.shape(model)))
if len(self.model) != self.jshape[1]:
# an error with some history, this stays!
raise Exception('Dimension mismatch for given model vec {} and '
'jacobian model dimension ({},).'
.format(np.shape(self.model), self.jshape[1]))
[docs] def calculateKernel(self, matrix=False, interpolate=False, forceNew=False,
**kwargs):
if self.kernel.updatable:
self.kernel.calculate(
matrix=matrix, interpolate=interpolate,
forceNew=False, # b field
num_cpu=kwargs.pop('num_cpu', self.num_cpu),
**kwargs)
else:
raise Exception('Kernel class not fit for recalculation.'
'{}'.format(self.kernel))
[docs] def updateDataPhase(self):
"""
Sets data phase for complex inversion. If no model is given the
starting model is used.
"""
data_calc_r, data_calc_i = np.split(self.response(self.model), 2)
data_calc = data_calc_r + 1j * data_calc_i
data_obs = self.fid.data_gated # without phase
phi = getDataPhase(data_obs, data_calc)
self.fid.setDataPhase(phi)
[docs] def createJacobian(self, model=None, **kwargs):
"""
Caculate the Jacobian Matrix of a NMR Kernel, with or without
relaxation times included (model dependancy for this).
kwargs are redirected to kernelClass.calculate()
Example
-------
>>> # complex jacobian without relaxation time
>>> FOP = MRModelling('a valid kernel class')
>>> FOP.createJacobian() # sets FOP.jacobian
"""
log.log(16, 'snmrModelling: create Jacobian.')
if self.update_kernel or self.kernel.getKernel() is None:
log.info('snmrModelling: recalculation of Kernel')
self.calculateKernel(**kwargs)
if model is None:
if self.model is None:
raise Exception(
'No model given. Use method *setModel* '
'or give a model to *createJacobian* or *response*.')
else:
self.setModel(model)
if self.dtype == 'complex':
# update data phase for complex inversion before model update is
# calculated
log.debug('FIXME: snmrModelling: Disabled updateDataPhase!')
# self.updateDataPhase()
# model = [w_1, w_2, ..., w_C, T_1, T_2, ..., T_C]
# model contains watercontents and t2* values for all layers
w = self.model[:self.kshape[1]]
T2 = self.model[self.kshape[1]:]
# T2expM
texp = np.exp(np.einsum('t,m-> mt', -self.fid.gates, 1./T2))
# dt2M
t_T2 = np.tile((self.fid.gates[:, np.newaxis]/(T2**2)),
(self.kshape[0], 1))
# kernel and e function
# gM * T2expM
Ghelp = np.einsum('qm,mt->qtm', self.kernel.getKernel(), texp)
Ghelp = Ghelp.reshape(-1, self.kshape[1])
# t 20, q 24, m 96 -> q+t 480: [20, 20, 20,...], m 96
if self.iscomplex:
log.debug('complex Jacobian')
G = Ghelp
H = G * t_T2 * w
else:
log.debug('amplitude Jacobian')
# from MRSMatlab/functions/MRSInversionQT/QTIMonoJacobian.m
# G
G = self.amplitudeJacobian(Ghelp, w)
# G * wM * dt2M
Hhelp = G * t_T2 * w
H = self.amplitudeJacobian(Hhelp, w)
self.jmat = np.column_stack([G, H])
# put jacobian to matrix wrapper for pygimli compatibility
self.J = NumpyMatrix(self.jmat)
self.setJacobian(self.J)
# print('jmat', np.shape(self.jmat), self.jmat[0][0])
log.debug('Jacobian {} x {}'.format(self.J.rows(), self.J.cols()))
return self.J
[docs] @staticmethod
def amplitudeJacobian(Mcomplex, model):
# not rotated Amplitude, THIS IS DONE BY FID Class
dd = Mcomplex.dot(model)
absdd = 1. / np.absolute(dd)
# (data, model) x (data) X (data) = (data, model)
rr = np.einsum('ij,i,i->ij', Mcomplex.real, dd.real, absdd)
ii = np.einsum('ij,i,i->ij', Mcomplex.imag, dd.imag, absdd)
Mamplitude = rr + ii
return Mamplitude
[docs] def forward(self, model, **kwargs):
"""
Forward response of the kernel to a specific destribution of
watercontent or relaxation times.
1D
--
model.shape:
array.shape = 2 or 3, numLayers (watercontent only: 2, 3 with
relaxation times), first entry = thickness
Example 1D
----------
>>> thickness = [1, 5, 10]
>>> # first layer 0...1 m
>>> # second layer 1...6 m
>>> # third layer 6...16 m
>>> # after that homogeneous halfspace
>>> watercontent = [0.2, 0.3, 0.1, 0.2] # 1 == 100%
>>> # one entry more than thickness, last entry for halfspace
>>> model = np.array((thickness,
>>> watercontent,
>>> [100, 200, 14, 100])) # relaxation times
>>> measurement = mrs.response(model)
"""
self.setModel(model)
if self.kernel.getKernel() is None:
self.calculateKernel(**kwargs)
wc = self.model[:self.kshape[1]] # water content
t2 = self.model[self.kshape[1]:] # relaxation times
et2 = np.exp(-self.fid.gates[:, np.newaxis]/t2)
# 'qm,m,tm->qt'
forward = np.einsum('ij,j,kj->ik', self.kernel.getKernel(), wc, et2)
# log.debug('forward: {}'.format(len(forward.flatten())))
return forward.flatten() # complex array (24, 20) to complex (480,)
[docs] def response(self, model):
"""
Calculates the forward response of a SNMR measurement, and returns an
1D numpy array containing the real and imaginary parts
of the response. One Voltage value for each pulse moment q and time
gate g.
::
data type: complex
([real(V_11), ..., real(V_1Q),
real(V_21), ..., real(V_2Q),
...,
real(V_N1), ..., real(V_NQ),
imag(V_11), ..., ...,
... , ..., imag(V_NQ)]), shape: (2*N, Q)
data type: not complex
([abs(V_11), ..., abs(V_1Q),
abs(V_21), ..., abs(V_2Q),
...,
abs(V_N1), ..., abs(V_NQ)]), shape: (N, Q)
"""
rspns = self.forward(model) # complex response (480,)
if self.iscomplex:
# (480,) complex to (960,) real
return np.append(rspns.real, rspns.imag)
else:
# (480,) complex to (480,) real (absolute)
return np.absolute(rspns) # rotated Amps handled in MRS manager
[docs]class MRS1dBlockQTModelling(pg.core.ModellingBase):
"""
MRS1dBlockQTModelling - pygimli modelling class for block-mono QT inversion
f=MRS1dBlockQTModelling(lay, KR, KI, zvec, t, verbose = False )
"""
def __init__(self, survey, fid=0, nlay=3, dtype='complex', kernel=None):
"""constructor with number of layers, kernel, z and t vectors"""
mesh = pg.core.createMesh1DBlock(nlay, 2) # thk, wc, T2*
pg.core.ModellingBase.__init__(self, mesh)
self.kernel = kernel
self.survey = survey
self.fid_index = fid
self.nlay = nlay
self.dtype = dtype
self.setThreadCount(1)
def __repr__(self):
out = 'SNMR forward operator, block'
return out
@property
def fid(self):
fid = self.survey.fids[self.fid_index]
return fid
@property
def iscomplex(self):
com = self.dtype == 'complex'
return com
[docs] def forward(self, par, verbose=False, num_cpu=12):
"""yield model response cube as vector"""
# raise Exception('this is wrong!')
if self.fid.gates is None:
raise Exception('No gates to calcualte with')
if self.kernel.K is None:
if self.kernel.updatable:
self.kernel.calculate(num_cpu=num_cpu, slices=True)
else:
raise Exception('Cannot calculate Kernel matrix.')
if self.kernel.zvec is None:
raise Exception('No z discretization found.')
if verbose:
print('MRS1dBlockQTModelling: forward()')
nl = self.nlay
if len(par) == nl and np.shape(par[0]) != ():
thk = par[0]
wc = par[1]
t2 = par[2]
# print(thk, wc, t2)
else:
thk = par[0:nl-1] # (0, nl - 1)
wc = par[nl-1:2*nl-1] # (nl - 1, 2 * nl - 1)
t2 = par[2*nl-1:3*nl-1] # par(2 * nl - 1, 3 * nl - 1)
zthk = np.cumsum(thk)
zv = np.abs(self.kernel.getZVector(reduced=False))
lzv = len(zv)
izvec = np.zeros(nl + 1, np.int32)
rzvec = np.zeros(nl + 1)
for i in range(nl - 1):
ii = (zv < zthk[i]).argmin()
izvec[i + 1] = ii
if ii <= len(zv):
rzvec[i + 1] = (zthk[i] - zv[ii - 1]) / (zv[ii] - zv[ii - 1])
izvec[-1] = lzv - 1
A = np.zeros((len(self.fid.pulses), len(self.fid.gates)),
dtype=complex)
for i in range(nl):
wcvec = np.zeros(lzv - 1)
wcvec[izvec[i]:izvec[i + 1]] = wc[i]
if izvec[i + 1] < lzv:
wcvec[izvec[i + 1] - 1] = wc[i] * rzvec[i + 1]
if izvec[i] > 0:
wcvec[izvec[i] - 1] = wc[i] * (1 - rzvec[i])
# print('wcvec', wcvec)
amps = np.dot(self.kernel.getKernel(reduced=False), wcvec)
for ii, a in enumerate(A):
a += np.exp(-self.fid.gates / t2[i]) * amps[ii]
return A # pulses * gates as complex valued array
[docs] def response(self, par):
fwd = self.forward(par)
if not self.iscomplex:
return np.abs(fwd).ravel() # formerly pg as vector
else:
return np.append(fwd.real, fwd.imag) # formerly pg as vector
[docs]class SNMRJointModelling(pg.core.ModellingBase):
""" Joint modelling operator for multiple transmitter receiver combinations
"""
def __init__(self, mrt=None, verbose=False):
super(SNMRJointModelling, self).__init__(verbose)
self.fop_list = []
self._jacobian = pg.core.BlockMatrix()
self.setJacobian(self._jacobian)
# after that single createJacobians is sufficient (reference)
self.n_data = 0
self.mrt = mrt
[docs] def addFOP(self, *fops):
if isinstance(fops[0], (list, tuple)):
fops = fops[0]
for fop in fops:
self.fop_list.append(fop)
[docs] def setFOPs(self, fops):
self.fop_list = []
self._jacobian.clear()
self.n_data = 0
for fop in fops:
self.addFOP(fop)
[docs] def response(self, model):
resp = pg.Vector()
for fop in self.fop_list:
resp = pg.cat(resp, fop.response(model))
return resp
[docs] def forward(self, model):
for i, fop in enumerate(self.fop_list):
if i == 0:
fwrd = fop.forward(model)
else:
fwrd = np.append(fwrd, fop.forward(model))
return fwrd
[docs] def createJacobian(self, model):
log.info('SNMRJointModelling: '
'createJacobian() ({} joined fops)'
.format(len(self.fop_list)))
self._jacobian = pg.core.BlockMatrix()
self.n_data = 0
for fop in self.fop_list:
fop.createJacobian(model)
bla = self._jacobian.addMatrix(fop.jacobian())
self._jacobian.addMatrixEntry(bla, int(self.n_data), 0)
self.n_data += fop.jacobian().rows()
self.setJacobian(self._jacobian)
log.debug('updateData deactivated for now (fix is in place)')
# if self.mrt is not None:
# self.mrt.updateData()
if __name__ == "__main__":
pass
# The End