"""
Part of comet/snmr/modelling
"""
# COMET - COupled Magnetic resonance Electrical resistivity Tomography
# Copyright (C) 2019 Nico Skibbe
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import numpy as np
import pygimli as pg
from comet.snmr.survey import Survey
from comet.snmr.kernel import Kernel
from comet import pyhed as ph
from . nmr_base import SNMRBase
from . snmrModelling import SNMRJointModelling
import matplotlib.pyplot as plt
from pathlib import Path
[docs]class MRT:
def __init__(self, survey=None, dim=2, dtype='complex', mtype='smooth'):
""" #TODO! """
self.fop = None
self.inv = None
self.sounds = []
self.dim = dim
self._dtype = None
self.setDataType(dtype)
self._mtype = None
self.setModelType(mtype)
if survey is None:
self.survey = Survey()
else:
self.survey = survey
self.initSoundings()
# for scci, read only
self._numPara = 2
def _gather(self, attribute):
""" Internal function.
Gather variables from underlaying soundings. """
ret = []
for sound in self.sounds:
ret.append(getattr(sound, attribute))
return ret
@property
def data(self):
''' Concatenated data vectors of sounds.'''
return np.concatenate(self._gather('data'))
@property
def data_slices(self):
''' Slices to get single data from self.data.
Data[sound #2] = mrt.data[mrt.data_slices[1]]
'''
slices = []
indices = self.dataIndices
for i in range(len(self.sounds)):
slices.append(slice(indices[i], indices[i + 1]))
return slices
@property
def error(self):
''' Concatenated error vectors of sounds.'''
return np.concatenate(self._gather('error'))
@property
def kernels(self):
''' List with underlaying kernels from sounds.'''
return self._gather('kernel')
@property
def dtype(self):
return self._dtype
@property
def mtype(self):
return self._mtype
@property
def dataIndices(self):
ind = [0]
for sound in self.sounds:
ind.append(len(sound.data))
return np.cumsum(ind)
[docs] def updateData(self):
""" Update data vector in inversion instance. """
ph.log.info('mrt: update Data')
ph.log.log(13, self.inv)
if ph.log.getEffectiveLevel() <= 13:
np.save('mrtdata_it_{}.npy'.format(self.inv.iter()),
self.data)
np.save('dataphases_it_{}.npy'.format(self.inv.iter()),
self.survey._gather('data_phase'))
self.inv.setData(self.data)
if ph.log.getEffectiveLevel() <= 13:
np.save('invdata_it_{}.npy'.format(self.inv.iter()),
self.inv.data())
[docs] def setDataType(self, dtype):
dtypes = ['complex', 'rotatedAmplitudes', 'amplitudes']
assert dtype in dtypes, f'dtype has to be in {dtypes}'
self._dtype = dtype
if self.fop is not None:
self.fop.dtype = self.dtype
for sound in self.sounds:
sound.setDataType(self.dtype)
[docs] def setModelType(self, mtype):
mtypes = ['block', 'smooth']
assert mtype in mtypes, f'mtype has to be in {mtypes}'
if mtype == 'block' and self.dim == 2:
raise Exception('Error. No block model for 2D inversion possible.')
self._mtype = mtype
[docs] def setZWeight(self, z_weight):
if self.fop is None:
raise Exception('No fop initialized to set zweight to.')
else:
self.fop.regionManager().setZWeight(z_weight)
# self.fop.regionManager().region(1).setZWeight(z_weight)
# self.fop.regionManager().region(0)._createConstraintWeights()
# self.fop.regionManager().region(1)._createConstraintWeights()
# self.fop.regionManager().region(1).setConstraintWeights(
# self.fop.regionManager().region(0).constraintWeights())
# problem: z_weight of region 2 (t2*) is set, hoiwever constraint
# weights are wrong. Set manually.
# self.fop.regionManager().createConstraintsWeight()
# self.fop.regionManager().region(1).setConstraintsWeight(
# self.fop.regionManager().region(0).constraintsWeight())
# testing weights
r0 = self.fop.regionManager().region(0)
r1 = self.fop.regionManager().region(1)
r0w = r0.constraintWeights().array()
r1w = r1.constraintWeights().array()
try:
assert np.allclose(np.sort(r0w), np.sort(r1w))
assert np.isclose(r0.zWeight(), z_weight)
assert np.isclose(r1.zWeight(), z_weight)
except AssertionError as err:
ph.log.error('checking region weights: {}'
.format(np.allclose(r0w, r1w)))
ph.log.error(r0w)
ph.log.error(r1w)
ph.log.error('checking zweight region 0: {}'
.format(np.isclose(r0.zWeight(), z_weight)))
ph.log.error('checking zweight region 1: {}'
.format(np.isclose(r1.zWeight(), z_weight)))
raise err
def _initSounding(self, sound_index):
""" """
snmr = SNMRBase(survey=self.survey,
fid=sound_index,
dim=self.dim,
dtype=self.dtype,
mtype=self.mtype)
ph.log.log(13, 'init sounding {}: {}'.format(sound_index, snmr))
ph.log.debug('used fid: {}'.format(self.survey.fids[sound_index]))
self.sounds[sound_index] = snmr
[docs] def initSoundings(self, override=False):
""" Extends the sounding list for the fids in survey.
Called automatically is necessary.
"""
fill = len(self.survey.fids) - len(self.sounds)
if fill > 0:
self.sounds.extend([None] * fill)
for ind in range(len(self.sounds)):
if self.sounds[ind] is None or override:
self._initSounding(ind)
[docs] def setSurvey(self, survey):
"""
Defines the survey that holds the various soundings and datasets.
"""
self.survey = survey
self.sounds = []
self.initSoundings()
[docs] def setKernels(self, basename, load_loopmesh=False,
use_order_refinement=True, indices=None):
""" Sets the kernels for the underlaying soundings. Basename will be
formatted with index. Example 5 soundings, basename = 'kern_{}' will
result in import of **kernel_0**, **kernel_1**, ..., **kernel_4**.
"""
for si, sound in enumerate(self.sounds):
if indices is not None:
idx = indices[si]
else:
idx = si
# load_kernelmesh = True as the actual import that could be cached
# is not the bottleneck, its the create h2 function!
sound.setKernel(basename.format(idx),
load_loopmesh=load_loopmesh,
load_kernelmesh=True,
use_order_refinement=use_order_refinement)
[docs] def createFOP(self, kernelmesh=None,
secondary=False, para_mesh_2d=None, **kwargs):
""" kwargs: order (h refinement order for kernel mesh)"""
ph.log.info('MRT: create FOP()')
order = kwargs.pop('order', 1)
# add missing soundings
self.initSoundings()
# now give self in order to trigger an update of the Data, whenever
# needed after createJacobian (phase shift) in complex inversion.
self.fop = SNMRJointModelling(mrt=self)
# create FOPs
for sound in self.sounds:
sound.createFOP(nlay=kwargs.pop('nlay', 3))
self.fop.addFOP(sound.fop)
if kernelmesh is not None:
ph.log.info('MRT: setting kernel meshes')
self.setKernelMesh(kernelmesh, order=order)
if self.fop.mesh() is None:
if self.sounds[0].kernel.kernelMesh2D is not None:
self.createFOPMesh()
if self.dim == 1:
self.createFOPMesh()
if self.dim == 2:
ph.log.debug('MRT: fop.mesh(): {!s}'.format(self.fop.mesh()))
[docs] def create1DKernelMesh(self, verbose=False):
self.initSoundings()
used_loops = self.survey.loops[self.survey.used_loops]
merged = ph.loop.mergeLoops(used_loops, true_merge=False)
hlp = Survey()
hlp.addLoop(merged)
hlp.createSounding(tx=0, rx=0)
kern = Kernel(survey=hlp, fid=0)
kern.create1DKernelMesh(verbose=verbose)
for sound in self.sounds:
sound.kernel.setZVector(kern.getZVector(reduced=False),
min_thk=kern.min_thk)
sound.kernel.set1DKernelMesh(kern.kernelMesh1D)
[docs] def setKernelMesh(self, mesh, order=1, **kwargs):
""" """
# add missing soundings
self.initSoundings()
ph.log.info('MRT: set kernel mesh')
if self.dim == 2:
# water content
for sound_i, sounding in enumerate(self.sounds):
if sound_i == 0:
if isinstance(mesh, str):
if mesh not in self.survey.loaded_meshes.keys():
# case 1/2: got string, is not in survey, load
self.survey.loaded_meshes[mesh] = pg.Mesh(mesh)
else:
# case 2/2: got mesh, transfer mesh to survey
self.survey.loaded_meshes['kernelmesh'] = pg.Mesh(mesh)
mesh = 'kernelmesh'
# load first kernelmesh and create interpolation matrices
sounding.kernel.set2DKernelMesh(
self.survey.loaded_meshes[mesh],
order=order,
**kwargs)
else:
# give mesh ref and interpolation matrices to other kernels
sounding.kernel.set2DKernelMesh(
self.survey.loaded_meshes[mesh],
order=order,
integration_mat=self.sounds[0].kernel.h_mat,
yvec=self.sounds[0].kernel.yvec,
**kwargs)
self.createFOPMesh()
else:
# in 1D not needed
pass
[docs] def createFOPMesh(self):
ph.log.info('MRT: creating fop mesh')
if self.dim == 2:
# water content
mesh = pg.Mesh(self.sounds[0].kernel.kernelMesh2D)
mesh.setCellMarkers(np.zeros(mesh.cellCount(), dtype=int))
# relaxation times
mesh2 = pg.Mesh(mesh)
mesh2.setCellMarkers(np.ones(mesh2.cellCount(), dtype=int))
together = pg.meshtools.merge2Meshes(
mesh,
mesh2.translate(pg.Vector([0., 0., 2.])))
# together.swapCoordinates(1, 2)
self.fop.setMesh(together)
self.setModelTrans()
elif self.dim == 1:
self.fop.setMesh(self.fop.fop_list[0].mesh())
[docs] def simulate(self, model, error, samplingrate=1000., max_time=1.0,
num_gates=50, verbose=False, **kwargs):
""" """
# add missing soundings
self.initSoundings()
if isinstance(error, (float, int)):
error = np.ones(len(self.sounds)) * error
for ind, sound in enumerate(self.sounds):
ph.log.info('simulate sounding {}: {}, err: {:.1e} V'
.format(ind, sound, error[ind]))
kwargs['raw_debug_praefix'] = 'sound{}_'.format(ind)
sound.simulate(model, err=error[ind], verbose=verbose,
samplingrate=samplingrate, max_time=max_time,
num_gates=num_gates, **kwargs)
[docs] def setDataAndErrorVector(self, data, error=None, phi=None, df=None):
""" Depricated!
Set Data and Error in MRT and the underlaying MRS instances.
"""
raise DeprecationWarning(
'Data and errors are solely handled by the survey class, please '
'set data and errors there, the mrt class finds them.')
[docs] def setDataAndErrorCube(self, data, error, phase, df=None):
""" Depricated!
Set data and error cubes using the methods of the single soundings.
Input has to be a list or iterable object of data, and error cubes
(pulses x gates) a corresponding list of phase vectors for each pulse
and a float defining the frequency offset per sounding.
"""
raise DeprecationWarning(
'Data and errors are solely handled by the survey class, please '
'set data and errors there, the mrt class finds them.')
[docs] def getSingleDataAndError(self, sounding_idx):
data = self.sounds[sounding_idx].data
error = self.sounds[sounding_idx].error
return data, error
[docs] def createINV(self, lam=1000, verbose=True, debug=False, **kwargs):
"""
Create inversion instance (and fop if necessary with nlay).
Parameter
---------
lam: float [100]
Lambda factor for inversion.
verbose: bool [True]
Additional verbose decission, can be True, even if the rest of the
Manager should remain silent. Most information of the different
iterations is printed in the console. It's recommended to set
verbose in this case to True (default).
special kwargs for Marquardt scheme (block)
-------------------------------------------
lambdaFactor, float [0.8]
Sets lambda factor for Marquardt scheme.
robust, bool [False]
Sets the robust flag for the data.
See pg.RInversion for more details
special kwargs for smooth scheme
---------------------------------
logTrans, bool [True]
Applies a logarithmic transformation to the data. Its recommended
to do so (default), due to the dealing with water contents,
which can't be negative. Logarithmic transformation is the easiest
way to archieve that.
blockyModel, bool [False]
Instead of the standard L2-Norm a L1 Norm can be used to allow for
more blocky models. Heavy changes in watercontent and relaxation
times can sometimes be fitted better this way.
"""
try:
if None in self.survey.data:
raise Exception(
'Found empty data entry in Survey. Abort createINV.')
except ValueError:
# in this case all entries are arrays, and this test is successfull
ph.log.debug('createINV: data: {}'.format(self.data))
# proceed
if self.fop is None:
self.createFOP(**kwargs)
self.inv = pg.core.RInversion(pg.Vector(self.data), self.fop,
verbose=verbose, dosave=debug)
# ph.log.warning('using new inversion class...')
# self.inv = pg.Inversion(self.fop, verbose=verbose, debug=debug)
if kwargs.pop('logTrans', True) is True:
# data transformation
self.transmodel = pg.core.TransLog() # has to be "self"
self.inv.setTransModel(self.transmodel)
modeltrans = {}
if 'thk' in kwargs:
modeltrans['thk'] = kwargs.pop('thk')
if 'wc' in kwargs:
modeltrans['wc'] = kwargs.pop('wc')
if 't2' in kwargs:
modeltrans['t2'] = kwargs.pop('t2')
self.setModelTrans(**modeltrans)
if kwargs.pop('blockyModel', False) is True:
self.inv.setBlockyModel(True) # L1 Norm
self.inv.setLambda(lam)
self.inv.stopAtChi1(kwargs.pop('stopAtChi1', False))
self.inv.setDeltaPhiAbortPercent(
kwargs.pop('deltaPhiAbortPercent', 1))
ph.log.log(13, 'inv: setAbsoluteError() min/max: {} / {}'
.format(np.min(self.error), np.max(self.error)))
self.inv.setAbsoluteError(kwargs.pop('absoluteError', self.error))
self.inv.setMaxIter(kwargs.pop('maxIter', 15))
return self.inv
[docs] def setModelTrans(self,
thk=(10, 1, 30, 'log'),
wc=(0.3, 0.0, 0.7, 'cot'),
t2=(0.2, 0.005, 1.0, 'log')):
"""
Sets model transformation for water content, relaxation times, and
thickness (1D). input = (startvalue, min, max, type).
Known types are cotangens ('cot') and logarithmic ('log')
transformations.
"""
prints = ('MRT: trans thk (start, min, max, type): '
'{:.3f}, {:.3f}, {:.3f}, {}'.format(*thk),
'MRT: trans wc (start, min, max, type): '
'{:.3f}, {:.3f}, {:.3f}, {}'.format(*wc),
'MRT: trans t2 (start, min, max, type): '
'{:.3f}, {:.3f}, {:.3f}, {}'.format(*t2))
if self.fop:
if self.dim == 1 and self.dtype == 'block':
ph.log.log(13, prints[0])
ph.log.log(13, prints[1])
ph.log.log(13, prints[2])
self.fop.region(0).setParameters(*thk)
self.fop.region(0).setParameters(*wc)
self.fop.region(1).setParameters(*t2)
else:
ph.log.log(13, prints[1])
ph.log.log(13, prints[2])
self.fop.region(0).setParameters(*wc)
self.fop.region(1).setParameters(*t2)
else:
pg.warn('No FOP to set model transformation to.')
[docs] def showSounding(self, index, ax=None, to_plot='abs', draw='data',
figsize=(5, 3), **kwargs):
"""
Shows Data, Error or misfit of a site.
"""
if ax is None:
fig, ax = plt.subplots(1, 1, figsize=figsize)
if ph.plot.quantile(self.data, perc=0.01) >= 0 or to_plot == 'abs':
# no negative values or show abs anyway
cmap = 'viridis'
else:
# found negative values or show error/misfit etc.
cmap = 'bwr'
ph.plot.drawFid(ax, self.sounds[index].fid, draw=draw,
cmap=cmap, to_plot=to_plot, **kwargs)
return ax
[docs] def showFids(self, to_plot='abs', rows_cols=None, ax=None, draw='data',
**kwargs):
""" kwargs to ph.plot.drawFID(**kwargs) """
used_tx = np.unique(self.survey.tx_indices)
used_rx = np.unique(self.survey.rx_indices)
if rows_cols is None:
n_rows = len(used_tx)
n_cols = len(used_rx)
flatten = False
else:
n_rows, n_cols = rows_cols
flatten = True
if (n_rows * n_cols) < len(self.sounds):
raise Exception(
'Total number of sounds does not fit in '
'given rows, cols: {} != {} * {}'.format(
len(self.sounds), n_rows, n_cols))
ncr = np.array((n_cols/n_rows, 1.))
ncr *= 16./np.max(ncr)
if ax is None:
fig, ax = plt.subplots(n_rows, n_cols,
figsize=(int(ncr[0]) + 1, int(ncr[1]) + 1))
for index in range(len(self.sounds)):
tx = self.survey.fids[index].tx_index
rx = self.survey.fids[index].rx_index
ax_tx = np.where(used_tx == tx)[0][0]
ax_rx = np.where(used_rx == rx)[0][0]
if n_rows == 1 and n_cols != 1:
cur_ax = ax[ax_rx]
elif n_cols != 1 and n_cols == 1:
cur_ax = ax[ax_tx]
elif n_cols == 1 and n_cols == 1:
cur_ax = ax
elif flatten is False:
cur_ax = ax[ax_tx, ax_rx]
else:
cur_ax = ax.flatten()[index]
self.showSounding(index, ax=cur_ax, to_plot=to_plot,
draw=draw, **kwargs)
return ax
[docs] def saveResults(self, basename):
"""Saves orig data, model, error and forward model as well as chi2."""
path = Path(basename)
path.parent.mkdir(parents=True, exist_ok=True)
ph.log.info('MRT: saving results: {}'
.format(path.with_suffix('.npz').absolute().as_posix()))
npz = {}
npz['inversion_response'] =\
np.array(self.fop.forward(self.inv.model()))
npz['inversion_tofit'] = self.data
npz['inversion_model'] = self.inv.model().array()
npz['inversion_error'] = self.inv.error().array()
npz['inversion_chi2'] = np.atleast_1d(self.inv.getChi2())
np.savez(path.with_suffix('.npz').as_posix(), **npz)
[docs] def loadResults(self, basename, gates=True, pulses=True):
""" returns (model, error, response, chi2) """
path = Path(basename)
ph.log.info('MRT: loading results: {}'
.format(path.with_suffix('.npz').absolute().as_posix()))
with np.load(path.with_suffix('.npz').as_posix()) as npz:
model = ph.IO.getItem(npz, 'inversion_model')
error = ph.IO.getItem(npz, 'inversion_error')
response = ph.IO.getItem(npz, 'inversion_response')
chi2 = ph.IO.getItem(npz, 'inversion_chi2')
if self.inv is not None:
self.inv.setModel(model)
self.inv.setAbsoluteError(error)
return (model, error, response, chi2)
if __name__ == '__main__':
pass
# The End