"""
Part of comet/snmr/modelling
Magnetic resonance sounding module.
"""
# COMET - COupled Magnetic resonance Electrical resistivity Tomography
# Copyright (C) 2019 Nico Skibbe
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# general modules to import according to standards
import time
import numpy as np
import matplotlib.pyplot as plt
import pygimli as pg
from pygimli.utils import iterateBounds
try: # pg.__version__ >= 1.1
from pygimli.viewer.mpl import drawModel1D
except ImportError: # pg.__version__ < 1.1
from pygimli.mplviewer import drawModel1D
from comet.snmr import kernel as k
# local functions in package
# from . snmrModelling import MRS1dBlockQTModelling, SNMRModelling
from . nmr_base import SNMRBase
[docs]class MRS(SNMRBase):
"""
Magnetic resonance sounding (MRS) manager class.
"""
def __init__(self,
survey=None,
fid=0,
kernel=None,
mtype='smooth',
dtype='rotatedAmplitudes',
**kwargs):
"""
Parameters
----------
kernel: pg.Matrix or ndarray or class-instance or string
Basically a simple precalculated kernel matrix in the .mrsi, .mrsd
or .npz format is sufficient for this class to work. If a
recalcualtion of the kernelmatrix is intended, an instance of a
kernelclass forward operator can be loaded.
gates: np.array or pg.Vector
Gates are the time discretisation of the NMR measurement, they
result in the preprocessing of the measured data, where the gating
is used to improve the signal-to-noise ratio. Data and Error Cubes
can be loaded seperately using the method 'loadDataAndError',
or a new set of synthetic data can be calcualted.
"""
super(MRS, self).__init__(
survey=survey, fid=fid, kernel=kernel, mtype=mtype,
dtype=dtype, dim=1, **kwargs)
[docs] def showDataAndError(self, ax=None, figsize=(10, 8), as_log=False):
"""Show data cube along with error cube."""
if ax is None:
fig, ax = plt.subplots(1, 2, figsize=figsize)
self.showCube(ax[0], self.data * 1e9, islog=as_log)
self.showCube(ax[1], self.error * 1e9, islog=as_log)
return ax
[docs] def showKernel(self, ax=None, save=None, **kwargs):
"""
Show the kernel as matrix (Q over z). If Kernel is a class object,
the plotting order is redirected to Kernel.show(**kwargs)
To see more about the plotting options type this in the console:
>>> import kernel as k
>>> help(k.Kernel.show)
"""
if self.Kernel is not None:
self.Kernel.show(ax=ax, **kwargs)
if save is not None:
plt.savefig(save, bbox_inches='tight')
else:
if ax is None:
fig, ax = plt.subplots()
ax.imshow(np.absolute(self.K).T, interpolation='nearest',
aspect='auto')
yt = ax.get_yticks()
maxzi = self.K.shape[1]
yt = yt[(yt >= 0) & (yt < maxzi)]
if yt[-1] < maxzi-2:
yt = np.hstack((yt, maxzi))
ytl = [str(self.kernel.zvec[int(yti)]) for yti in yt]
zl = self.kernel.zvec[[int(yti) for yti in yt]]
ytl = [str(zi) for zi in np.round(zl, 1)]
ax.set_yticks(yt)
ax.set_yticklabels(ytl)
xt = ax.get_xticks()
maxqi = self.K.shape[0]
xt = xt[(xt >= 0) & (xt < maxqi)]
xtl = [np.round(self.pulses[int(iq)], 2) for iq in xt]
ax.set_xticks(xt)
ax.set_xticklabels(xtl)
return ax
[docs] def searchForLambda(self, startLam=20000):
"""
Runs several inversion runs to find the highest lambda which is able to
fit the data within its errors.
"""
self.invert(lam=startLam, runChi1=True)
Lam = self.inv.getLambda()
factor = 1.05
# stepwise search for optimized lam
for i in range(15):
nextLam = Lam * factor
self.invert(lam=nextLam, runChi1=True)
Lam = self.inv.getLambda()
if Lam > nextLam/factor:
continue
else:
break
return nextLam
[docs] def showResult(self, figsize=(10, 8), save='',
fig=None, ax=None, syn=None, wclabel=None, t2label=None,
color=None):
"""Show theta(z) and T2*(z) (+uncertainties if there)."""
if ax is None:
fig, ax = plt.subplots(1, 2, sharey=True, figsize=figsize)
if self.inv is not None:
thk, wc, t2 = self.splitModel(model=self.inv.model())
else:
thk, wc, t2 = self.splitModel()
if syn is not None:
drawModel1D(ax[0], syn[0], syn[1], color=color, label='synthetic')
synT2 = np.array(syn[2])*1000.0
drawModel1D(ax[1], syn[0], synT2, color=color,
label='synthetic')
showWC(ax[0], thk, wc, color=color, label=wclabel)
showT2(ax[1], thk, t2, color=color, label=t2label)
if self.modelL is not None and self.modelU is not None:
thkL, wcL, t2L = self.splitModel(self.modelL)
thkU, wcU, t2U = self.splitModel(self.modelU)
showErrorBars(ax[0], thk, wc, thkL, thkU, wcL, wcU)
showErrorBars(ax[1], thk, t2*1e3,
thkL, thkU, t2L*1e3, t2U*1e3)
if fig is not None:
if save:
fig.savefig(save, bbox_inches='tight')
return ax
[docs] def showResultAndFit(self, figsize=(12, 10), save='',
maxdep=0., clim=None, suptitle=None, syn=None,
wclabel=None, t2label=None):
"""Show ec(z), T2*(z), data and model response."""
fig, ax = plt.subplots(2, 2, figsize=figsize)
if self.inv is not None:
thk, wc, t2 = self.splitModel(model=self.inv.model())
else:
thk, wc, t2 = self.splitModel()
if syn is not None:
drawModel1D(ax[0, 0], syn[0], syn[1], color='b', label='synthetic')
synT2 = np.array(syn[2]) * 1000.0
drawModel1D(ax[0, 1], syn[0], synT2, color='b',
label='synthetic')
if wclabel is None:
wclabel = r'$\lambda$={:2.2f}'.format(self.inv.getLambda())
if t2label is None:
t2label = r'$\lambda$={:2.2f}'.format(self.inv.getLambda())
showWC(ax[0, 0], thk, wc, label=wclabel)
showT2(ax[0, 1], thk, t2, label=t2label)
ax[0, 0].set_title(r'water content $\theta$')
ax[0, 1].set_title(r'decay time $T_2^*$')
ax[0, 0].set_ylabel('$z$ [m]')
ax[0, 1].set_ylabel('$z$ [m]')
ax[0, 0].legend(loc='best')
ax[0, 1].legend(loc='best')
if self.modelL is not None and self.modelU is not None:
thkL, wcL, t2L = self.splitModel(self.modelL)
thkU, wcU, t2U = self.splitModel(self.modelU)
showErrorBars(ax[0, 0], thk, wc, thkL, thkU, wcL, wcU)
showErrorBars(ax[0, 1], thk, t2*1e3,
thkL, thkU, t2L*1e3, t2U*1e3)
lim = [np.max([np.max(maxdep),
np.abs(self.kernel.zvec[-1])]), 0.]
ax[0, 0].set_ylim(lim)
ax[0, 1].set_ylim(lim)
if self.inv is not None:
resp = self.inv.response()
if len(resp) == 0: # True if for example just imported from .mrsi
resp = self.data - self.error
print('Attention: plotted simulated data are calculated as \
difference between data and error due to simple import of inversion results!')
ewmisfit = (self.data - resp) / self.error
len_real = len(self.fid.gates) * len(self.fid.pulses)
iscomplex = len(ewmisfit) == 2 * len_real
if iscomplex:
self.showCube(ax[1, 0], ewmisfit[:len_real], islog=False,
cmap='bwr', clim=[-4, 4])
self.showCube(ax[1, 1], ewmisfit[len_real:], islog=False,
cmap='bwr', clim=[-4, 4])
else:
subax = plt.subplot(2, 1, 2)
self.showCube(subax, ewmisfit, islog=False, cmap='bwr',
clim=[-4, 4])
title = 'error-weighted misfit (amp)'
if self.inv is not None:
chi2 = self.inv.getChi2()
title += ' chi² = {:.3f}'.format(chi2)
subax.set_title(title)
if suptitle:
fig.suptitle(suptitle)
if save:
fig.savefig(save, bbox_inches='tight')
return ax
[docs] def showDataAndFit(self, compare_to=None, figsize=(8, 6), savename=None,
clim=None, suptitle=None, separated=False,
savematrices=False):
"""data and error weighted misfit. 1,1 or 2,2 for complex"""
Complex = self.dtype == 'complex'
if self.inv is None:
raise Exception('Cannot show data and fit without inversion '
'instance.')
if separated is False:
fig, ax = plt.subplots(1 + Complex, 2, figsize=figsize,
sharey=True, sharex=True)
else:
fig1, ax1 = plt.subplots(1, 1, figsize=figsize)
fig2, ax2 = plt.subplots(1, 1, figsize=figsize)
fig3, ax3 = plt.subplots(1, 1, figsize=figsize)
fig4, ax4 = plt.subplots(1, 1, figsize=figsize)
ax = np.array([[ax1, ax2], [ax3, ax4]])
fig = [fig1, fig2, fig3, fig4]
if compare_to is None:
response = self.inv.response()
else:
response = compare_to
if Complex is True:
datareal, dataimag = np.split(self.data, 2)
self.showCube(ax[0, 0], datareal * 1e9,
islog=False, clim=clim, cmap='bwr')
ax[0, 0].set_title('measured data real [nV]') # log10
# ax[0, 0].set_xlabel('')
self.showCube(ax[0, 1], dataimag * 1e9,
islog=False, clim=clim, cmap='bwr')
ax[0, 1].set_title('measured data imag [nV]') # log10
# ax[0, 1].set_ylabel('')
# ax[0, 1].set_xlabel('')
ewmisfit = (self.data - response) / self.error
errorreal, errorimag = np.split(ewmisfit, 2)
self.showCube(ax[1, 0], errorreal, islog=False, cmap='bwr',
clim=[-4, 4])
ax[1, 0].set_title('error-weighted misfit real')
self.showCube(ax[1, 1], errorimag, islog=False, cmap='bwr',
clim=[-4, 4])
ax[1, 1].set_title('error-weighted misfit imag')
# ax[1, 1].set_ylabel('')
# if suptitle:
# fig.suptitle(suptitle, size=36)
else:
self.showCube(ax[0], self.data * 1e9,
islog=False, clim=clim)
ax[0].set_title('measured data [nV]') # log10
ewmisfit = (self.data - response) / self.error
if savename is not None:
np.save('{}/mrs_data.npy'
.format(savename.split(savename.split('/')[-1])[0]),
self.data)
np.save('{}/mrs_ewmisfit.npy'
.format(savename.split(savename.split('/')[-1])[0]),
ewmisfit)
self.showCube(ax[1], ewmisfit, islog=False, cmap='bwr',
clim=[-4, 4])
ax[1].set_title('error-weighted misfit')
if savename is not None:
titles = ['dreal', 'dimag', 'ereal', 'eimag']
if separated is True:
for i, subfig in enumerate(fig):
subfig.savefig(savename + titles[i] + '.pdf',
bbox_inches='tight')
else:
fig.savefig(savename + '.pdf', bbox_inches='tight')
return ax
[docs] def splitModel(self, model=None):
"""Split model vector into d, theta and T2*."""
if model is None:
model = self.model
if self.mtype.lower() == 'block':
nl = self.nlay
if len(model) == nl and np.shape(model[0]) != ():
thk = np.asarray(model[0])
wc = np.asarray(model[1])
t2 = np.asarray(model[2])
# print(thk, wc, t2)
else:
thk = np.asarray(model[:nl - 1])
wc = np.asarray(model[nl - 1:2 * nl - 1])
t2 = np.asarray(model[2 * nl - 1:3 * nl - 1])
else:
ii = int(len(model)/2)
wc = model[:ii]
t2 = model[ii:]
thk = np.abs(np.diff(self.kernel.zvec))[:-1]
return thk, wc, t2
[docs]class MRSGenetic(MRS):
"""MRS class derivation using a genetic algorithm for inversion."""
def __init__(self, *args, **kwargs):
super(MRS, self).__init__(*args, **kwargs)
[docs] def genMod(self, individual):
"""Generate (GA) model from random vector (0-1) using model bounds."""
model = np.asarray(individual) * (self.lUB - self.lLB) + self.lLB
if self.trans == 'log':
return pg.exp(model)
else:
return model
[docs] def runEA(self, nlay=None, eatype='GA', pop_size=100, num_gen=100,
runs=1, mp_num_cpus=8, **kwargs):
"""Run evolutionary algorithm using the inspyred library
Parameters
----------
nlay : int [taken from classic fop if not given]
number of layers\n
pop_size : int [100]
population size\n
num_gen : int [100]
number of generations\n
runs : int [pop_size*num_gen]
number of independent runs (with random population)\n
eatype : string ['GA']
algorithm, choose among:\n
'GA' - Genetic Algorithm [default]\n
'SA' - Simulated Annealing\n
'DEA' - Discrete Evolutionary Algorithm\n
'PSO' - Particle Swarm Optimization\n
'ACS' - Ant Colony Strategy\n
'ES' - Evolutionary Strategy
"""
import inspyred
import random
def mygenerate(random, args):
"""generate a random vector of model size"""
return [random.random() for i in range(nlay * 3 - 1)]
def my_observer(population, num_generations, num_evaluations, args):
""" print fitness over generation number """
best = min(population)
print('{0:6} -- {1}'.format(num_generations, best.fitness))
@inspyred.ec.evaluators.evaluator
def datafit(individual, args):
""" error-weighted data misfit as basis for evaluating fitness """
misfit = (self.data - self.f.response(self.genMod(individual))) / \
self.error
return np.mean(misfit**2)
# prepare forward operator
if self.f is None or (nlay is not None and nlay is not self.nlay):
self.createFOP(nlay)
lowerBound = pg.cat(pg.cat(pg.Vector(self.nlay - 1,
self.thkBounds[1]),
pg.Vector(self.nlay, self.wcBounds[1])),
pg.Vector(self.nlay, self.t2Bounds[1]))
upperBound = pg.cat(pg.cat(pg.Vector(self.nlay - 1,
self.thkBounds[2]),
pg.Vector(self.nlay, self.wcBounds[2])),
pg.Vector(self.nlay, self.t2Bounds[2]))
if self.trans == 'log':
self.lLB, self.lUB = pg.log(lowerBound), pg.log(
upperBound) # ready mapping functions
else:
self.lLB, self.lUB = lowerBound, upperBound
# self.f = MRS1dBlockQTModelling(nlay, self.K, self.z, self.t)
# setup random generator
rand = random.Random()
# choose among different evolution algorithms
if eatype == 'GA':
ea = inspyred.ec.GA(rand)
ea.variator = [
inspyred.ec.variators.blend_crossover,
inspyred.ec.variators.gaussian_mutation]
ea.selector = inspyred.ec.selectors.tournament_selection
ea.replacer = inspyred.ec.replacers.generational_replacement
if eatype == 'SA':
ea = inspyred.ec.SA(rand)
if eatype == 'DEA':
ea = inspyred.ec.DEA(rand)
if eatype == 'PSO':
ea = inspyred.swarm.PSO(rand)
if eatype == 'ACS':
ea = inspyred.swarm.ACS(rand, [])
if eatype == 'ES':
ea = inspyred.ec.ES(rand)
ea.terminator = [inspyred.ec.terminators.evaluation_termination,
inspyred.ec.terminators.diversity_termination]
else:
ea.terminator = inspyred.ec.terminators.evaluation_termination
# ea.observer = my_observer
ea.observer = [
inspyred.ec.observers.stats_observer,
inspyred.ec.observers.file_observer]
tstr = '{0}'.format(time.strftime('%y%m%d-%H%M%S'))
self.EAstatfile = self.basename + '-' + eatype + 'stat' + tstr + '.csv'
with open(self.EAstatfile, 'w') as fid:
self.pop = []
for i in range(runs):
rand.seed(int(time.time()))
self.pop.extend(ea.evolve(
evaluator=datafit, generator=mygenerate, maximize=False,
pop_size=pop_size, max_evaluations=pop_size*num_gen,
bounder=inspyred.ec.Bounder(0., 1.), num_elites=1,
statistics_file=fid, **kwargs))
# self.pop.extend(ea.evolve(
# generator=mygenerate, maximize=False,
# evaluator=inspyred.ec.evaluators.parallel_evaluation_mp,
# mp_evaluator=datafit, mp_num_cpus=mp_num_cpus,
# pop_size=pop_size, max_evaluations=pop_size*num_gen,
# bounder=inspyred.ec.Bounder(0., 1.), num_elites=1,
# statistics_file=fid, **kwargs))
self.pop.sort(reverse=True)
self.fits = [ind.fitness for ind in self.pop]
print('minimum fitness of ' + str(min(self.fits)))
[docs] def plotPopulation(self, maxfitness=None, fitratio=1.05, savefile=True):
"""Plot fittest individuals (fitness<maxfitness) as 1d models
Parameters
----------
maxfitness : float
maximum fitness value (absolute) OR
fitratio : float [1.05]
maximum ratio to minimum fitness
"""
if maxfitness is None:
maxfitness = min(self.fits) * fitratio
fig, ax = plt.subplots(1, 2, sharey=True)
maxz = 0
for ind in self.pop:
if ind.fitness < maxfitness:
model = np.asarray(self.genMod(ind.candidate))
thk = model[:self.nlay - 1]
wc = model[self.nlay - 1:self.nlay * 2 - 1]
t2 = model[self.nlay * 2 - 1:]
drawModel1D(ax[0], thk, wc * 100, color='grey')
drawModel1D(ax[1], thk, t2 * 1000, color='grey')
maxz = max(maxz, sum(thk))
model = np.asarray(self.genMod(self.pop[0].candidate))
thk = model[:self.nlay - 1]
wc = model[self.nlay - 1:self.nlay * 2 - 1]
t2 = model[self.nlay * 2 - 1:]
drawModel1D(ax[0], thk, wc * 100, color='black', linewidth=3)
drawModel1D(ax[1], thk, t2 * 1000, color='black', linewidth=3,
plotfunction='semilogx')
ax[0].set_xlim(self.wcBounds[1] * 100, self.wcBounds[2] * 100)
ax[0].set_ylim((maxz * 1.2, 0))
ax[1].set_xlim(self.t2Bounds[1] * 1000, self.t2Bounds[2] * 1000)
ax[1].set_ylim((maxz * 1.2, 0))
xt = [10, 20, 50, 100, 200, 500, 1000]
ax[1].set_xticks(xt)
ax[1].set_xticklabels([str(xti) for xti in xt])
if savefile:
fig.savefig(self.EAstatfile.replace('.csv', '.pdf'),
bbox_inches='tight')
plt.show()
[docs] def plotEAstatistics(self, fname=None):
"""Plot EA statistics (best, worst, ...) over time."""
if fname is None:
fname = self.EAstatfile
gen, psize, worst, best, med, avg, std = np.genfromtxt(
fname, unpack=True, usecols=range(7), delimiter=',')
stderr = std / np.sqrt(psize)
data = [avg, med, best, worst]
colors = ['black', 'blue', 'green', 'red']
labels = ['average', 'median', 'best', 'worst']
fig, ax = plt.subplots()
ax.errorbar(gen, avg, stderr, color=colors[0], label=labels[0])
ax.set_yscale('log')
for d, col, lab in zip(data[1:], colors[1:], labels[1:]):
ax.plot(gen, d, color=col, label=lab)
ax.fill_between(gen, data[2], data[3], color='#e6f2e6')
ax.grid(True)
ymin = min([min(d) for d in data])
ymax = max([max(d) for d in data])
yrange = ymax - ymin
ax.set_ylim((ymin - 0.1*yrange, ymax + 0.1*yrange))
ax.legend(loc='upper left') # , prop=prop)
ax.set_xlabel('Generation')
ax.set_ylabel('Fitness')
[docs]def showErrorBars(ax, thk, val, thkL, thkU, valL, valU, *args, **kwargs):
"""Plot wc and t2 models with error bars."""
zb = np.cumsum(thk)
zm = np.hstack((zb - thk / 2, zb[-1] * 1.2)) # zb[-1]+thk[-1]/2))
valm = (val[:-1] + val[1:]) / 2
xerr = [val - valL, valU - val]
yerr = [thk - thkL, thkU - thk]
ax.errorbar(val, zm, fmt='.', xerr=xerr, ecolor='r', **kwargs)
ax.errorbar(valm, zb, fmt='.', yerr=yerr, ecolor='g', **kwargs)
ax.set_ylim(bottom=zm[-1] * 1.02, top=0)
return ax
[docs]def showWC(ax, thk, wc, maxdep=0., dw=0.1, label=None, color='g'):
"""Show water content function nicely."""
if label is None:
label = 'inversion'
wmin = 0
wmax = np.max(wc)
drawModel1D(ax, thk, wc, xlabel=r'$\theta$ [1]',
label=label, color=color)
if maxdep > 0.:
ax.set_ylim(maxdep, 0.)
wt = np.arange(wmin, wmax, dw)
ax.set_xticks(wt)
ax.set_xticklabels([f'{wi:.2f}' for wi in wt])
ax.legend(loc='best')
return ax
[docs]def showT2(ax, thk, t2, maxdep=0., label=None, color='g'):
"""Show T2 function nicely."""
if label is None:
label = 'inversion'
drawModel1D(ax, thk, t2*1e3, xlabel=r'$T_2^*$ [ms]',
label=label, color=color)
tmin = min(20, min(t2) * 0.9e3)
tmax = max(500, max(t2) * 1.1e3)
ax.set_xlim(tmin, tmax)
if maxdep > 0.:
ax.set_ylim(maxdep, 0.)
xt = [20, 50, 100, 200, 500]
ax.set_xticks(xt)
ax.set_xticklabels([str(ai) for ai in xt])
ax.legend(loc='best')
return ax
# The End