Source code for comet.snmr.modelling.mrs_survey

"""
Part of comet/snmr/modelling
"""
# COMET - COupled Magnetic resonance Electrical resistivity Tomography
# Copyright (C) 2019  Nico Skibbe

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import numpy as np
import pygimli as pg

from comet.snmr.survey import Survey
from comet.snmr.kernel import Kernel
from comet import pyhed as ph

from . nmr_base import SNMRBase
from . snmrModelling import SNMRJointModelling

import matplotlib.pyplot as plt
from pathlib import Path


[docs]class MRT: def __init__(self, survey=None, dim=2, dtype='complex', mtype='smooth'): """ #TODO! """ self.fop = None self.inv = None self.sounds = [] self.dim = dim self._dtype = None self.setDataType(dtype) self._mtype = None self.setModelType(mtype) if survey is None: self.survey = Survey() else: self.survey = survey self.initSoundings() # for scci, read only self._numPara = 2 def _gather(self, attribute): """ Internal function. Gather variables from underlaying soundings. """ ret = [] for sound in self.sounds: ret.append(getattr(sound, attribute)) return ret @property def data(self): ''' Concatenated data vectors of sounds.''' return np.concatenate(self._gather('data')) @property def data_slices(self): ''' Slices to get single data from self.data. Data[sound #2] = mrt.data[mrt.data_slices[1]] ''' slices = [] indices = self.dataIndices for i in range(len(self.sounds)): slices.append(slice(indices[i], indices[i + 1])) return slices @property def error(self): ''' Concatenated error vectors of sounds.''' return np.concatenate(self._gather('error')) @property def kernels(self): ''' List with underlaying kernels from sounds.''' return self._gather('kernel') @property def dtype(self): return self._dtype @property def mtype(self): return self._mtype @property def dataIndices(self): ind = [0] for sound in self.sounds: ind.append(len(sound.data)) return np.cumsum(ind)
[docs] def updateData(self): """ Update data vector in inversion instance. """ ph.log.info('mrt: update Data') ph.log.log(13, self.inv) if ph.log.getEffectiveLevel() <= 13: np.save('mrtdata_it_{}.npy'.format(self.inv.iter()), self.data) np.save('dataphases_it_{}.npy'.format(self.inv.iter()), self.survey._gather('data_phase')) self.inv.setData(self.data) if ph.log.getEffectiveLevel() <= 13: np.save('invdata_it_{}.npy'.format(self.inv.iter()), self.inv.data())
[docs] def setDataType(self, dtype): dtypes = ['complex', 'rotatedAmplitudes', 'amplitudes'] assert dtype in dtypes, f'dtype has to be in {dtypes}' self._dtype = dtype if self.fop is not None: self.fop.dtype = self.dtype for sound in self.sounds: sound.setDataType(self.dtype)
[docs] def setModelType(self, mtype): mtypes = ['block', 'smooth'] assert mtype in mtypes, f'mtype has to be in {mtypes}' if mtype == 'block' and self.dim == 2: raise Exception('Error. No block model for 2D inversion possible.') self._mtype = mtype
[docs] def setZWeight(self, z_weight): if self.fop is None: raise Exception('No fop initialized to set zweight to.') else: self.fop.regionManager().setZWeight(z_weight) # self.fop.regionManager().region(1).setZWeight(z_weight) # self.fop.regionManager().region(0)._createConstraintWeights() # self.fop.regionManager().region(1)._createConstraintWeights() # self.fop.regionManager().region(1).setConstraintWeights( # self.fop.regionManager().region(0).constraintWeights()) # problem: z_weight of region 2 (t2*) is set, hoiwever constraint # weights are wrong. Set manually. # self.fop.regionManager().createConstraintsWeight() # self.fop.regionManager().region(1).setConstraintsWeight( # self.fop.regionManager().region(0).constraintsWeight()) # testing weights r0 = self.fop.regionManager().region(0) r1 = self.fop.regionManager().region(1) r0w = r0.constraintWeights().array() r1w = r1.constraintWeights().array() try: assert np.allclose(np.sort(r0w), np.sort(r1w)) assert np.isclose(r0.zWeight(), z_weight) assert np.isclose(r1.zWeight(), z_weight) except AssertionError as err: ph.log.error('checking region weights: {}' .format(np.allclose(r0w, r1w))) ph.log.error(r0w) ph.log.error(r1w) ph.log.error('checking zweight region 0: {}' .format(np.isclose(r0.zWeight(), z_weight))) ph.log.error('checking zweight region 1: {}' .format(np.isclose(r1.zWeight(), z_weight))) raise err
def _initSounding(self, sound_index): """ """ snmr = SNMRBase(survey=self.survey, fid=sound_index, dim=self.dim, dtype=self.dtype, mtype=self.mtype) ph.log.log(13, 'init sounding {}: {}'.format(sound_index, snmr)) ph.log.debug('used fid: {}'.format(self.survey.fids[sound_index])) self.sounds[sound_index] = snmr
[docs] def initSoundings(self, override=False): """ Extends the sounding list for the fids in survey. Called automatically is necessary. """ fill = len(self.survey.fids) - len(self.sounds) if fill > 0: self.sounds.extend([None] * fill) for ind in range(len(self.sounds)): if self.sounds[ind] is None or override: self._initSounding(ind)
[docs] def setSurvey(self, survey): """ Defines the survey that holds the various soundings and datasets. """ self.survey = survey self.sounds = [] self.initSoundings()
[docs] def setKernels(self, basename, load_loopmesh=False, use_order_refinement=True, indices=None): """ Sets the kernels for the underlaying soundings. Basename will be formatted with index. Example 5 soundings, basename = 'kern_{}' will result in import of **kernel_0**, **kernel_1**, ..., **kernel_4**. """ for si, sound in enumerate(self.sounds): if indices is not None: idx = indices[si] else: idx = si # load_kernelmesh = True as the actual import that could be cached # is not the bottleneck, its the create h2 function! sound.setKernel(basename.format(idx), load_loopmesh=load_loopmesh, load_kernelmesh=True, use_order_refinement=use_order_refinement)
[docs] def createFOP(self, kernelmesh=None, secondary=False, para_mesh_2d=None, **kwargs): """ kwargs: order (h refinement order for kernel mesh)""" ph.log.info('MRT: create FOP()') order = kwargs.pop('order', 1) # add missing soundings self.initSoundings() # now give self in order to trigger an update of the Data, whenever # needed after createJacobian (phase shift) in complex inversion. self.fop = SNMRJointModelling(mrt=self) # create FOPs for sound in self.sounds: sound.createFOP(nlay=kwargs.pop('nlay', 3)) self.fop.addFOP(sound.fop) if kernelmesh is not None: ph.log.info('MRT: setting kernel meshes') self.setKernelMesh(kernelmesh, order=order) if self.fop.mesh() is None: if self.sounds[0].kernel.kernelMesh2D is not None: self.createFOPMesh() if self.dim == 1: self.createFOPMesh() if self.dim == 2: ph.log.debug('MRT: fop.mesh(): {!s}'.format(self.fop.mesh()))
[docs] def create1DKernelMesh(self, verbose=False): self.initSoundings() used_loops = self.survey.loops[self.survey.used_loops] merged = ph.loop.mergeLoops(used_loops, true_merge=False) hlp = Survey() hlp.addLoop(merged) hlp.createSounding(tx=0, rx=0) kern = Kernel(survey=hlp, fid=0) kern.create1DKernelMesh(verbose=verbose) for sound in self.sounds: sound.kernel.setZVector(kern.getZVector(reduced=False), min_thk=kern.min_thk) sound.kernel.set1DKernelMesh(kern.kernelMesh1D)
[docs] def setKernelMesh(self, mesh, order=1, **kwargs): """ """ # add missing soundings self.initSoundings() ph.log.info('MRT: set kernel mesh') if self.dim == 2: # water content for sound_i, sounding in enumerate(self.sounds): if sound_i == 0: if isinstance(mesh, str): if mesh not in self.survey.loaded_meshes.keys(): # case 1/2: got string, is not in survey, load self.survey.loaded_meshes[mesh] = pg.Mesh(mesh) else: # case 2/2: got mesh, transfer mesh to survey self.survey.loaded_meshes['kernelmesh'] = pg.Mesh(mesh) mesh = 'kernelmesh' # load first kernelmesh and create interpolation matrices sounding.kernel.set2DKernelMesh( self.survey.loaded_meshes[mesh], order=order, **kwargs) else: # give mesh ref and interpolation matrices to other kernels sounding.kernel.set2DKernelMesh( self.survey.loaded_meshes[mesh], order=order, integration_mat=self.sounds[0].kernel.h_mat, yvec=self.sounds[0].kernel.yvec, **kwargs) self.createFOPMesh() else: # in 1D not needed pass
[docs] def createFOPMesh(self): ph.log.info('MRT: creating fop mesh') if self.dim == 2: # water content mesh = pg.Mesh(self.sounds[0].kernel.kernelMesh2D) mesh.setCellMarkers(np.zeros(mesh.cellCount(), dtype=int)) # relaxation times mesh2 = pg.Mesh(mesh) mesh2.setCellMarkers(np.ones(mesh2.cellCount(), dtype=int)) together = pg.meshtools.merge2Meshes( mesh, mesh2.translate(pg.Vector([0., 0., 2.]))) # together.swapCoordinates(1, 2) self.fop.setMesh(together) self.setModelTrans() elif self.dim == 1: self.fop.setMesh(self.fop.fop_list[0].mesh())
[docs] def simulate(self, model, error, samplingrate=1000., max_time=1.0, num_gates=50, verbose=False, **kwargs): """ """ # add missing soundings self.initSoundings() if isinstance(error, (float, int)): error = np.ones(len(self.sounds)) * error for ind, sound in enumerate(self.sounds): ph.log.info('simulate sounding {}: {}, err: {:.1e} V' .format(ind, sound, error[ind])) kwargs['raw_debug_praefix'] = 'sound{}_'.format(ind) sound.simulate(model, err=error[ind], verbose=verbose, samplingrate=samplingrate, max_time=max_time, num_gates=num_gates, **kwargs)
[docs] def setDataAndErrorVector(self, data, error=None, phi=None, df=None): """ Depricated! Set Data and Error in MRT and the underlaying MRS instances. """ raise DeprecationWarning( 'Data and errors are solely handled by the survey class, please ' 'set data and errors there, the mrt class finds them.')
[docs] def setDataAndErrorCube(self, data, error, phase, df=None): """ Depricated! Set data and error cubes using the methods of the single soundings. Input has to be a list or iterable object of data, and error cubes (pulses x gates) a corresponding list of phase vectors for each pulse and a float defining the frequency offset per sounding. """ raise DeprecationWarning( 'Data and errors are solely handled by the survey class, please ' 'set data and errors there, the mrt class finds them.')
[docs] def getSingleDataAndError(self, sounding_idx): data = self.sounds[sounding_idx].data error = self.sounds[sounding_idx].error return data, error
[docs] def createINV(self, lam=1000, verbose=True, debug=False, **kwargs): """ Create inversion instance (and fop if necessary with nlay). Parameter --------- lam: float [100] Lambda factor for inversion. verbose: bool [True] Additional verbose decission, can be True, even if the rest of the Manager should remain silent. Most information of the different iterations is printed in the console. It's recommended to set verbose in this case to True (default). special kwargs for Marquardt scheme (block) ------------------------------------------- lambdaFactor, float [0.8] Sets lambda factor for Marquardt scheme. robust, bool [False] Sets the robust flag for the data. See pg.RInversion for more details special kwargs for smooth scheme --------------------------------- logTrans, bool [True] Applies a logarithmic transformation to the data. Its recommended to do so (default), due to the dealing with water contents, which can't be negative. Logarithmic transformation is the easiest way to archieve that. blockyModel, bool [False] Instead of the standard L2-Norm a L1 Norm can be used to allow for more blocky models. Heavy changes in watercontent and relaxation times can sometimes be fitted better this way. """ try: if None in self.survey.data: raise Exception( 'Found empty data entry in Survey. Abort createINV.') except ValueError: # in this case all entries are arrays, and this test is successfull ph.log.debug('createINV: data: {}'.format(self.data)) # proceed if self.fop is None: self.createFOP(**kwargs) self.inv = pg.core.RInversion(pg.Vector(self.data), self.fop, verbose=verbose, dosave=debug) # ph.log.warning('using new inversion class...') # self.inv = pg.Inversion(self.fop, verbose=verbose, debug=debug) if kwargs.pop('logTrans', True) is True: # data transformation self.transmodel = pg.core.TransLog() # has to be "self" self.inv.setTransModel(self.transmodel) modeltrans = {} if 'thk' in kwargs: modeltrans['thk'] = kwargs.pop('thk') if 'wc' in kwargs: modeltrans['wc'] = kwargs.pop('wc') if 't2' in kwargs: modeltrans['t2'] = kwargs.pop('t2') self.setModelTrans(**modeltrans) if kwargs.pop('blockyModel', False) is True: self.inv.setBlockyModel(True) # L1 Norm self.inv.setLambda(lam) self.inv.stopAtChi1(kwargs.pop('stopAtChi1', False)) self.inv.setDeltaPhiAbortPercent( kwargs.pop('deltaPhiAbortPercent', 1)) ph.log.log(13, 'inv: setAbsoluteError() min/max: {} / {}' .format(np.min(self.error), np.max(self.error))) self.inv.setAbsoluteError(kwargs.pop('absoluteError', self.error)) self.inv.setMaxIter(kwargs.pop('maxIter', 15)) return self.inv
[docs] def setModelTrans(self, thk=(10, 1, 30, 'log'), wc=(0.3, 0.0, 0.7, 'cot'), t2=(0.2, 0.005, 1.0, 'log')): """ Sets model transformation for water content, relaxation times, and thickness (1D). input = (startvalue, min, max, type). Known types are cotangens ('cot') and logarithmic ('log') transformations. """ prints = ('MRT: trans thk (start, min, max, type): ' '{:.3f}, {:.3f}, {:.3f}, {}'.format(*thk), 'MRT: trans wc (start, min, max, type): ' '{:.3f}, {:.3f}, {:.3f}, {}'.format(*wc), 'MRT: trans t2 (start, min, max, type): ' '{:.3f}, {:.3f}, {:.3f}, {}'.format(*t2)) if self.fop: if self.dim == 1 and self.dtype == 'block': ph.log.log(13, prints[0]) ph.log.log(13, prints[1]) ph.log.log(13, prints[2]) self.fop.region(0).setParameters(*thk) self.fop.region(0).setParameters(*wc) self.fop.region(1).setParameters(*t2) else: ph.log.log(13, prints[1]) ph.log.log(13, prints[2]) self.fop.region(0).setParameters(*wc) self.fop.region(1).setParameters(*t2) else: pg.warn('No FOP to set model transformation to.')
[docs] def showSounding(self, index, ax=None, to_plot='abs', draw='data', figsize=(5, 3), **kwargs): """ Shows Data, Error or misfit of a site. """ if ax is None: fig, ax = plt.subplots(1, 1, figsize=figsize) if ph.plot.quantile(self.data, perc=0.01) >= 0 or to_plot == 'abs': # no negative values or show abs anyway cmap = 'viridis' else: # found negative values or show error/misfit etc. cmap = 'bwr' ph.plot.drawFid(ax, self.sounds[index].fid, draw=draw, cmap=cmap, to_plot=to_plot, **kwargs) return ax
[docs] def showFids(self, to_plot='abs', rows_cols=None, ax=None, draw='data', **kwargs): """ kwargs to ph.plot.drawFID(**kwargs) """ used_tx = np.unique(self.survey.tx_indices) used_rx = np.unique(self.survey.rx_indices) if rows_cols is None: n_rows = len(used_tx) n_cols = len(used_rx) flatten = False else: n_rows, n_cols = rows_cols flatten = True if (n_rows * n_cols) < len(self.sounds): raise Exception( 'Total number of sounds does not fit in ' 'given rows, cols: {} != {} * {}'.format( len(self.sounds), n_rows, n_cols)) ncr = np.array((n_cols/n_rows, 1.)) ncr *= 16./np.max(ncr) if ax is None: fig, ax = plt.subplots(n_rows, n_cols, figsize=(int(ncr[0]) + 1, int(ncr[1]) + 1)) for index in range(len(self.sounds)): tx = self.survey.fids[index].tx_index rx = self.survey.fids[index].rx_index ax_tx = np.where(used_tx == tx)[0][0] ax_rx = np.where(used_rx == rx)[0][0] if n_rows == 1 and n_cols != 1: cur_ax = ax[ax_rx] elif n_cols != 1 and n_cols == 1: cur_ax = ax[ax_tx] elif n_cols == 1 and n_cols == 1: cur_ax = ax elif flatten is False: cur_ax = ax[ax_tx, ax_rx] else: cur_ax = ax.flatten()[index] self.showSounding(index, ax=cur_ax, to_plot=to_plot, draw=draw, **kwargs) return ax
[docs] def saveResults(self, basename): """Saves orig data, model, error and forward model as well as chi2.""" path = Path(basename) path.parent.mkdir(parents=True, exist_ok=True) ph.log.info('MRT: saving results: {}' .format(path.with_suffix('.npz').absolute().as_posix())) npz = {} npz['inversion_response'] =\ np.array(self.fop.forward(self.inv.model())) npz['inversion_tofit'] = self.data npz['inversion_model'] = self.inv.model().array() npz['inversion_error'] = self.inv.error().array() npz['inversion_chi2'] = np.atleast_1d(self.inv.getChi2()) np.savez(path.with_suffix('.npz').as_posix(), **npz)
[docs] def loadResults(self, basename, gates=True, pulses=True): """ returns (model, error, response, chi2) """ path = Path(basename) ph.log.info('MRT: loading results: {}' .format(path.with_suffix('.npz').absolute().as_posix())) with np.load(path.with_suffix('.npz').as_posix()) as npz: model = ph.IO.getItem(npz, 'inversion_model') error = ph.IO.getItem(npz, 'inversion_error') response = ph.IO.getItem(npz, 'inversion_response') chi2 = ph.IO.getItem(npz, 'inversion_chi2') if self.inv is not None: self.inv.setModel(model) self.inv.setAbsoluteError(error) return (model, error, response, chi2)
if __name__ == '__main__': pass # The End