Source code for comet.snmr.modelling.mrs

"""
Part of comet/snmr/modelling

Magnetic resonance sounding module.
"""
# COMET - COupled Magnetic resonance Electrical resistivity Tomography
# Copyright (C) 2019  Nico Skibbe

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

# general modules to import according to standards
import time
import numpy as np

import matplotlib.pyplot as plt

import pygimli as pg
from pygimli.utils import iterateBounds

try:  # pg.__version__ >= 1.1
    from pygimli.viewer.mpl import drawModel1D
except ImportError:  # pg.__version__ < 1.1
    from pygimli.mplviewer import drawModel1D

from comet.snmr import kernel as k

# local functions in package
# from . snmrModelling import MRS1dBlockQTModelling, SNMRModelling
from . nmr_base import SNMRBase


[docs]class MRS(SNMRBase): """ Magnetic resonance sounding (MRS) manager class. """ def __init__(self, survey=None, fid=0, kernel=None, mtype='smooth', dtype='rotatedAmplitudes', **kwargs): """ Parameters ---------- kernel: pg.Matrix or ndarray or class-instance or string Basically a simple precalculated kernel matrix in the .mrsi, .mrsd or .npz format is sufficient for this class to work. If a recalcualtion of the kernelmatrix is intended, an instance of a kernelclass forward operator can be loaded. gates: np.array or pg.Vector Gates are the time discretisation of the NMR measurement, they result in the preprocessing of the measured data, where the gating is used to improve the signal-to-noise ratio. Data and Error Cubes can be loaded seperately using the method 'loadDataAndError', or a new set of synthetic data can be calcualted. """ super(MRS, self).__init__( survey=survey, fid=fid, kernel=kernel, mtype=mtype, dtype=dtype, dim=1, **kwargs)
[docs] def showDataAndError(self, ax=None, figsize=(10, 8), as_log=False): """Show data cube along with error cube.""" if ax is None: fig, ax = plt.subplots(1, 2, figsize=figsize) self.showCube(ax[0], self.data * 1e9, islog=as_log) self.showCube(ax[1], self.error * 1e9, islog=as_log) return ax
[docs] def showKernel(self, ax=None, save=None, **kwargs): """ Show the kernel as matrix (Q over z). If Kernel is a class object, the plotting order is redirected to Kernel.show(**kwargs) To see more about the plotting options type this in the console: >>> import kernel as k >>> help(k.Kernel.show) """ if self.Kernel is not None: self.Kernel.show(ax=ax, **kwargs) if save is not None: plt.savefig(save, bbox_inches='tight') else: if ax is None: fig, ax = plt.subplots() ax.imshow(np.absolute(self.K).T, interpolation='nearest', aspect='auto') yt = ax.get_yticks() maxzi = self.K.shape[1] yt = yt[(yt >= 0) & (yt < maxzi)] if yt[-1] < maxzi-2: yt = np.hstack((yt, maxzi)) ytl = [str(self.kernel.zvec[int(yti)]) for yti in yt] zl = self.kernel.zvec[[int(yti) for yti in yt]] ytl = [str(zi) for zi in np.round(zl, 1)] ax.set_yticks(yt) ax.set_yticklabels(ytl) xt = ax.get_xticks() maxqi = self.K.shape[0] xt = xt[(xt >= 0) & (xt < maxqi)] xtl = [np.round(self.pulses[int(iq)], 2) for iq in xt] ax.set_xticks(xt) ax.set_xticklabels(xtl) return ax
[docs] def searchForLambda(self, startLam=20000): """ Runs several inversion runs to find the highest lambda which is able to fit the data within its errors. """ self.invert(lam=startLam, runChi1=True) Lam = self.inv.getLambda() factor = 1.05 # stepwise search for optimized lam for i in range(15): nextLam = Lam * factor self.invert(lam=nextLam, runChi1=True) Lam = self.inv.getLambda() if Lam > nextLam/factor: continue else: break return nextLam
[docs] def showResult(self, figsize=(10, 8), save='', fig=None, ax=None, syn=None, wclabel=None, t2label=None, color=None): """Show theta(z) and T2*(z) (+uncertainties if there).""" if ax is None: fig, ax = plt.subplots(1, 2, sharey=True, figsize=figsize) if self.inv is not None: thk, wc, t2 = self.splitModel(model=self.inv.model()) else: thk, wc, t2 = self.splitModel() if syn is not None: drawModel1D(ax[0], syn[0], syn[1], color=color, label='synthetic') synT2 = np.array(syn[2])*1000.0 drawModel1D(ax[1], syn[0], synT2, color=color, label='synthetic') showWC(ax[0], thk, wc, color=color, label=wclabel) showT2(ax[1], thk, t2, color=color, label=t2label) if self.modelL is not None and self.modelU is not None: thkL, wcL, t2L = self.splitModel(self.modelL) thkU, wcU, t2U = self.splitModel(self.modelU) showErrorBars(ax[0], thk, wc, thkL, thkU, wcL, wcU) showErrorBars(ax[1], thk, t2*1e3, thkL, thkU, t2L*1e3, t2U*1e3) if fig is not None: if save: fig.savefig(save, bbox_inches='tight') return ax
[docs] def showResultAndFit(self, figsize=(12, 10), save='', maxdep=0., clim=None, suptitle=None, syn=None, wclabel=None, t2label=None): """Show ec(z), T2*(z), data and model response.""" fig, ax = plt.subplots(2, 2, figsize=figsize) if self.inv is not None: thk, wc, t2 = self.splitModel(model=self.inv.model()) else: thk, wc, t2 = self.splitModel() if syn is not None: drawModel1D(ax[0, 0], syn[0], syn[1], color='b', label='synthetic') synT2 = np.array(syn[2]) * 1000.0 drawModel1D(ax[0, 1], syn[0], synT2, color='b', label='synthetic') if wclabel is None: wclabel = r'$\lambda$={:2.2f}'.format(self.inv.getLambda()) if t2label is None: t2label = r'$\lambda$={:2.2f}'.format(self.inv.getLambda()) showWC(ax[0, 0], thk, wc, label=wclabel) showT2(ax[0, 1], thk, t2, label=t2label) ax[0, 0].set_title(r'water content $\theta$') ax[0, 1].set_title(r'decay time $T_2^*$') ax[0, 0].set_ylabel('$z$ [m]') ax[0, 1].set_ylabel('$z$ [m]') ax[0, 0].legend(loc='best') ax[0, 1].legend(loc='best') if self.modelL is not None and self.modelU is not None: thkL, wcL, t2L = self.splitModel(self.modelL) thkU, wcU, t2U = self.splitModel(self.modelU) showErrorBars(ax[0, 0], thk, wc, thkL, thkU, wcL, wcU) showErrorBars(ax[0, 1], thk, t2*1e3, thkL, thkU, t2L*1e3, t2U*1e3) lim = [np.max([np.max(maxdep), np.abs(self.kernel.zvec[-1])]), 0.] ax[0, 0].set_ylim(lim) ax[0, 1].set_ylim(lim) if self.inv is not None: resp = self.inv.response() if len(resp) == 0: # True if for example just imported from .mrsi resp = self.data - self.error print('Attention: plotted simulated data are calculated as \ difference between data and error due to simple import of inversion results!') ewmisfit = (self.data - resp) / self.error len_real = len(self.fid.gates) * len(self.fid.pulses) iscomplex = len(ewmisfit) == 2 * len_real if iscomplex: self.showCube(ax[1, 0], ewmisfit[:len_real], islog=False, cmap='bwr', clim=[-4, 4]) self.showCube(ax[1, 1], ewmisfit[len_real:], islog=False, cmap='bwr', clim=[-4, 4]) else: subax = plt.subplot(2, 1, 2) self.showCube(subax, ewmisfit, islog=False, cmap='bwr', clim=[-4, 4]) title = 'error-weighted misfit (amp)' if self.inv is not None: chi2 = self.inv.getChi2() title += ' chi² = {:.3f}'.format(chi2) subax.set_title(title) if suptitle: fig.suptitle(suptitle) if save: fig.savefig(save, bbox_inches='tight') return ax
[docs] def showDataAndFit(self, compare_to=None, figsize=(8, 6), savename=None, clim=None, suptitle=None, separated=False, savematrices=False): """data and error weighted misfit. 1,1 or 2,2 for complex""" Complex = self.dtype == 'complex' if self.inv is None: raise Exception('Cannot show data and fit without inversion ' 'instance.') if separated is False: fig, ax = plt.subplots(1 + Complex, 2, figsize=figsize, sharey=True, sharex=True) else: fig1, ax1 = plt.subplots(1, 1, figsize=figsize) fig2, ax2 = plt.subplots(1, 1, figsize=figsize) fig3, ax3 = plt.subplots(1, 1, figsize=figsize) fig4, ax4 = plt.subplots(1, 1, figsize=figsize) ax = np.array([[ax1, ax2], [ax3, ax4]]) fig = [fig1, fig2, fig3, fig4] if compare_to is None: response = self.inv.response() else: response = compare_to if Complex is True: datareal, dataimag = np.split(self.data, 2) self.showCube(ax[0, 0], datareal * 1e9, islog=False, clim=clim, cmap='bwr') ax[0, 0].set_title('measured data real [nV]') # log10 # ax[0, 0].set_xlabel('') self.showCube(ax[0, 1], dataimag * 1e9, islog=False, clim=clim, cmap='bwr') ax[0, 1].set_title('measured data imag [nV]') # log10 # ax[0, 1].set_ylabel('') # ax[0, 1].set_xlabel('') ewmisfit = (self.data - response) / self.error errorreal, errorimag = np.split(ewmisfit, 2) self.showCube(ax[1, 0], errorreal, islog=False, cmap='bwr', clim=[-4, 4]) ax[1, 0].set_title('error-weighted misfit real') self.showCube(ax[1, 1], errorimag, islog=False, cmap='bwr', clim=[-4, 4]) ax[1, 1].set_title('error-weighted misfit imag') # ax[1, 1].set_ylabel('') # if suptitle: # fig.suptitle(suptitle, size=36) else: self.showCube(ax[0], self.data * 1e9, islog=False, clim=clim) ax[0].set_title('measured data [nV]') # log10 ewmisfit = (self.data - response) / self.error if savename is not None: np.save('{}/mrs_data.npy' .format(savename.split(savename.split('/')[-1])[0]), self.data) np.save('{}/mrs_ewmisfit.npy' .format(savename.split(savename.split('/')[-1])[0]), ewmisfit) self.showCube(ax[1], ewmisfit, islog=False, cmap='bwr', clim=[-4, 4]) ax[1].set_title('error-weighted misfit') if savename is not None: titles = ['dreal', 'dimag', 'ereal', 'eimag'] if separated is True: for i, subfig in enumerate(fig): subfig.savefig(savename + titles[i] + '.pdf', bbox_inches='tight') else: fig.savefig(savename + '.pdf', bbox_inches='tight') return ax
[docs] def splitModel(self, model=None): """Split model vector into d, theta and T2*.""" if model is None: model = self.model if self.mtype.lower() == 'block': nl = self.nlay if len(model) == nl and np.shape(model[0]) != (): thk = np.asarray(model[0]) wc = np.asarray(model[1]) t2 = np.asarray(model[2]) # print(thk, wc, t2) else: thk = np.asarray(model[:nl - 1]) wc = np.asarray(model[nl - 1:2 * nl - 1]) t2 = np.asarray(model[2 * nl - 1:3 * nl - 1]) else: ii = int(len(model)/2) wc = model[:ii] t2 = model[ii:] thk = np.abs(np.diff(self.kernel.zvec))[:-1] return thk, wc, t2
[docs]class MRSGenetic(MRS): """MRS class derivation using a genetic algorithm for inversion.""" def __init__(self, *args, **kwargs): super(MRS, self).__init__(*args, **kwargs)
[docs] def genMod(self, individual): """Generate (GA) model from random vector (0-1) using model bounds.""" model = np.asarray(individual) * (self.lUB - self.lLB) + self.lLB if self.trans == 'log': return pg.exp(model) else: return model
[docs] def runEA(self, nlay=None, eatype='GA', pop_size=100, num_gen=100, runs=1, mp_num_cpus=8, **kwargs): """Run evolutionary algorithm using the inspyred library Parameters ---------- nlay : int [taken from classic fop if not given] number of layers\n pop_size : int [100] population size\n num_gen : int [100] number of generations\n runs : int [pop_size*num_gen] number of independent runs (with random population)\n eatype : string ['GA'] algorithm, choose among:\n 'GA' - Genetic Algorithm [default]\n 'SA' - Simulated Annealing\n 'DEA' - Discrete Evolutionary Algorithm\n 'PSO' - Particle Swarm Optimization\n 'ACS' - Ant Colony Strategy\n 'ES' - Evolutionary Strategy """ import inspyred import random def mygenerate(random, args): """generate a random vector of model size""" return [random.random() for i in range(nlay * 3 - 1)] def my_observer(population, num_generations, num_evaluations, args): """ print fitness over generation number """ best = min(population) print('{0:6} -- {1}'.format(num_generations, best.fitness)) @inspyred.ec.evaluators.evaluator def datafit(individual, args): """ error-weighted data misfit as basis for evaluating fitness """ misfit = (self.data - self.f.response(self.genMod(individual))) / \ self.error return np.mean(misfit**2) # prepare forward operator if self.f is None or (nlay is not None and nlay is not self.nlay): self.createFOP(nlay) lowerBound = pg.cat(pg.cat(pg.Vector(self.nlay - 1, self.thkBounds[1]), pg.Vector(self.nlay, self.wcBounds[1])), pg.Vector(self.nlay, self.t2Bounds[1])) upperBound = pg.cat(pg.cat(pg.Vector(self.nlay - 1, self.thkBounds[2]), pg.Vector(self.nlay, self.wcBounds[2])), pg.Vector(self.nlay, self.t2Bounds[2])) if self.trans == 'log': self.lLB, self.lUB = pg.log(lowerBound), pg.log( upperBound) # ready mapping functions else: self.lLB, self.lUB = lowerBound, upperBound # self.f = MRS1dBlockQTModelling(nlay, self.K, self.z, self.t) # setup random generator rand = random.Random() # choose among different evolution algorithms if eatype == 'GA': ea = inspyred.ec.GA(rand) ea.variator = [ inspyred.ec.variators.blend_crossover, inspyred.ec.variators.gaussian_mutation] ea.selector = inspyred.ec.selectors.tournament_selection ea.replacer = inspyred.ec.replacers.generational_replacement if eatype == 'SA': ea = inspyred.ec.SA(rand) if eatype == 'DEA': ea = inspyred.ec.DEA(rand) if eatype == 'PSO': ea = inspyred.swarm.PSO(rand) if eatype == 'ACS': ea = inspyred.swarm.ACS(rand, []) if eatype == 'ES': ea = inspyred.ec.ES(rand) ea.terminator = [inspyred.ec.terminators.evaluation_termination, inspyred.ec.terminators.diversity_termination] else: ea.terminator = inspyred.ec.terminators.evaluation_termination # ea.observer = my_observer ea.observer = [ inspyred.ec.observers.stats_observer, inspyred.ec.observers.file_observer] tstr = '{0}'.format(time.strftime('%y%m%d-%H%M%S')) self.EAstatfile = self.basename + '-' + eatype + 'stat' + tstr + '.csv' with open(self.EAstatfile, 'w') as fid: self.pop = [] for i in range(runs): rand.seed(int(time.time())) self.pop.extend(ea.evolve( evaluator=datafit, generator=mygenerate, maximize=False, pop_size=pop_size, max_evaluations=pop_size*num_gen, bounder=inspyred.ec.Bounder(0., 1.), num_elites=1, statistics_file=fid, **kwargs)) # self.pop.extend(ea.evolve( # generator=mygenerate, maximize=False, # evaluator=inspyred.ec.evaluators.parallel_evaluation_mp, # mp_evaluator=datafit, mp_num_cpus=mp_num_cpus, # pop_size=pop_size, max_evaluations=pop_size*num_gen, # bounder=inspyred.ec.Bounder(0., 1.), num_elites=1, # statistics_file=fid, **kwargs)) self.pop.sort(reverse=True) self.fits = [ind.fitness for ind in self.pop] print('minimum fitness of ' + str(min(self.fits)))
[docs] def plotPopulation(self, maxfitness=None, fitratio=1.05, savefile=True): """Plot fittest individuals (fitness<maxfitness) as 1d models Parameters ---------- maxfitness : float maximum fitness value (absolute) OR fitratio : float [1.05] maximum ratio to minimum fitness """ if maxfitness is None: maxfitness = min(self.fits) * fitratio fig, ax = plt.subplots(1, 2, sharey=True) maxz = 0 for ind in self.pop: if ind.fitness < maxfitness: model = np.asarray(self.genMod(ind.candidate)) thk = model[:self.nlay - 1] wc = model[self.nlay - 1:self.nlay * 2 - 1] t2 = model[self.nlay * 2 - 1:] drawModel1D(ax[0], thk, wc * 100, color='grey') drawModel1D(ax[1], thk, t2 * 1000, color='grey') maxz = max(maxz, sum(thk)) model = np.asarray(self.genMod(self.pop[0].candidate)) thk = model[:self.nlay - 1] wc = model[self.nlay - 1:self.nlay * 2 - 1] t2 = model[self.nlay * 2 - 1:] drawModel1D(ax[0], thk, wc * 100, color='black', linewidth=3) drawModel1D(ax[1], thk, t2 * 1000, color='black', linewidth=3, plotfunction='semilogx') ax[0].set_xlim(self.wcBounds[1] * 100, self.wcBounds[2] * 100) ax[0].set_ylim((maxz * 1.2, 0)) ax[1].set_xlim(self.t2Bounds[1] * 1000, self.t2Bounds[2] * 1000) ax[1].set_ylim((maxz * 1.2, 0)) xt = [10, 20, 50, 100, 200, 500, 1000] ax[1].set_xticks(xt) ax[1].set_xticklabels([str(xti) for xti in xt]) if savefile: fig.savefig(self.EAstatfile.replace('.csv', '.pdf'), bbox_inches='tight') plt.show()
[docs] def plotEAstatistics(self, fname=None): """Plot EA statistics (best, worst, ...) over time.""" if fname is None: fname = self.EAstatfile gen, psize, worst, best, med, avg, std = np.genfromtxt( fname, unpack=True, usecols=range(7), delimiter=',') stderr = std / np.sqrt(psize) data = [avg, med, best, worst] colors = ['black', 'blue', 'green', 'red'] labels = ['average', 'median', 'best', 'worst'] fig, ax = plt.subplots() ax.errorbar(gen, avg, stderr, color=colors[0], label=labels[0]) ax.set_yscale('log') for d, col, lab in zip(data[1:], colors[1:], labels[1:]): ax.plot(gen, d, color=col, label=lab) ax.fill_between(gen, data[2], data[3], color='#e6f2e6') ax.grid(True) ymin = min([min(d) for d in data]) ymax = max([max(d) for d in data]) yrange = ymax - ymin ax.set_ylim((ymin - 0.1*yrange, ymax + 0.1*yrange)) ax.legend(loc='upper left') # , prop=prop) ax.set_xlabel('Generation') ax.set_ylabel('Fitness')
[docs]def showErrorBars(ax, thk, val, thkL, thkU, valL, valU, *args, **kwargs): """Plot wc and t2 models with error bars.""" zb = np.cumsum(thk) zm = np.hstack((zb - thk / 2, zb[-1] * 1.2)) # zb[-1]+thk[-1]/2)) valm = (val[:-1] + val[1:]) / 2 xerr = [val - valL, valU - val] yerr = [thk - thkL, thkU - thk] ax.errorbar(val, zm, fmt='.', xerr=xerr, ecolor='r', **kwargs) ax.errorbar(valm, zb, fmt='.', yerr=yerr, ecolor='g', **kwargs) ax.set_ylim(bottom=zm[-1] * 1.02, top=0) return ax
[docs]def showWC(ax, thk, wc, maxdep=0., dw=0.1, label=None, color='g'): """Show water content function nicely.""" if label is None: label = 'inversion' wmin = 0 wmax = np.max(wc) drawModel1D(ax, thk, wc, xlabel=r'$\theta$ [1]', label=label, color=color) if maxdep > 0.: ax.set_ylim(maxdep, 0.) wt = np.arange(wmin, wmax, dw) ax.set_xticks(wt) ax.set_xticklabels([f'{wi:.2f}' for wi in wt]) ax.legend(loc='best') return ax
[docs]def showT2(ax, thk, t2, maxdep=0., label=None, color='g'): """Show T2 function nicely.""" if label is None: label = 'inversion' drawModel1D(ax, thk, t2*1e3, xlabel=r'$T_2^*$ [ms]', label=label, color=color) tmin = min(20, min(t2) * 0.9e3) tmax = max(500, max(t2) * 1.1e3) ax.set_xlim(tmin, tmax) if maxdep > 0.: ax.set_ylim(maxdep, 0.) xt = [20, 50, 100, 200, 500] ax.set_xticks(xt) ax.set_xticklabels([str(ai) for ai in xt]) ax.legend(loc='best') return ax
# The End