"""
Part of comet/snmr/kernel
This file contents parts of the MRSmatlab Kernel function part
"""
# COMET - COupled Magnetic resonance Electrical resistivity Tomography
# Copyright (C) 2019 Nico Skibbe
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import time
import os
import sys
import queue
import numpy as np
import pygimli as pg
from comet.pyhed import misc as cm
from comet.pyhed import log
from comet.pyhed.IO import checkDirectory, getItem, savefieldvtk
from comet.pyhed.loop import buildLoop, loadLoop, Loop
from comet.pyhed.plot import (showLoopLayout, setAxSize, cmap_phase, quantile,
getCMapAndLim)
from comet.pyhed.misc import sinhspace, progressBar
from comet.snmr.survey import Survey
from comet.snmr import misc
from pathlib import Path
import matplotlib.pyplot as plt
[docs]class Kernel(object):
"""
Basic class to solve the NMR kernel computation.
Parameters
----------
name: string [ None ]
If kernel is loaded from file.
survey: survey class instance [ None ]
Calls *setSurvey* to define underlaying survey class.
Holds important attributes like pulse moments and the loops for
tx and rx.
tx: integer [ 0 ]
Transmitter index in corresponding survey.
rx: integer [ 0 ]
Receiver index in corresponding survey.
fid: interger [ 0 ]
Sounding index in corresponding survey.
dimension: integer [1]
Defines the kernel integration.
Example
-------
>>> from comet.snmr import kernel as k
>>> from comet.snmr import survey
>>> site = survey.Survey()
>>> kernel = k.kernel(site)
>>> kernel.calculate()
>>> kernel.save('savename')
>>> kernel.show()
"""
def __init__(self, survey=None, fid=0, dimension=1, name=None):
# attributes setted from initial call
# -----------------------------------
# except for *name*
# load(name) is called after init of other attributes
# backup for pulse moments if not fid is prepared
# mainly for plotting after loading a kernel class from harddisk
self._pulses_backup = None
# survey class
self.survey = None
# index of corresponding sounding in survey class
self.fid_index = None
self.setSurvey(survey, fid=fid)
# 1D, 2D, or 3D
self.dimension = dimension
# attribute init for overview
# ---------------------------
# interpolation matrix from loop mesh to kernel mesh
self.interpolationMatrix = None
# b field copies (sometimes comes in handy)
self.txBfield = None
self.rxBfield = None
# integrated kernel matrix: shape (pulses, cells)
self.K = None
# z discretization for dim = 1
self.kernelMesh1D = None
self.min_thk = None
self._zvec = None
self._zvec_reduced = None
# y discretization for dim = 2
self.yvec = None
self.kernelMesh2D = None
# for intergration
self.kernelMeshCellVolume = None
self.kernelMeshCellCenter = None
# only used in debug mode
self.K_part1 = None
self.K_part2 = None
self.K_part3 = None
self.debug = False
# elliptical decomposition
self.txalpha = None
self.txbeta = None
self.txzeta = None
self.txperpend = None
self.rxalpha = None
self.rxbeta = None
self.rxzeta = None
self.rxperpend = None
# separate loopmesh for both loops
self.seploopmesh = None
self.slm_name = None
self.seploopmesh_dipole = None
self.slm_dipole_name = None
self.num_cpu = 12
# internal h refinement of kernelmesh
self.h_order = 0
self.h_mat = None
# rotation angle and magnitude of magnetization after pulses
self.theta = None
self.magnetization = None
# full 3D mesh for kernel (method: getMagnetizationMesh)
self.mag_mesh = None # used only for magnetization calc and display
if name is not None:
self.load(name)
[docs] def release_memory(self):
""" Calling this function is releasing some attributes that are using a
fairly big amount of memory.
Sets the following attributes back to None:
- The interpolation matrix between the loop meshes and the kernel mesh
*interpolationMatrix*
- local copies of the magnetic fields (fields in tx and rx are not
effected)
*txBfield*, *rxBfield*
- the 3D kernel mesh cell center and volumes
*kernelMeshCellVolume*, *kernelMeshCellCenter*
- the elliptical decomposition of the tx and rx bfields
*txalpha*, *txbeta*, *txzeta*, *txperpend*, *rxalpha*, *rxbeta*,
*rxzeta*, *rxperpend*
Note: a recalculation of the kernel will take about the same amount of
time as the first call, as all cached variables are gone, however apart
from a recalculation, the other purposes of the kernel class (export,
figures, inversion(without recalculation)) are not effected.
Another note: If you want to use this method only for saving disk space
in case you save the kernel class, then you might consider the *light*
flag of the *.save* method instead.
"""
self.interpolationMatrix = None
self.txBfield = None
self.rxBfield = None
self.kernelMeshCellVolume = None
self.kernelMeshCellCenter = None
self.txalpha = None
self.txbeta = None
self.txzeta = None
self.txperpend = None
self.rxalpha = None
self.rxbeta = None
self.rxzeta = None
self.rxperpend = None
def __str__(self):
str_ = f'survey: {self.survey!r}\n'\
f'fid: {self.fid!r}\n'\
f'earth: {self.survey.earth!r}\n'\
f'Tx: {self.tx!r}\n'\
if self.tx is not None:
str_ += f'{self.tx.config!r}\n'\
if self.coincident:
str_ += 'Rx: coincident\n'
else:
str_ += 'Rx: {0.rx!r}\n'\
'{0.rx.config!r}\n'.format(self)
str_ += 'dimension: {0.dimension}'.format(self)
if self.dimension == 1:
if self.K is not None:
str_ += '\nKernel Matrix pulse Moments: {0[0]} \nKernel \
Matrix layers: {0[1]}'.format(np.shape(self.K))
else:
str_ += r'\nKernel Matrix not calculated yet.'
return str_
def __repr__(self):
str_ = 'kernel: {0.dimension}D, {1}'.format(
self, 'coincident' if self.coincident else 'separated')
return str_
@property
def larmor(self):
""" Larmor frequency [Hz] from earth defined in survey."""
larmor = self.survey.earth.larmor
return larmor
@property
def magnetization_magnitude(self):
magmag = self.fid.curie * misc.constants.gamma *\
(self.survey.earth.magnitude**2)
return magmag
@property
def tx_area(self):
""" Area of the transmitter loop."""
try:
area = self.tx.area
except AttributeError:
# no tx as of yet
area = None
return area
@property
def rx_area(self):
""" Area of the receiver loop."""
try:
area = self.rx.area
except AttributeError:
# no rx as of yet
area = None
return area
@property
def tx(self):
""" Reference to transmitter class instance in survey."""
if self.tx_index is None:
tx = None
else:
try:
tx = self.survey.loops[self.tx_index]
except IndexError:
# no tx as of yet
tx = None
return tx
@property
def rx(self):
""" Reference to receiver class instance in survey."""
if self.rx_index is None:
rx = None
else:
try:
rx = self.survey.loops[self.rx_index]
except IndexError:
# no rx as of yet
rx = None
return rx
@property
def tx_index(self):
try:
index = self.fid.tx_index
except AttributeError:
# no fid as of yet
index = None
return index
@property
def rx_index(self):
try:
index = self.fid.rx_index
except AttributeError:
# no fid as of yet
index = None
return index
@property
def coincident(self):
coi = self.tx_index == self.rx_index
return coi
@property
def fid(self):
""" Reference to sounding (FID) class instance in survey."""
try:
fid = self.survey.fids[self.fid_index]
except IndexError:
# no fid as of yet
fid = None
return fid
@property
def zvec(self):
""" z discretisation """
zvec = self.getZVector(reduced=True)
return zvec
@zvec.setter
def zvec(self, vector):
""" Convinience setter. For other *min_thk* use *setZVec*. """
if self.min_thk is None:
self.min_thk = 0.5
self.setZVector(vector, min_thk=self.min_thk)
@property
def pulses(self):
""" Reference to pulse moments from sounding (FID)."""
try:
pm_vec = self.fid.pulses
if self._pulses_backup is not None:
self._pulses_backup = None
except AttributeError:
# no fid as of yet
pm_vec = self._pulses_backup
if pm_vec is not None:
log.debug('kernel.pulses: from backup')
return pm_vec
[docs] def setPulsesDirectly(self, pulses):
""" Set pulse moment vector manually if not supported by survey + fid.
(This is called when loading a kernel from the harddisk, mainly for
plotting reasons). For all calculation purposes a survey and fid class
is recommended.
"""
if pulses is not None:
self._pulses_backup = np.array(pulses)
def _updatePulses(self, disable_warning=False):
""" Overrides the pulse moment vector of the kernel class with pulses
found in fid.
(Usually only called after setting a survey class in a loaded kernel.)
Internal function, do not call from outside.
If you are not sure if the kernel class has pulses, check the *.fid*
or *.pulses*, otherwise load a kernel from harddisk or provide a survey
class via *setSurvey*.
If you change the pulses in the *fid*, the kernel recognizes them as
changed without calling this function (which is the way the user
controls the used pulse moments).
"""
if not disable_warning and self._pulses_backup is not None and not \
np.allclose(self._pulses_backup, self.pulses):
log.warning(
'Overriding pulse moments ({}) with new pulse moment vector '
'from fid ({}).'.format(self._pulses_backup,
self.pulses))
self._checkPulses()
# fid holds pulse, setting backup back to None to avoid interaction
# (self._pulses_backup is only used after loading kernel from harddisk
# without fid class instance provided)
self._pulses_backup = None
@property
def shape(self):
if self.dimension == 1:
m_dim = len(self.zvec) - 1
else:
if self.kernelMesh2D is not None:
m_dim = self.kernelMesh2D.cellCount()
else:
m_dim = 0
if self.pulses is None:
npulses = 0
else:
npulses = len(self.pulses)
# so (0, 0) indicates: no pulses, no mesh
return (npulses, m_dim)
@property
def updatable(self):
up = self.rx is not None and self.tx is not None
up = up and self.pulses is not None
return up
[docs] def setDebug(self, debug: bool):
self.debug = debug
[docs] def setSurvey(self, survey, fid=0):
""" Sets survey class containing necessary information for the kernel.
Parameters
----------
survey: comet.snmr.survey.Survey or None
Sets given survey class instance or create empty class instance.
fid: integer [ 0 ]
Index of corresponding sounding in the survey.
"""
if survey is None:
self.survey = Survey()
else:
self.survey = survey
self.fid_index = fid
if self.fid is not None:
self._updatePulses()
[docs] def setTx(self, tx, **kwargs):
""" Sets initialized loop or pipe arg and kwargs to *loadLoop*. """
if isinstance(tx, str):
self.tx = loadLoop(tx, **kwargs)
else:
self.tx = tx
[docs] def setRx(self, rx, **kwargs):
""" Sets initialized loop or pipe arg and kwargs to *loadLoop*. """
if isinstance(rx, str):
self.rx = loadLoop(rx, **kwargs)
else:
self.rx = rx
[docs] def setModel(self, *args, **kwargs):
""" Pipes args and kwargs to *self.tx.setModel*. Same for rx."""
self.tx.setModel(*args, **kwargs)
if self.rx is not None:
self.rx.setModel(*args, **kwargs)
def _checkSrc(self):
if self.tx is None:
raise Exception('No transmitter found.')
elif self.tx.type == 'Dummy':
raise Exception('Only dummy transmitter found.')
if self.rx is None:
raise Exception('No receiver found.')
elif self.rx.type == 'Dummy':
raise Exception('Only dummy receiver found.')
def _checkPulses(self):
if self.fid is None:
raise Exception(
f'No sounding ({self.fid_index}) found in {self.survey}')
if self.pulses is None:
raise Exception(
f'No pulses found in {self.fid}')
[docs] def createZVector(self, numz, minz, min_thk=0.5):
"""
Creates a sinus hyperbolicus shaped Z discretisation in numz
steps between 0 and minz.
"""
vector = np.sinh(
np.linspace(0.0, np.arcsinh(-np.abs(minz)), numz + 1))
self.min_thk = min_thk
self.setZVector(vector, min_thk=self.min_thk)
[docs] def getZVector(self, reduced=True):
if reduced is True:
return self._zvec_reduced
else:
return self._zvec
[docs] def setZVector(self, vector, min_thk=0.5):
""" Defines the attribute zvec.
Sets the given vector as z discretization. Attention: the value for
min_thk defines the minimum thickness of the discretization used in the
end. For all thicknesses in vector smaller than min_thk, the Kernel is
integrated to match the min_thk. For calulation of the kernel function
the original given vector is used.
Parameters
----------
vector: array_like
Z discretization in m to be used for the kernel calculation. If a
new vector is to be created, please also take a look at the method
*createZVector*.
min_thk: float
Minimum thickness te kernel and zvec is integrated if returned.
This leads to higher accuracy in the vicinity of the loop.
"""
if self.dimension != 1:
log.warning('Setting zvec for kernel of Dim = {} is useless.'
.format(self.dimension))
self._zvec = np.array(vector)
self.min_thk = min_thk
if self._zvec is not None:
a, b = cm.vec.cumsumDepth(self._zvec,
min_thk=self.min_thk)
self.z_index, self._zvec_reduced = a, b
else:
self.z_index = None
self._zvec_reduced = None
[docs] def setYVector(self, vector):
if self.dimension != 2:
log.warning('Setting yvec for kernel of Dim = {} is useless.'
.format(self.dimension))
self.yvec = np.array(vector)
[docs] def getSliceCoords(self):
"""
Returns input coordinates for custEM Slice interpolation of magnetic
fields to the kernel slices.
"""
slice_coords = self.kernelMeshCellCenter.T.reshape(
(len(self.yvec) - 1), -1, 3)
slice_list = []
for subslice in slice_coords:
slice_list.append(subslice)
return slice_list
[docs] def getKernel(self, reduced=True):
if self.K is None:
kern = None
else:
if self.dimension == 1 and reduced:
kern = cm.vec.sumBetweenIndices(self.K, self.z_index, axis=1)
else:
kern = self.K
return kern
[docs] def createSeperatedLoopMesh(self, name='SepLoopMesh', dipole=True,
exportVTK=False,
refinement_para=1.0,
max_area_factor=1.0):
"""
Creates a mesh that contains the receiver and the transmitter loop.
"""
# tx and rx needed
self._checkSrc()
checkDirectory(name, filename=True)
if self.coincident:
pos = self.tx.pos
else:
pos = np.concatenate((self.tx.pos, self.rx.pos), axis=0)
pos = pg.utils.unique_rows(pos)
hlp = Loop((pos,
np.ones(len(pos)), # fake phi
np.ones(len(pos)), # fake ds
False, # grounded
'arbitrary'
))
hlp.setMeshParameters(refinement_para=refinement_para,
max_area_factor=max_area_factor)
self.seploopmesh = pg.Mesh(hlp.createLoopMesh(savename=name))
if exportVTK is True:
self.seploopmesh.exportVTK('sepLoop.vtk')
self.slm_name = (hlp.loop_mesh_name + '.')[:-1] # copy string
if dipole is True:
self.seploopmesh_dipole = \
pg.Mesh(hlp.createDipoleMesh(savename=name))
self.slm_dipole_name = (hlp.dipole_mesh_name + '.')[:-1]
if exportVTK is True:
self.seploopmesh_dipole.exportVTK('sepDipole.vtk')
[docs] def create1DKernelMesh(self,
max_length=0.1,
area=100.,
quality=32,
zvec=None,
size_factor=2.5,
z_factor=2.5,
export_xyplane=None,
max_dipoles=2000,
calc_3D_stats=True,
xmin=None,
xmax=None,
ymin=None,
ymax=None):
"""
In order to integrate the kernel to a 1D structure without
interpolation errors, a special mesh consisting of triangular zylinders
has to be defined.
Parameters
----------
max_length: float [ 0.1 ]
Defines the smallest edge length for the discretisation of the loop
. In order to get admirable kernel results a value of 0.1 meters
should be the maximum.
area: float [ 100. ]
Defines the maximum Area a triangle in the loop slice can have.
quality: float [ 32. ]
Defines the smallest angle inside a triangle. Be careful with
values above 35.
zvec: array_like [ None ]
Usualy the zvec is defined automatically, this flag gives the user
the optional possibility to give a zvec from outside the funktion.
size_factor: float [ 2.5 ]
Extension of the kernel mesh (and therefore integration volume)
in the x and y direction. Should be at least 2 times the loop
diameter or shortest edge length. This value defines the multipier.
z_factor: float [ 2.5 ]
Maximum depth of the Kernel. Should be at least 2 times the loop
diameter or shortest edge length. This value defines the multipier.
export_xyplane: string [ None ]
Filename for the resulting kernel mesh plane in 2D can be
exported for debugging or simply to check the mesh (vtk).
max_dipoles: interger [ 2000 ]
Fallback for high node density loops. This sets an overall maximum
for the number of dipoles used for the loop discretization.
However this only comes into account in rare cases.
"""
# input checks
self._checkSrc()
if self.tx.type == 'Dummy':
raise Exception('need proper Loop for creating a KernelMesh.')
# compute outer limits for loopmesh
max_length = max(max(sum(self.tx.ds), sum(self.rx.ds)) / max_dipoles,
max_length)
log.debug(f'create1DKernelMesh: wire discretization: {max_length} m')
txhlp = buildLoop(self.tx.pos, max_length=max_length)
if not self.coincident:
rxhlp = buildLoop(self.rx.pos, max_length=max_length)
pos = np.concatenate((txhlp.pos, rxhlp.pos), axis=0)
pos = pg.utils.unique_rows(pos)
maxdis = min(txhlp._maxdistance, rxhlp._maxdistance)
else:
pos = txhlp.pos
maxdis = txhlp._maxdistance
minx = np.min(pos[:, 0]) - size_factor * maxdis
maxx = np.max(pos[:, 0]) + size_factor * maxdis
miny = np.min(pos[:, 1]) - size_factor * maxdis
maxy = np.max(pos[:, 1]) + size_factor * maxdis
minz = np.min(self.tx.pos[:, 2]) - z_factor * maxdis
if xmin is not None:
minx = min(minx, xmin)
if xmax is not None:
maxx = max(maxx, xmax)
if ymin is not None:
miny = min(miny, ymin)
if ymax is not None:
maxy = max(maxy, ymax)
if len(pos) == 1: # just one dipole
# 10 * 10 unit cube around the dipole
minx = -10. + pos[0][0]
maxx = 10. + pos[0][0]
miny = -10. + pos[0][1]
maxy = 10. + pos[0][1]
minz = -10. + pos[0][2]
area2D = (maxx - minx) * (maxy - miny)
cell_aprox = int(area2D // area)
if cell_aprox > 100000:
log.warning(
'this will be a faily big 1D kernel mesh, you may consider '
'using another value for area in *create1DKernelMesh*. This '
'mesh will have roughly {} cells + source discretization.'
.format(cell_aprox))
log.debug('cells due to area parameter: at least {} / {} ~ {} cells.'
.format(area2D, area, cell_aprox))
if zvec is None:
# create new
if self.zvec is None:
if self.pulses is None:
numZ = 80
else:
numZ = len(self.pulses) * 4
self.createZVector(numZ, minz)
else:
# set given
self.setZVector(zvec)
# build kernel mesh: outer mesh
kernelpoly = pg.Mesh(2)
ob0 = kernelpoly.createNode(minx, miny, 0)
ob1 = kernelpoly.createNode(maxx, miny, 0)
ob2 = kernelpoly.createNode(maxx, maxy, 0)
ob3 = kernelpoly.createNode(minx, maxy, 0)
kernelpoly.createEdge(ob0, ob1)
kernelpoly.createEdge(ob1, ob2)
kernelpoly.createEdge(ob2, ob3)
kernelpoly.createEdge(ob3, ob0)
# build kernel mesh: sources
for position in pos:
kernelpoly.createNode(position)
# build kernel mesh: create triangle mesh
kernelmesh = pg.meshtools.createMesh(
kernelpoly, quality=quality, area=area,
smooth=[2, 1])
if export_xyplane is not None:
kernelmesh.exportVTK(export_xyplane)
self.set1DKernelMesh(kernelmesh, calc_3D_stats=calc_3D_stats)
[docs] def set1DKernelMesh(self, mesh, calc_3D_stats=True):
""" Sets the 1D kernel mesh.
Parameters
----------
mesh: stirng or pygimli.Mesh
Filename or mesh instance of a 2D mesh in the x-y plane.
Need
----
z discretization:
Can be setted via *createZVector*, *setZVector* or
direct use of *create1DKernelMesh*. However the needed information
to do that may not be available on the fly, therefore no default
z vector is created.
"""
# optionally load mesh
if isinstance(mesh, str):
mesh = pg.Mesh(mesh)
assert mesh.dim() == 2, f'need 2D Mesh, got {mesh.dim()}D Mesh'
if self.zvec is None:
raise Exception('Need z vector. Please use eiterh createZVector or'
' setZVector method.')
self.kernelMesh1D = mesh
self._evalKernelMesh(mesh, calc_3D_stats=calc_3D_stats)
def _evalKernelMesh(self, mesh, calc_3D_stats=True):
""" Internal function to evalualte the kernel mesh cell center and
volume.
Midpoints of the virtual cells (virtual because there is no actual
3D Mesh)
"""
# midpoints of the virtual cells (virtual because
# there is no actual 3D Mesh)
if self.dimension == 1:
if calc_3D_stats:
zmid = (self._zvec[:-1] + self._zvec[1:]) / 2.
zheight = -(self._zvec[1:] - self._zvec[:-1])
cc = np.array(mesh.cellCenters())
self.kernelMeshCellCenter = np.tile(cc, [len(zmid), 1]).T
self.kernelMeshCellCenter[2] = \
np.repeat(zmid, mesh.cellCount())
ca = np.array(mesh.cellSizes())
self.kernelMeshCellVolume = \
np.tile(ca, len(zheight)) * \
np.repeat(zheight, mesh.cellCount())
self.interpolationMatrix = None
elif self.dimension == 2:
if self.yvec is None or len(self.yvec) == 0:
raise Exception('Cannot calculate cell centers without y vec.')
if calc_3D_stats:
# midpoints of the virtual cells (virtual because
# there is no actual 3D Mesh)
ymid = (self.yvec[:-1] + self.yvec[1:]) / 2
# lengths of the cells
ylength = np.abs(self.yvec[1:] - self.yvec[:-1])
kernelCellCenter2D = np.array(mesh.cellCenters())
if np.allclose(kernelCellCenter2D[:, 2], 0):
kernelCellCenter2D = np.column_stack(
(kernelCellCenter2D[:, 0],
kernelCellCenter2D[:, 2],
kernelCellCenter2D[:, 1]))
self.kernelMeshCellCenter = np.tile(kernelCellCenter2D,
[len(ymid), 1]).T
self.kernelMeshCellCenter[1] = np.repeat(
ymid, mesh.cellCount())
kernelCellAreas2D = np.array(mesh.cellSizes())
self.kernelMeshCellVolume = np.tile(
kernelCellAreas2D, len(ylength)) * \
np.repeat(ylength, mesh.cellCount())
self.interpolationMatrix = None
else:
# not needed for 0D and 3D
pass
[docs] def create2DKernelMesh(self, area=15., quality=34, yvec=None, x_factor=5,
z_factor=2, savename=None, export_xzplane=None,
calc_3D_stats=True, order=0):
"""
Similary to the mesh in the 1D case a special mesh consisting of
triangluar zylinders is generated. The Zylinders are pointing in the y
direction to allow a perfect integration to the x-z plane.
Parameters
----------
area: float [15.]
Affects the maximum area a triangle in the 2D slice is allowed to
have. Higher Values lead to bigger cells.
quality: float [34]
Defines the smallest angle inside a triangle. Be careful with
values above 34.5. Higher values = more cells.
yvec: ndarray, list [None]
Usualy the y vector is defined automatically, this flag gives the
user the optional possibility to give a YVec from outside the
function.
x_factor: float [2]
Extension of the kernel mesh (and therefore integration volume)
in the x direction. Should be at least 2 times the loop
diameter or shortest edge length. This value defines the multipier.
z_factor: float [2]
Extension of the kernel mesh (and therefore integration volume)
in the z direction. Should be at least 2 times the loop
diameter or shortest edge length. This value defines the multipier.
savename: string [None]
If a savename is given, the resulting 2D Mesh is saved in the .bms
format for later use.
export_xyplane: string [ None ]
Filename for the resulting kernel mesh plane in 2D can be
exported for debugging or simply to check the mesh (vtk).
"""
# compute inner limits for kernelmesh
self._evalOuterDims()
kernelpoly = pg.meshtools.createWorld(
(self.minx, float(self.minz)),
(self.maxx, float(self.maxz)))
log.info('create2DKernelMesh: discretisation inner mesh: {:0.4f} m²'
.format(self.min_dis**2))
innermesh = pg.meshtools.createMesh(
kernelpoly,
quality=quality,
area=self.min_dis**2,
smooth=[2, 1])
# ax, cbar = pg.viewer.showMesh(innermesh,
# innermesh.cellMarkers(),
# cMap='summer',
# label='region marker')
# inner is ready
innermesh.setCellMarkers(np.ones(innermesh.cellCount(), dtype=int))
endSizeX = self.max_dis * x_factor
endSizeZ = self.max_dis * z_factor
log.debug(f'Append boundary: {endSizeX} x {endSizeZ}')
log.info(f'create2DKernelMesh: discretisation \
outer mesh: {area:0.2f} m²')
kernelmesh = pg.meshtools.appendTriangleBoundary(
innermesh,
xbound=(endSizeX - self.max_dis) / 2,
ybound=endSizeZ,
marker=2,
area=area,
quality=quality,
smooth=True,
isSubSurface=True,
addNodes=5)
# print(innermesh)
# print(kernelmesh)
# ax, cbar = pg.viewer.showMesh(kernelmesh,
# kernelmesh.cellMarkers(),
# cMap='summer',
# label='region marker')
if savename is not None:
checkDirectory(savename, filename=True)
kernelmesh.save(savename + '.bms')
print(f'2DKernelmesh saved as {savename}.bms')
if export_xzplane is not None:
checkDirectory(export_xzplane, filename=True)
kernelmesh.exportVTK(export_xzplane + '.vtk')
log.info(f'2DKernelmesh exported as {export_xzplane}.vtk')
if yvec is None:
self.createYVec()
else:
self.yvec = np.array(yvec)
self.set2DKernelMesh(kernelmesh, calc_3D_stats=calc_3D_stats,
order=order)
def _evalOuterDims(self):
""" Internal function. Calculates the outer dims of the mesh. """
self._checkSrc()
if self.coincident:
pos = self.tx.pos
self.min_dis = self.tx._mindistance
self.max_dis = self.tx._maxdistance
else:
pos = np.concatenate((self.tx.pos, self.rx.pos), axis=0)
self.min_dis = np.min([self.tx._mindistance,
self.rx._mindistance])
self.max_dis = np.min([self.tx._maxdistance,
self.rx._maxdistance])
self.minz = np.min([-self.min_dis, -0.5])
self.minx = np.min(pos[:, 0]) + self.minz
self.maxx = np.max(pos[:, 0]) - self.minz
self.miny = np.min(pos[:, 1]) + self.minz
self.maxy = np.max(pos[:, 1]) - self.minz
self.maxz = 0.
if self.tx.type == 'line' or self.rx.type == 'line':
self.minx = np.min([self.minx, self.miny])
self.miny = np.min([self.minx, self.miny])
self.maxx = np.max([self.maxx, self.maxy])
self.maxy = np.max([self.maxx, self.maxy])
if len(pos) == 1: # just one dipole
self.minx = -5. + pos[0][0]
self.maxx = 5. + pos[0][0]
self.miny = -5. + pos[0][1]
self.maxy = 5. + pos[0][1]
self.minz = -1. + pos[0][2]
[docs] def create1DInterpolationSlices(self):
coords = create1DInterpolationSlices(self)
return coords
[docs] def create2DInterpolationSlices(self):
coords = create2DInterpolationSlices(self)
return coords
[docs] def createYVec(self, max_length=0.2, max_num=300, y_factor=2.,
calc_3D_stats=True):
""" Creates the y vector discretization for the 2D kernel mesh.
The y vector represents the y values of the 3D Kernel mesh before
the integration to 2D.
Parameters
----------
max_length: float [ 0.2 ]
Maximum distance between two slices inbetween the source dipoles.
max_num: integer [ 300 ]
Maximum number of slices. Overrides
max_length if they conflict.
y_factor: float [ 2. ]
Extension of the kernel mesh (and therefore integration volume)
in the y direction. Should be at least 2 times the loop
diameter or shortest edge length. This value defines the multipier.
"""
# hyperbolic sine
self._evalOuterDims()
pos = self.tx.pos
if not self.coincident:
pos = np.concatenate((pos, self.rx.pos), axis=0)
midYVec = np.arange(self.miny, self.maxy, max_length)
if len(midYVec) > max_num//3:
midYVec = np.linspace(self.miny, self.maxy, max_num//3)
leftYVec = sinhspace(self.miny,
self.miny - self.max_dis * y_factor,
len(midYVec))[::-1]
rightYVec = sinhspace(self.maxy,
self.maxy + self.max_dis * y_factor,
len(midYVec))
self.setYVector(np.unique(
np.concatenate([leftYVec, midYVec, rightYVec])))
if self.kernelMesh2D is not None and calc_3D_stats:
# new y vec triggers new evaluation of kernelmesh
self.set2DKernelMesh(self.kernelMesh2D, yvec=None,
order=self.h_order,
integration_mat=self.h_mat,
calc_3D_stats=True)
[docs] def set2DKernelMesh(self, inmesh, yvec=None, order=0,
integration_mat=None, calc_3D_stats=True):
""" kwargs to createYVec if YVec is None """
log.info('set2DKernelMesh: {!s}, order={}'.format(inmesh, order))
self.kernelMesh2D = pg.Mesh(inmesh)
self.h_order = order
if self.h_order > 0:
# for i in range(self.h_order):
if integration_mat is None:
# case 1/2: create h refinement and calc int mat
kernelmesh, self.h_mat = cm.createH2(self.kernelMesh2D,
order=self.h_order,
integration_mat=True)
else:
# case 2/2: set integration mat from outside
kernelmesh = cm.createH2(self.kernelMesh2D,
order=self.h_order,
integration_mat=False)
self.h_mat = integration_mat
else:
kernelmesh = pg.Mesh(self.kernelMesh2D)
self.interpolationMatrix = None
# for interpolation purposes of magnetic fields
self.slice_positions = kernelmesh.cellCenters()
# y-vector: 2D -> 3D
kernelmesh.setDimension(3)
kernelmesh.rotate(pg.RVector3(np.pi/2., 0, 0)) # x, y --> x, z
# validation test for rotation
# kernelmesh.exportVTK('Kernel2D_XZSlice.vtk')
if yvec is not None:
self.setYVector(yvec)
elif self.yvec is None:
self.createYVec()
else:
# midpoints of the virtual cells (virtual because there is no actual
# 3D Mesh)
self._evalKernelMesh(kernelmesh, calc_3D_stats=calc_3D_stats)
def _checkInterpolation(self, slices=False):
"""Internal function to evalate if interpolation can be done.
To prevent cpp error messages after very long calculation time.
"""
if self.dimension not in (1, 2):
# shortcut
return
if slices:
if self.dimension == 1:
# case 1.1: slice interpolation for 1D
if self.kernelMesh1D is None:
raise Exception(
'Check Interpolation: No kernel mesh found.')
if self.zvec is None:
raise Exception(
'Check Interpolation: No z vector found.')
kern_minx = self.kernelMesh1D.xmin()
kern_maxx = self.kernelMesh1D.xmax()
kern_miny = self.kernelMesh1D.ymin()
kern_maxy = self.kernelMesh1D.ymax()
kern_minz = np.min(self.getZVector(reduced=False))
kern_maxz = np.max(self.getZVector(reduced=False))
elif self.dimension == 2:
# case 1.2: slice interpolation for 2D
if self.kernelMesh2D is None:
raise Exception(
'Check Interpolation: No kernel mesh found.')
if self.yvec is None:
raise Exception(
'Check Interpolation: No y vector found.')
kern_cc = np.array(self.kernelMesh2D.cellCenters()).T
kern_minx = np.min(kern_cc[0])
kern_maxx = np.max(kern_cc[0])
mid_y = self.yvec[1:] - self.yvec[:-1]
kern_miny = np.min(mid_y)
kern_maxy = np.max(mid_y)
kern_minz = np.min(kern_cc[2]) # self.kernelMesh2D.zmin()
kern_maxz = np.max(kern_cc[2]) # self.kernelMesh2D.zmax()
else:
if self.dimension == 1:
# case 2.1: direct interpolation for 1D
if self.kernelMeshCellCenter is None:
if self.kernelMesh1D is not None:
self._evalKernelMesh()
else:
raise Exception(
'Check Interpolation: No kernel mesh found.')
elif self.dimension == 2:
# case 2.2: direct interpolation for 2D
if self.kernelMeshCellCenter is None:
if self.kernelMesh2D is not None:
self._evalKernelMesh()
else:
raise Exception(
'Check Interpolation: No kernel mesh found.')
# case 2: direct interpolation for 1D/2D
coords_k = self.kernelMeshCellCenter
kern_minx = np.min(coords_k[0])
kern_maxx = np.max(coords_k[0])
kern_miny = np.min(coords_k[1])
kern_maxy = np.max(coords_k[1])
kern_minz = np.min(coords_k[2])
kern_maxz = np.max(coords_k[2])
# all cases: loop discretization
coords_l = None
if self.seploopmesh is not None:
coords_l = np.array(self.seploopmesh.positions()).T
elif self.rx is not None and self.rx is not None:
if self.tx.loopmesh is not None:
coords_l = np.array(self.tx.loopmesh.positions()).T
if self.rx.loopmesh is not None:
coords_l = \
np.concatenate((np.array(
self.tx.loopmesh.positions()),
np.array(
self.tx.loopmesh.positions())),
axis=0).T
else:
pass
if coords_l is None:
raise Exception('Check Interpolation: No src mesh found.')
k1 = np.array([-kern_minx, kern_maxx,
-kern_miny, kern_maxy,
-kern_minz, kern_maxz])
l1 = np.array([-np.min(coords_l[0]), np.max(coords_l[0]),
-np.min(coords_l[1]), np.max(coords_l[1]),
-np.min(coords_l[2]), np.max(coords_l[2])])
out = k1 > l1
raiseit = False
if np.any(out):
print(np.allclose(k1[out], l1[out]))
if not np.allclose(k1[out], l1[out]):
raiseit = True
if raiseit:
raise Exception('Kernelmesh is greater than loopmesh, '
'interpolation of Bfield will fail. '
'Change one of the meshes')
[docs] def calcInterpolationMatrix(self): # , num_cpu=8):
self._checkInterpolation()
# self.interpolationMatrix = \
# calcInterpolationMatrix_para(self.tx.loopmesh,
# self.kernelMeshCellCenter.T,
# num_cpu=num_cpu)
# unparallelized
self.interpolationMatrix = \
self.tx.calculateInterpolationMatrix(
self.kernelMeshCellCenter.T)
[docs] def BFieldCalculation(self, loop_mesh=None, dipole_mesh=None,
interpolate=False,
just_loop_fields=False, recalc_loop_fields=False,
recalc_primary=False,
num_cpu=12, **kwargs):
"""
Calculates the Bfield for the kernel function for tx and rx.
internal call of `loop.calculate()` including decision if cell based or
node based Bfield is needed.
All optional parameters are piped to the `loop.calculate()` call.
Based on the desired dimension of the kernel a specialised mesh may be
automatically generated for the calculation.
Part 1/3 of the kernel calculation. Called automatically if
`kernel.calculate` is called.
"""
self._checkSrc()
if recalc_primary and self.dimension == 2:
if self.tx.secondary is None:
raise Exception('No secondary field found on tx: {!r}'
.format(self.tx))
if self.rx is not None:
if self.rx.secondary is None:
raise Exception('No secondary field found on rx: {!r}'
.format(self.rx))
if isinstance(loop_mesh, str):
loop_mesh = pg.Mesh(loop_mesh)
# no loomesh given, search in tx (and rx)
if loop_mesh is None:
loop_mesh = self.tx.loopmesh # ok take tx mesh
log.debug(f'BFieldCalculation: found loopmesh: '
f'{self.tx.loop_mesh_name}')
if loop_mesh is not None: # if found
if not self.coincident: # check for rx too
same_mesh = False
if self.rx.loopmesh is self.tx.loopmesh:
same_mesh = True
else:
if self.rx.loopmesh.nodeCount() ==\
self.tx.loopmesh.nodeCount():
if np.allclose(
self.rx.loopmesh.positions().array(),
self.tx.loopmesh.positions().array()):
same_mesh = True
log.debug(
f'BFieldCalculation: compare to rx mesh: {same_mesh}')
if not same_mesh: # same ?
loop_mesh = None # no = new mesh will be generated
log.debug('BFieldCalculation: loop_mesh: {}'.format(loop_mesh))
cell_center = self.dimension in (0, 3)
str_ = 'cell centers.' if cell_center else 'nodes.'
# override transmitter loop config with larmor frequency of M field.
self.tx.config.f = self.larmor
# case 1: seperated loop
if not self.coincident:
self.rx.config.f = self.larmor
# case 1.1: calc new
if loop_mesh is None or (dipole_mesh is None and interpolate):
if self.seploopmesh is None:
self.createSeperatedLoopMesh(dipole=interpolate)
else: # case 1.2: use given meshs (for example for roll along)
self.seploopmesh = loop_mesh
if interpolate:
self.seploopmesh_dipole = dipole_mesh
if self.seploopmesh is not self.tx.loopmesh:
log.debug(f'BFieldCalculation: set new mesh to tx: '
f'{self.slm_name}')
self.tx.setLoopMesh(self.seploopmesh, savename=self.slm_name)
if self.seploopmesh is not self.rx.loopmesh:
log.debug(f'BFieldCalculation: set new mesh to rx: '
f'{self.slm_name}')
self.rx.setLoopMesh(self.seploopmesh, savename=self.slm_name)
if self.seploopmesh_dipole is not None:
if self.seploopmesh_dipole is not self.tx.dipolemesh:
log.debug(f'BFieldCalculation: set newdipole mesh to tx: '
f'{self.slm_dipole_name}')
self.tx.setDipoleMesh(self.seploopmesh_dipole,
savename=self.slm_dipole_name)
if self.seploopmesh_dipole is not self.rx.dipolemesh:
log.debug(f'BFieldCalculation: set new dipole mesh to rx: '
f'{self.slm_dipole_name}')
self.rx.setDipoleMesh(self.seploopmesh_dipole,
savename=self.slm_dipole_name)
if self.tx.field is None or recalc_loop_fields:
log.info('Calculating magnetic field of tx for {}D-kernel on '
'{} ...'.format(self.dimension, str_))
# shared loops doesnt need to be calced again
self.tx.calculate(loop_mesh=self.seploopmesh,
dipole_mesh=self.seploopmesh_dipole,
interpolate=interpolate,
cell_center=cell_center,
num_cpu=num_cpu,
**kwargs)
if self.rx.field is None or recalc_loop_fields:
log.info('Calculating magnetic field of rx for {}D-kernel on '
'{} ...'.format(self.dimension, str_))
self.rx.calculate(loop_mesh=self.seploopmesh,
dipole_mesh=self.seploopmesh_dipole,
interpolate=interpolate,
cell_center=cell_center,
num_cpu=num_cpu,
**kwargs)
# case 2: coincident
else:
if loop_mesh is not None:
if loop_mesh is not self.tx.loopmesh:
log.debug(f'BFieldCalculation: set new mesh to tx: '
f'{self.slm_name}')
self.tx.setLoopMesh(loop_mesh, savename=self.slm_name)
if dipole_mesh is not None:
if dipole_mesh is not self.tx.dipolemesh:
log.debug(f'BFieldCalculation: set new dipole mesh to '
f'tx: {self.slm_dipole_name}')
self.tx.setDipoleMesh(dipole_mesh,
savename=self.slm_dipole_name)
if self.tx.field is None or recalc_loop_fields:
log.info('Calculating magnetic field of tx for {}D-kernel on '
'{} ...'.format(self.dimension, str_))
self.tx.calculate(loop_mesh=loop_mesh, dipole_mesh=dipole_mesh,
interpolate=interpolate,
cell_center=cell_center,
num_cpu=num_cpu,
**kwargs)
# tx.field -> self.txField (interpolation or directly)
if just_loop_fields is False:
self.interpolateBFieldToKernel(recalc_primary=recalc_primary,
num_cpu=num_cpu)
[docs] def interpolateBFieldToKernel(self, recalc_prim_on_kernel=False,
recalc_primary=False,
num_cpu=32,
calc_3D_stats=True):
''' Takes the rx Bfield and interpolates it to the kernel mesh. '''
if recalc_prim_on_kernel:
raise Exception('to be integrated')
if self.dimension in (1, 2):
if self.kernelMeshCellCenter is None:
if self.dimension == 1:
if self.kernelMesh1D is None:
log.log(16, 'Creating new kernel mesh.')
self.create1DKernelMesh(calc_3D_stats=calc_3D_stats)
else:
if calc_3D_stats:
self._evalKernelMesh(self.kernelMesh1D)
elif self.dimension == 2:
if self.kernelMesh2D is None:
log.log(16, 'Creating new kernel mesh.')
self.create2DKernelMesh(calc_3D_stats=calc_3D_stats)
else:
if calc_3D_stats:
self._evalKernelMesh(self.kernelMesh2D)
ticklI = time.time()
log.log(16, 'Start interpolation of Bfield for '
'{}D-kernel calculation ({} points)...'
.format(self.dimension, len(self.kernelMeshCellVolume)))
if self.interpolationMatrix is None:
self.calcInterpolationMatrix() # num_cpu=num_cpu)
if recalc_primary:
tx_mesh = self.tx.loopmesh
tx_mesh_name = self.tx.loop_mesh_name
tx_total = np.copy(self.tx.field)
self.txBfield = cm.interpolateField_Matrix(
self.tx.secondary,
self.interpolationMatrix)
log.log(13, 'calculating primary field on kernel mesh '
'directly.')
self.tx.setLoopMesh(self.kernelMeshCellCenter.T)
self.tx.config.ftype = 'B'
self.tx.calculate(num_cpu=num_cpu)
self.tx_primary = self.tx.field
self.txBfield += self.tx_primary
self.tx.field = np.copy(tx_total) # reset old tx field
self.tx.setLoopMesh(tx_mesh, savename=tx_mesh_name)
if not self.coincident:
rx_mesh = self.rx.loopmesh
rx_mesh_name = self.rx.loop_mesh_name
rx_total = np.copy(self.rx.field)
self.rxBfield = cm.interpolateField_Matrix(
self.rx.secondary,
self.interpolationMatrix)
log.log(13, 'calculating primary field on '
'kernel mesh directly.')
self.rx.setLoopMesh(self.kernelMeshCellCenter.T)
self.rx.config.ftype = 'B'
self.rx.calculate(num_cpu=num_cpu)
self.rx_primary = np.copy(self.rx.field)
self.rxBfield += self.rx_primary
self.rx.field = rx_total
self.rx.setLoopMesh(rx_mesh, savename=rx_mesh_name)
else:
mat_rows = self.interpolationMatrix.rows()
mat_cols = self.interpolationMatrix.cols()
if mat_cols != self.tx.field.shape[1]:
raise Exception(
'interpolateBFieldToKernel(): Dimension mismatch '
'between interpolation matrix ({} x {}) and tx field '
'shape ({})'
.format(mat_cols, mat_rows, self.tx.field.shape))
if mat_rows != len(self.kernelMeshCellVolume):
raise Exception(
'interpolateBFieldToKernel(): Dimension mismatch '
'between interpolation matrix ({} x {}) and kernel '
'mesh cell count in 3D ({})'
.format(mat_cols, mat_rows,
len(self.kernelMeshCellVolume)))
self.txBfield = cm.interpolateField_Matrix(
self.tx.field,
self.interpolationMatrix)
if not self.coincident:
if mat_cols != self.rx.field.shape[1]:
raise Exception(
'interpolateBFieldToKernel(): Dimension mismatch '
'between interpolation matrix ({} x {}) and rx '
'field shape ({})'
.format(mat_cols, mat_rows, self.rx.field.shape))
self.rxBfield = cm.interpolateField_Matrix(
self.rx.field,
self.interpolationMatrix)
log.log(16, 'B field interpolation in {:.2f} sec'
.format(time.time() - ticklI))
else:
self.txBfield = self.tx.field
if not self.coincident:
self.rxBfield = self.rx.field
[docs] def ellipticalDecomposition(self):
"""
Computes the counter and corotating parts of the given magnetic fields
with respect to a given earth magnetic field.
Parameters
----------
Bfield: complex field [3, n] or string
Optional. Possibility to insert a pre calculated field.
Inclination: float
Inclination of the earth magnetic field at the loop site in rad
[0... 2pi]
Declination: float
Declination of the magnetic field at the loop site in rad
[0... 2pi]
B: np.array of shape (3, n)
Magnetic field of the loop
Second part of the kernel calculation.
- mainly from Weichman et al. (2000)
"""
if self.txBfield is None:
log.info(
'Elliptical decomposition triggered new B-field calculation.')
self.BFieldCalculation()
self.txalpha, self.txbeta, self.txzeta, self.txperpend = \
self.ellipticalDecomposition_multi(self.txBfield,
self.survey.earth)
if not self.coincident:
self.rxalpha, self.rxbeta, self.rxzeta, self.rxperpend = \
self.ellipticalDecomposition_multi(self.rxBfield,
self.survey.earth)
return (self.txalpha, self.txbeta, self.txzeta),\
(self.rxalpha, self.rxbeta, self.rxzeta)
[docs] @staticmethod
def ellipticalDecomposition_multi(Bfield, earth):
"""
Computes the counter and corotating parts of the given magnetic fields
with respect to a given earth magnetic field.
Parameters
----------
Bfield: complex field [3, n] or string
Optional. Possibility to insert a pre calculated field.
Inclination: float
Inclination of the earth magnetic field at the loop site in rad
[0... 2pi]
Declination: float
Declination of the magnetic field at the loop site in rad
[0... 2pi]
B: np.array of shape (3, n)
Magnetic field of the loop
Second part of the kernel calculation.
Literature
----------
- Weichman et al. (2000)
- Hertrich (2005, Appendix)
- Hertrich (2008, eq. 6 ff.)
"""
tickeD = time.time()
# Nov 2020: Finally figuring out where the conjugated kernel origins
# this is the final fix to ensure the phase of the kernel is finally
# consistent with pulications of Hertrich, Braun and Jiang.
# Explaination: The negative values of the larmor frequency
# (representing the orientation of the spins) manifests as a 180 degree
# phase in the magneitc fields for both transmitter and receiver.
# This has not been implemented in the magnetic field calcualtion,
# which means it is now condidered right here.
log.debug('Using conjugate complex of input field due to larmor sign.')
bfield = np.conjugate(Bfield)
# earth magnetic field: B0
# z negativ downwards (by convention) !
# Norm! (b0 in Hertrich 2008)
B0 = earth.field
# Bperp: perpendicular projection of B = Bfield vertical on B0
# vertical components from B pertaining to B0
# Hertrich, 2008 (eq. 4)
scalarproduct = np.sum(B0 * bfield, 0)
Bperp = bfield - scalarproduct * B0
crossproduct = np.cross(Bperp, np.conj(Bperp), axis=0)
B0_cross = B0 * crossproduct
tmp = 1j * np.sum(B0_cross, 0)
# 09.02.2019 switched sign from - to +
signum = np.sign(tmp).real
# 11.03.2020 supervised by T. Hiller, same as above!
# signum = np.zeros_like(tmp, dtype=float)
# signum[np.logical_or(tmp.real > 0,
# np.logical_and(np.isclose(tmp.real, 0),
# tmp.imag > 0))] = 1
# signum[np.logical_or(tmp.real < 0,
# np.logical_and(np.isclose(tmp.real, 0),
# tmp.imag < 0))] = -1
# alpha, beta, zeta
# always real
B_v_conj_B_v = np.sum(Bperp * np.conj(Bperp), 0).real
B_vert_B_vert = np.sum(Bperp * Bperp, 0)
# calc alpha and beta
# weichman et al. 2000: eq 4.5
alpha = np.sqrt(0.5 * (B_v_conj_B_v + np.abs(B_vert_B_vert)))
beta_hlp = B_v_conj_B_v - np.abs(B_vert_B_vert)
log.debug('negative beta inbound')
log.debug('B_v_conj_B_v, {}'.format(B_v_conj_B_v[beta_hlp < 0]))
log.debug('np.abs(B_vert_B_vert), {}'.format(
np.abs(B_vert_B_vert)[beta_hlp < 0]))
log.debug(beta_hlp[beta_hlp < 0])
beta = np.real(np.lib.scimath.sqrt(beta_hlp * 0.5) * signum)
# alpha and beta are both real,
# however the sqrt in beta can recieve neg values, in which case the
# result gets imaginary components. They should be near zero and can be
# ommitted. Also zeta in the same time should be near (1 + 0j), further
# reducing the weight of imaginary parts in beta
# calc exp(j*zeta_trans (== phase))
# weichman et al. 2000: eq 4.4
zeta = np.sqrt(B_vert_B_vert / np.abs(B_vert_B_vert))
# calc norm (was called perpendicular before 06.02.19)
# weichman et al. 2000: eq 4.6
# unit vector
b_norm = np.real(Bperp / zeta) / alpha
# log.debug('bnorm')
# bnorm_mag = np.sqrt(np.sum(b_norm * np.conj(b_norm), axis=0))
# log.debug((np.min(bnorm_mag), np.max(bnorm_mag)))
log.debug('elliptical decomposition: {:.2f} sec'
.format(time.time() - tickeD))
return alpha, beta, zeta, b_norm
[docs] def kernelIntegration(self, calc_theta=False):
"""
Computes the integration of the kernel with respect to the desired
dimension.
Parameters
----------
decomposition: (alpha, beta, zeta)
Bfield_part essentially consists of the output from the
elliptical decomposition of the magnetic field.
measurement: class
An instance of a measurement class has to be given in order to keep
the number of input arguments manageable.
earthmagnitude: float
Magnitude of the earth magnetic field [Tesla]. Aproximatly about
30000 to 65000 nT (1 nT = 1e-9 Tesla).
Third part of the kernel calculation.
"""
self._checkPulses()
tickkI = time.time()
if self.txalpha is None:
# alpha, beta, zeta and rest of the formulations
# from Weichman et al. (2000)
# from Hertrich (2008, eq. 6-10)
self.ellipticalDecomposition()
if self.coincident:
self.rxalpha = None
self.rxbeta = None
self.rxzeta = None
self.rxperpend = None
# note: this function needs more memory than cpu
ret = self.kernelCalculation_multi(
self.fid, self.survey.earth, self.txalpha, self.txbeta,
self.txzeta, self.txperpend, rxalpha=self.rxalpha,
rxbeta=self.rxbeta, rxzeta=self.rxzeta, rxperpend=self.rxperpend,
calc_theta=calc_theta)
if calc_theta:
K1, K2, K3, self.theta = ret
else:
K1, K2, K3 = ret
self.K = K1 * K2 * K3
np.save('KernMat_dim{}.npy'.format(self.dimension),
self.K)
if self.debug:
if self.dimension != 2:
pass
else:
self.K_part1 = K1
self.K_part2 = K2
self.K_part3 = K3
kp12D = np.sum(np.array(
K1 * self.kernelMeshCellVolume).reshape(
len(self.pulses),
len(self.yvec) - 1, -1), 1)
kp22D = np.sum(np.array(
K2 * self.kernelMeshCellVolume).reshape(
1, len(self.yvec) - 1, -1), 1)
kp32D = np.sum(np.array(
K3 * self.kernelMeshCellVolume).reshape(
1, len(self.yvec) - 1, -1), 1)
if self.h_order >= 1:
kp12D = integrateKernelH2(self.h_mat,
kp12D)
kp22D = integrateKernelH2(self.h_mat,
kp22D)
kp32D = integrateKernelH2(self.h_mat,
kp32D)
self.K_part1 = kp12D /\
np.array(self.kernelMesh2D.cellSizes())
self.K_part2 = kp22D /\
np.array(self.kernelMesh2D.cellSizes())
self.K_part3 = kp32D /\
np.array(self.kernelMesh2D.cellSizes())
nk1 = 'kern_part1_3D.npy'
np.save(nk1, self.K_part1)
print('Kernel: DEBUG: save {}'.format(nk1))
nk2 = 'kern_part2_3D.npy'
np.save(nk2, self.K_part2)
print('Kernel: DEBUG: save {}'.format(nk2))
nk3 = 'kern_part3_3D.npy'
np.save(nk3, self.K_part3)
print('Kernel: DEBUG: save {}'.format(nk3))
nk4 = 'kern_full_3D.npy'
if self.h_order >= 1:
kern3D = np.ones((len(self.pulses), len(self.yvec) - 1,
self.h_mat.rows()),
dtype=np.complex)
raw = self.K.reshape(
len(self.pulses), len(self.yvec) - 1, -1)
for ip, pulse in enumerate(raw): # pulses
for ik, kslice in enumerate(pulse):
kern3D[ip, ik] = integrateKernelH2(self.h_mat,
kslice)
kern3D = kern3D.reshape(len(self.pulses), -1)
else:
kern3D = self.K
np.save(nk4, kern3D)
print('Kernel: DEBUG: save {}'.format(nk4))
# Integration with respect to the desired dimension
if self.dimension == 0:
# direct sum over the volume + weighting
self.K = np.sum(
self.K * np.array(self.tx.loopmesh.cellSizes()), 1)
if self.dimension == 1: # 1D Kernel over a given z-vector
# integration over x-y-plane to z <- 1D
# Dimension before sum: (len(pm_vec), Z, X*Z), X*Z = dim 2
self.K = np.sum(np.array(
self.K * self.kernelMeshCellVolume).reshape(
len(self.pulses), len(self._zvec) - 1, -1), 2)
if self.dimension == 2: # 2D Kernel over a given y-vector
# integration over y to x-z-plane <- 2D
# Dimension before sum: (len(pm_vec), Y, X*Z), y = dim 1
self.K = np.sum(np.array(
self.K * self.kernelMeshCellVolume).reshape(
len(self.pulses), len(self.yvec) - 1, -1), 1)
# self.K = K_full
if self.h_order > 0:
self.K = integrateKernelH2(self.h_mat, self.K)
# Attention:
self.K *= 4**self.h_order
# nico: due to the weighting with self.kernelMeshCellVolume
# of the refined mesh, the weigthting functions are too small.
# The actual cell sizes of the unrefined 2D mesh are bigger
# by a factor of 4**order.
# Or in other words: you cannot integrate twice with weighting.
# The second integration doesnt need another weighting.
if self.dimension == 3:
# just adding weight (Volume), no integration
self.K *= np.array(self.tx.loopmesh.cellSizes())
log.info('kernel integration {:.2f} sec'.format(time.time() - tickkI))
[docs] @staticmethod
def kernelCalculation_multi(
fid, earth, txalpha, txbeta, txzeta, txperpend,
rxalpha=None, rxbeta=None, rxzeta=None, rxperpend=None,
calc_theta=False):
"""
"""
def scalar(vec1, vec2):
"""
Scalar product of two (3, n)-vectors -> (n,)-vector.
"""
return np.sum(vec1 * vec2, 0)
def cross(vec1, vec2):
"""
Cross product of two (3, n)-vectors -> (3, n)-vector.
"""
return np.cross(vec1, vec2, axis=0)
# common stuff
pulsevector = fid.pulses[:, np.newaxis] * \
fid.tx_turns
gam = misc.constants.gamma
B0 = earth.field
# magnetization
if np.allclose(fid.df, 0):
log.debug('Kernel calculation without frequency offset')
# magnetization: no frequeny offset
theta = 0.5 * gam * pulsevector * (txalpha - txbeta)
magnetization = np.sin(theta)
# different coordinate system
# rotated reference frame
# theta = flipwinkel = 0.5 * gam * pulsevector *
# (txalpha - txbeta)
# task: drehe B0 vector um txperp um den winkel theta
# das is dann der winkel in den die magnetisierung zeigt
# (im carthesischen coord system, nicht mehr für kernel z
# gebrauchen)
else:
log.debug('Kernel calculation with df: {}'.format(
np.array(fid.df)))
# magnetization with frequency offset
theta = np.arctan2(
0.5 * gam * pulsevector / fid.taup1 *
(txalpha - txbeta),
(2 * np.pi * fid.df)[:, np.newaxis])
flip_eff = np.sqrt(
(0.5 * gam * pulsevector *
(txalpha - txbeta))**2 +
(2 * np.pi * fid.df[:, np.newaxis] *
fid.taup1)**2)
magnetization = np.sin(flip_eff) * np.sin(theta)\
- 1j * np.sin(theta) * np.cos(theta) * (np.cos(flip_eff) - 1)
# transmitter part
# changed tx_turns -> rx_turns (12.03.19) Aaron said yes
K_part1 =\
txzeta * magnetization * fid.curie * gam * \
(earth.magnitude**2) * fid.rx_turns
# receiver part + geometry
if rxalpha is None:
# coincident, normally rx, in this case rx=tx
# not txzeta**2, txzeta is also in k_part1
K_part2 = txzeta * (txalpha + txbeta)
# K_part3: coincident: geometric factor = 1
K_part3 = np.ones_like(K_part2)
else:
# separated case
# not rxzeta * txzeta, txzeta is in k_part1
K_part2 = rxzeta * (rxalpha + rxbeta)
# K_part3: geometric factor
K_part3 = scalar(rxperpend, txperpend) + \
1j * scalar(B0, cross(rxperpend,
txperpend))
ret = (K_part1, K_part2, K_part3,)
if calc_theta:
ret += (theta,)
return ret
[docs] def sliceKernel2D(self, savename=None, forceNew=False, loopSaveName=None,
num_cpu=None, new_bfield=False,
loop_mesh=None, slice_name=None, **kwargs):
""" 2D Kernel in a memory saving parallel computation approach. """
import multiprocessing as mpc
import tempfile as temp
# see if tx and rx is available
self._checkSrc()
if self.kernelMesh2D is None:
self.create2DKernelMesh(calc_3D_stats=False)
tick_sI = time.time()
if num_cpu is None:
num_cpu = self.num_cpu
N = len(self.yvec) - 1
number_of_processes = min(min(mpc.cpu_count(), N),
num_cpu)
if slice_name is None:
# Preparation of interpoaltion mesh and loops
self.BFieldCalculation(just_loop_fields=True,
loop_mesh=loop_mesh,
recalc_loop_fields=new_bfield,
num_cpu=num_cpu)
if self.coincident:
rxfield = None
mesh = self.tx.loopmesh
else:
rxfield = self.rx.field
mesh = self.seploopmesh
if os.path.exists(self.tx.loop_mesh_name):
temporary_mesh = False
meshName = self.tx.loop_mesh_name
log.debug('loop mesh for multiprocessing: {}'.format(meshName))
else:
# create temporary filename
temporary_mesh = True
tempFile = temp.NamedTemporaryFile(suffix='.bms', delete=False)
meshName = tempFile.name
tempFile.close() # pygimli needs write access, hence delete=False
log.debug('save mesh for multiprocessing: {}'.format(meshName))
log.debug('loopmeshname: {}'.format(self.tx.loop_mesh_name))
log.debug('exists: {}'.format(
os.path.exists(self.tx.loop_mesh_name)))
mesh.save(meshName)
self._checkInterpolation(slices=True)
else:
# Check if slices are available
forcheck = Path(slice_name.format(self.fid.tx_index, 0))
if not forcheck.exists():
raise Exception('No slice found under :"{}"'
.format(forcheck))
# no source mesh or fields are needed
meshName = None
rxfield = None
temporary_mesh = False
self.kernelMeshCellVolume = None
self.kernelMeshCellCenter = None
slice_queue = mpc.JoinableQueue()
# output for worker, input for summation
sum_slice_queue = mpc.Queue()
# output for summation
stop_queue = mpc.JoinableQueue()
# end signal for summation
parent_id = os.getpid()
log.info('Get {} slices... start interpolating in {} parallel '
'processes.'.format(N, number_of_processes))
processes = []
if self.h_order >= 0:
kernelMesh2D_rotated = cm.createH2(self.kernelMesh2D,
order=self.h_order,
integration_mat=False)
else:
kernelMesh2D_rotated = pg.Mesh(self.kernelMesh2D)
kernelMesh2D_rotated.setDimension(3)
kernelMesh2D_rotated.rotate(pg.RVector3(np.pi/2., 0, 0))
out_pos_2d = np.array(kernelMesh2D_rotated.cellCenters())
indices = np.array_split(range(N), number_of_processes)
# begin different distribution, to ensure equal workload per processor
# saves about 3-4 % calculation time - yay...
log.debug('test different array distribution')
slices_per_core = len(indices[0])
indices2 = np.zeros(N, dtype=int)
complete = 0
for i in range(slices_per_core):
temp = indices2[i::slices_per_core]
indices2[i::slices_per_core] =\
np.arange(complete, complete + len(temp), dtype=int)
complete += len(temp)
assert np.allclose(np.sort(indices2), np.array(range(N)))
# some checks just prove to be worth the time
indices = np.array_split(indices2, number_of_processes)
# print(indices)
# end different distribution
for i in range(number_of_processes):
log.debug('index: {}'.format(i))
worker = mpc.Process(target=_2DSliceKernelWorker,
args=(meshName,
out_pos_2d,
indices[i],
self.yvec,
self.tx.field,
rxfield,
slice_queue,
parent_id,
i, # worker ID
self.survey.earth,
self.fid,
slice_name,
))
log.debug('created worker')
worker.daemon = True
processes.append(worker)
worker.start()
log.debug('started worker: {}'.format(worker.is_alive()))
for p in processes:
log.debug('still alive: {}'.format(p.is_alive()))
# lengths of the cells = weighting (in y direction)
sum_worker = mpc.Process(
target=_2DSliceSummationWorker,
args=(slice_queue,
sum_slice_queue,
stop_queue,
(len(self.pulses),
self.kernelMesh2D.cellCount()*(4**self.h_order)),
len(self.yvec) - 1,
number_of_processes,
parent_id))
sum_worker.daemon = True
sum_worker.start()
for p in processes:
log.debug('still alive: {}'.format(p.is_alive()))
log.debug('started additional worker for summation: {}'
.format(sum_worker.is_alive()))
isInterrupt = False
log.debug('sum worker still alive: {}'.format(sum_worker.is_alive()))
while True:
try:
if stop_queue.empty():
if not sum_worker.is_alive():
isInterrupt = True
if isInterrupt:
log.error(
'Sum Worker died, shutting '
'down the rest as well and abort run.')
sum_worker.terminate()
break
time.sleep(0.5)
else:
# waits till all slices are summarized, queue empty
stop_queue.get()
stop_queue.task_done()
log.debug('join slice queue')
slice_queue.join()
log.debug('finished, end of multiprocessing, cleaning up')
break
except KeyboardInterrupt:
isInterrupt = True
log.critical(
'detect Keyboard Interrupt. Shutting processes down.')
sum_worker.terminate()
break
for p in processes:
p.terminate() # tells the process to terminate
time.sleep(0.1)
if p.is_alive():
p.join() # waits till termination of p
if temporary_mesh:
os.unlink(meshName) # release temporary mesh file
self.K = sum_slice_queue.get() # final calculated kernel, normalized
if self.h_order > 0:
self.K = integrateKernelH2(self.h_mat, self.K)
log.debug('final Kern: {}'.format(self.K.shape))
sum_worker.terminate() # tells sumWorker to terminate
if sum_worker.is_alive():
log.debug('sum_worker not terminated, join()')
sum_worker.join() # waits till termination of p
# weighting over cell sizes of the actual 2D Mesh (y vec already done)
self.K *= np.array(self.kernelMesh2D.cellSizes())
log.info('overall interpolation time: %.4f seconds' %
(time.time() - tick_sI))
if temporary_mesh:
log.debug('temporary file deleted?: {} (supposed to be True)'
.format(not os.path.exists(meshName)))
return self.K
[docs] def save(self, savename=None, save_interpolation_mat=False,
save_loopmesh=False, light=True,
kernelmesh_name=None):
""" Save the basic information to restore the Kernel class later. """
if savename is None:
savename = Path(f'_default_Kernel_{self.dimension}D')
else:
savename = Path(savename).with_suffix('')
savename.parent.mkdir(exist_ok=True, parents=True)
log.info('Save Kernel: {}'.format(savename))
# flip some switches
if light is True:
# no matrices or meshs and no b fields, just basic NMR stuff
# no recalculation possible after load !
save_interpolation_mat = False
save_loopmesh = False
rx_bfield = None
tx_bfield = None
else:
rx_bfield = self.rxBfield
tx_bfield = self.txBfield
# separated loop mesh
if save_loopmesh and self.seploopmesh is not None:
# meshsave managed by kernel class, the loops dont know this
if self.slm_name is None:
self.slm_name = '{}_SepLoopMesh.bms'.format(savename.stem)
log.info(f'save separated loop mesh: {self.slm_name}')
self.seploopmesh.save(savename.with_name(self.slm_name).as_posix())
# kernelmesh 1D
if self.kernelMesh1D is not None:
if kernelmesh_name is not None:
kernelmesh1d = kernelmesh_name
else:
kernelmesh1d = '{}_kern1D.bms'.format(savename.stem)
log.info(f'save kernelmesh1D: {kernelmesh1d}')
self.kernelMesh1D.save(savename.with_name(kernelmesh1d).as_posix())
else:
kernelmesh1d = None
# kernelmesh 2D
if self.kernelMesh2D is not None:
if kernelmesh_name is not None:
kernelmesh2d = Path(
kernelmesh_name).with_suffix('.bms').as_posix()
else:
kernelmesh2d = '{}_kern2D.bms'.format(savename.stem)
log.info(f'save kernelmesh2D: {kernelmesh2d}')
self.kernelMesh2D.save(savename.with_name(kernelmesh2d).as_posix())
else:
kernelmesh2d = None
# save interpolation matrix
if save_interpolation_mat and self.interpolationMatrix is not None:
iM = True # True is enough, name is fix
iMName = '{}_interpolationMatrix.imat'.format(savename.stem)
log.info(f'save interpolation matrix: {iMName}')
self.interpolationMatrix.save(
savename.with_name(iMName).as_posix())
else:
iM = False
if self.dimension in (0, 3):
# basic
np.savez(savename.with_suffix('.npz').as_posix(),
kernel=self.K,
dimension=self.dimension,
fid_index=self.fid_index,
sepm_name=self.slm_name,
txBfield=tx_bfield,
rxBfield=rx_bfield,
pulses=self.pulses,)
if self.dimension == 3:
self.exportVTK(f'{savename.stem}_3DMesh.vtk')
elif self.dimension == 1:
# + zvec, min_thk, kernelmesh1d, interpolation matrix
np.savez(savename.with_suffix('.npz').as_posix(),
kernel=self.K,
dimension=self.dimension,
zvec=self._zvec,
fid_index=self.fid_index,
min_thk=np.array((self.min_thk),),
sepm_name=self.slm_name,
kernelmesh1d=kernelmesh1d,
txBfield=tx_bfield,
rxBfield=rx_bfield,
interpolationMatrix=iM,
pulses=self.pulses,)
elif self.dimension == 2:
# + yvec, order, interpolation matrix
np.savez(savename.with_suffix('.npz').as_posix(),
kernel=self.K,
dimension=self.dimension,
fid_index=self.fid_index,
sepm_name=self.slm_name,
kernelmesh2d=kernelmesh2d,
yvec=self.yvec,
order=self.h_order,
txBfield=tx_bfield,
rxBfield=rx_bfield,
interpolationMatrix=iM,
pulses=self.pulses,
kern_part1=self.K_part1,
kern_part2=self.K_part2,
kern_part3=self.K_part3,)
else:
raise Exception(f'Dim = {self.dimension} ?')
[docs] def load(self, savename, load_loopmesh=True, kernelmesh2d=None,
load_kernelmesh=True, use_order_refinement=True):
""" Load a previously saved kernel (.npz-format). """
savename = Path(savename).with_suffix('')
# found file ?
if not checkForKernel(savename.with_suffix('.npz').as_posix()):
raise FileNotFoundError(
'cannot found kernel at given path: "{}"'
.format(os.path.abspath(savename.with_suffix('.npz'))))
log.info('load file: {}'.format(
savename.with_suffix('.npz').absolute()))
# open main container
with np.load(savename.with_suffix('.npz').as_posix(),
allow_pickle=True) as content:
self.K = getItem(content, 'kernel')
self.fid_index = getItem(content, 'fid_index')
self.txBfield = getItem(content, 'txBfield')
self.rxBfield = getItem(content, 'rxBfield')
self.dimension = getItem(content, 'dimension')
self.slm_name = getItem(content, 'sepm_name')
# temporary uses saved pulses until a survey class is provided
# (pulses are then used from the corresponding fid class)
self.setPulsesDirectly(getItem(content, 'pulses', None))
if self.dimension == 1:
min_thk = getItem(content, 'min_thk', 0.5)
zvec = getItem(content, 'zvec')
self.setZVector(zvec, min_thk=min_thk)
kernelmesh1d = getItem(content, 'kernelmesh1d', None)
if kernelmesh1d is not None:
name1 = savename.with_name(kernelmesh1d).as_posix()
if os.path.exists(name1):
self.set1DKernelMesh(name1)
else:
log.warning(f'Mesh not found: {name1}')
# mesh not found, continue
elif self.dimension == 2:
self.h_order = getItem(content, 'order', 0)
self.yvec = getItem(content, 'yvec')
if load_kernelmesh:
if kernelmesh2d is None:
# not given, search in file
kernelmesh2d = getItem(content, 'kernelmesh2d', None)
else:
kernelmesh2d = None
if kernelmesh2d is not None:
# either given or found in file
if not use_order_refinement:
self.h_order = 0
if isinstance(kernelmesh2d, pg.Mesh):
self.set2DKernelMesh(kernelmesh2d, order=self.h_order)
else:
name2 = savename.with_name(kernelmesh2d).as_posix()
if os.path.exists(name2):
self.set2DKernelMesh(name2, order=self.h_order)
else:
log.warning(f'Mesh not found: {name2}')
self.K_part1 = getItem(content, 'kern_part1', None)
self.K_part2 = getItem(content, 'kern_part2', None)
self.K_part3 = getItem(content, 'kern_part3', None)
if self.K_part1 is not None:
self.debug = True
if load_loopmesh:
if self.slm_name is not None:
name3 = savename.with_name(self.slm_name).as_posix()
if os.path.exists(name3):
self.seploopmesh = pg.Mesh(name3)
else:
log.warning(f'Mesh not found: {name3}')
if getItem(content, 'interpolationMatrix', False) is True:
self.interpolationMatrix = pg.SparseMapMatrix()
hlp = '{}_interpolationMatrix.imat'.format(savename.stem)
name4 = savename.with_name(hlp).as_posix()
try:
self.interpolationMatrix.load(name4)
except RuntimeError:
log.warning('Interpolation Matrix not found: '
f'{name4}_interpolationMatrix.imat')
[docs] def calculate(self, loop_mesh=None,
dipole_mesh=None,
interpolate=False,
savename=None,
forceNew=False,
slices=True,
slice_name=None,
**kwargs):
"""
All three parts of the kernel calulation are called here.
All given kwargs are directed to BfieldCalculation(), see function info
for details about possible keyword arguments.
called functions
----------------
>>> self.BFieldCalculation(**kwargs)
>>> self.ellipticalDecomposition()
>>> self.kernelIntegration()
>>> if savename is not None:
self.save(savename)
keyword arguments
-----------------
destinations:
none for now, with exception of "num_cpu", [12]
which is directed to BfieldCalculation and/or sliceKernel
"""
if not self.updatable:
raise Exception(
f'Cannot calculate Kernel. Check tx: {self.tx}, rx: {self.rx} '
f'or pulses: {self.pulses}')
self.num_cpu = kwargs.pop('num_cpu', self.num_cpu)
if slices is True and self.dimension == 2:
self.sliceKernel2D(slice_name=slice_name)
elif slices is True and self.dimension == 1:
self.sliceKernel1D(loop_mesh=loop_mesh,
new_bfield=forceNew,
slice_name=slice_name)
else:
self.BFieldCalculation(
loop_mesh=loop_mesh,
dipole_mesh=dipole_mesh,
interpolate=interpolate,
recalc_loop_fields=forceNew,
num_cpu=self.num_cpu,
**kwargs)
self.ellipticalDecomposition()
self.kernelIntegration(calc_theta=True)
if savename is not None:
self.save(savename)
return
[docs] def createMagnetizationMesh(self):
"""
Creates full 3D mesh for display and calcualtion of magnetization
vectors. Not needed for normal kernel calculation routine and big,
therefore separate.
"""
# 2D mesh -> 3D triangle-prism-mesh
log.info('getMagnetization: prepare 3D mesh...')
self.mag_mesh = pg.meshtools.extrudeMesh(self.kernelMesh2D, self.yvec)
# inplace function working in mesh cpp background, no return value
# x-z + y = x-z-y input mesh to x-y-z output mesh
self.mag_mesh.swapCoordinates(1, 2)
[docs] def calcMagnetization(self):
"""
Creates 3D mesh and calcualtes magnetization vector after excitation.
Returns magnetization vector of shape (num_pulses, num_cells_3d, 3)
"""
assert self.dimension == 2, 'getMagnetization: Only working for 2D'
if self.theta is None:
if self.txalpha is None:
self.ellipticalDecomposition()
pulsevector = self.fid.pulses[:, np.newaxis] * \
self.fid.tx_turns
gam = misc.constants.gamma
if np.allclose(self.fid.df, 0):
self.theta = 0.5 * gam * pulsevector *\
(self.txalpha - self.txbeta)
else:
self.theta = np.arctan2(
0.5 * gam * pulsevector / self.fid.taup1 *
(self.txalpha - self.txbeta),
(2 * np.pi * self.fid.df)[:, np.newaxis])
self.createMagnetizationMesh()
vvec = np.tile(self.survey.earth.field, (self.theta.shape[1], 1, 1))
self.magnetization = np.zeros((len(self.pulses),
self.theta.shape[1],
3))
for pi in range(len(self.pulses)):
tick = time.time()
rotmats = cm.rotationMatrix(self.txperpend, self.theta[pi, :])
m2 = np.squeeze(np.einsum('ijk,ikl->ijl', rotmats.T.swapaxes(1, 2),
vvec))
magnetization = m2 * self.magnetization_magnitude
self.magnetization[pi] = magnetization
log.log(13, 'time for one pulse: {:.4f} sec'
.format(time.time() - tick))
return self.magnetization
[docs] def exportMagnetization(self, name, vtk_export=False, pulse=0):
"""
Export a previously calculated magnetization vector as numpy
vector and optionally vtk file.
"""
# has vector?
if self.magnetization is None:
raise Exception('No magnetization vector found. Use '
'getMagnetization to calculate the vector or '
'import and set kern.magnetization manually.')
# has mesh?
if self.mag_mesh is None:
self.createMagnetizationMesh()
# export numpy matrix
np.save(Path(name).with_suffix('.npy'), self.magnetization)
# export vtk
if vtk_export:
vtk_name = Path(name).with_suffix('.vtk')
# # magnetization is always real
# xr = pg.meshtools.cellDataToNodeData(
# self.mag_mesh,
# self.magnetization[pulse, :, 0].real)
# yr = pg.meshtools.cellDataToNodeData(
# self.mag_mesh,
# self.magnetization[pulse, :, 1].real)
# zr = pg.meshtools.cellDataToNodeData(
# self.mag_mesh,
# self.magnetization[pulse, :, 2].real)
# vtk_mag = np.array((xr, yr, zr))
# theta from -pi to pi for nice display with cyclic colorbar
self.mag_mesh.addData('theta',
self.theta[pulse] % (np.pi*2) - np.pi)
self.mag_mesh.exportVTK(vtk_name.as_posix(), self.theta)
savefieldvtk(vtk_name.as_posix(),
self.mag_mesh, self.magnetization[pulse])
[docs] def sliceKernel1D(self, num_cpu=None, loop_mesh=None,
new_bfield=False, interpolate_bfield=True,
slice_name=None):
import multiprocessing as mpc
import tempfile as temp
self._checkSrc()
if self.kernelMesh1D is None:
self.create1DKernelMesh(calc_3D_stats=False)
tick_sI = time.time()
if num_cpu is None:
num_cpu = self.num_cpu
N = len(self._zvec) - 1
number_of_processes = min(min(mpc.cpu_count(), N),
num_cpu)
tx_name = None
rx_name = None
if slice_name is None:
tx_index = None
rx_index = None
if interpolate_bfield:
# b field case 1/2: b field is interpolated from loop mesh
self.BFieldCalculation(just_loop_fields=True,
loop_mesh=loop_mesh,
recalc_loop_fields=new_bfield,
num_cpu=number_of_processes)
tx_field = self.tx.field
if self.coincident:
rx_field = None
mesh = self.tx.loopmesh
else:
rx_field = self.rx.field
mesh = self.seploopmesh
tx_name = None
rx_name = None
if os.path.exists(self.tx.loop_mesh_name):
temporary_mesh = False
meshName = self.tx.loop_mesh_name
log.debug('loop mesh for multiprocessing: {}'.format(meshName))
else:
# create temporary filename
temporary_mesh = True
tempFile = temp.NamedTemporaryFile(suffix='.bms', delete=False)
meshName = tempFile.name
tempFile.close() # pygimli needs write access
log.debug('save mesh for multiprocessing: {}'.format(meshName))
log.debug('loopmeshname: {}'.format(self.tx.loop_mesh_name))
log.debug('exists: {}'.format(
os.path.exists(self.tx.loop_mesh_name)))
mesh.save(meshName)
self._checkInterpolation(slices=True)
else:
# b field case 2/2: b field is calculated directly on each slice
temporary_mesh = False
meshName = None
tx_field = None
rx_field = None
tx_name = self.tx._dump2Temp()
if self.coincident:
rx_name = None
else:
rx_name = self.rx._dump2Temp()
else:
# case 3 import fields from custEM slices
temporary_mesh = False
meshName = None
tx_index = self.tx_index
rx_index = self.rx_index
tx_field = None
rx_field = None
self.kernelMeshCellVolume = None
self.kernelMeshCellCenter = None
slice_queue = mpc.JoinableQueue()
# output for worker, input for summation
sum_slice_queue = mpc.Queue()
# output for summation
stop_queue = mpc.JoinableQueue()
# end signal for summation
parent_id = os.getpid()
log.debug('parent_id: %d', parent_id)
log.info(
'Get %d slices... start calculating in %d parallel processes.',
N, number_of_processes)
processes = []
kernel_mesh_1d = pg.Mesh(self.kernelMesh1D)
out_pos_2d = np.array(kernel_mesh_1d.cellCenters())
indices = np.array_split(range(N), number_of_processes)
log.debug('indices: {}'.format(indices))
for i in range(number_of_processes):
log.debug('index: {}'.format(i))
worker = mpc.Process(target=_1DSliceKernelWorker,
args=(meshName,
out_pos_2d,
tx_field,
rx_field,
self.coincident,
self._zvec,
indices[i],
slice_queue,
parent_id,
i, # worker ID
self.survey.earth,
self.fid,
tx_name, # tx loop name
rx_name, # rx loop name
slice_name,
tx_index,
rx_index,
))
log.debug('created worker')
worker.daemon = True
processes.append(worker)
worker.start()
log.debug('started worker: {}'.format(worker.is_alive()))
for p in processes:
log.debug('still alive: {}'.format(p.is_alive()))
sum_worker = mpc.Process(target=_1DSliceSummationWorker,
args=(slice_queue,
sum_slice_queue,
parent_id,
stop_queue,
(len(self.pulses), len(self._zvec) - 1),
np.array(self.kernelMesh1D.cellSizes()),
))
sum_worker.daemon = True
sum_worker.start()
for p in processes:
log.debug('still alive: {}'.format(p.is_alive()))
log.debug('started additional worker for summation: {}'
.format(sum_worker.is_alive()))
# sum_worker.join()
isInterrupt = False
log.debug('sum worker still alive: {}'.format(sum_worker.is_alive()))
while True:
try:
if stop_queue.empty():
if not sum_worker.is_alive():
isInterrupt = True
if isInterrupt:
log.error(
'Sum Worker died, shutting '
'down the rest as well and abort run.')
sum_worker.terminate()
break
time.sleep(0.5)
else:
# waits till all slices are summarized, queue empty
stop_queue.get()
stop_queue.task_done()
log.debug('join slice queue')
slice_queue.join()
log.debug('finished, end of multiprocessing, cleaning up')
break
except KeyboardInterrupt:
isInterrupt = True
log.critical(
'detect Keyboard Interrupt. Shutting processes down.')
sum_worker.terminate()
break
for p in processes:
p.terminate() # tells the process to terminate
time.sleep(0.1)
if p.is_alive():
log.debug('proc not terminated, join()')
p.join() # waits till termination of p
if temporary_mesh:
os.unlink(meshName) # release temporary mesh file
if isInterrupt:
raise SystemExit
self.K = sum_slice_queue.get() # final calculated kernel, normalized
log.debug('final Kern: {}'.format(self.K.shape))
sum_worker.terminate() # tells sumWorker to terminate
if sum_worker.is_alive():
log.debug('sum_worker not terminated, join()')
sum_worker.join() # waits till termination of p
log.info('overall calculation time: %.4f seconds' %
(time.time() - tick_sI))
if temporary_mesh:
log.debug('temporary file deleted?: {} (supposed to be True)'
.format(not os.path.exists(meshName)))
return self.K
[docs] def show2DMesh(self):
if self.dimension == 2 and self.kernelMesh2D is not None:
try: # pg.__version__ < 1.1
from pygimli.mplviewer import (showMesh, drawMesh)
except ImportError: # pg.__version__ >= 1.1
from pygimli.viewer.mpl import (showMesh, drawMesh)
ax, _ = showMesh(self.kernelMesh2D,
self.kernelMesh2D.cellMarkers(),
cMap='summer',
label='region marker',
hold=True)
drawMesh(ax, self.kernelMesh2D)
plt.show()
else:
raise Exception('No 2D Mesh available')
return ax
[docs] def showLoopLayout(self, ax=None, **kwargs):
if self.tx is None or self.rx is None:
raise Exception(
'Cannot show loop layout if loops are not defined')
loops = [self.tx]
if not self.coincident:
loops.append(self.rx)
kwargs.setdefault('label', [f'tx: {self.tx!r}',
f'rx: {self.rx!r}'])
else:
kwargs.setdefault('label', [f'tx/rx: {self.tx!r}'])
showLoopLayout(*loops, ax=ax, **kwargs)
return ax
[docs] def show(self,
toplot=['real', 'imag', 'amp', 'phase', '0D'], indices=None,
savename=None, normed=False,
suptitle=None,
ax=None,
pulse_in_log=False,
kernel_absolute_values=False,
cbar_percentage=0.99,
fixed_cbar=False,
lut=33,
show_marked_edges=False,
**kwargs): # sended to plotting subroutines
"""
Visualise the Kernel with respect to the desired dimension.
Cases
-----
Automatically defined within the kernel class via the parameter
kernel.dimension = [0...3]. Plotting of a kernel in the desired
dimension is only possible if the kernel is also calculated with
respect to that dimension. It's not possible to calculate the kernel
with kernel.dimension = 1 and then plot the kernel with
kernel.dimension = 2.
0D :
Simple Graph plotting kernel-values over pulsemoments
1D :
Graph with 1D integrated kernels over the depth of the model
2D :
Slice of the x-z-plane with triangle mesh containing the 2D
3D :
Export of the kernel in vtk format for visualising.
Parameter 0D
------------
none so far
Parameter 1D
------------
Plots the 1D integrated Kernel with a given z discretisation over the
measured pulse sequences.
toplot: list [ ['real', 'imag', 'amp', 'phase', '1D'] ]
There are different possibilities to plot the kernel. This
parameter defines which part of the kernel is shown. Possible
options are: 'real', 'imag', 'amp', 'phase', '0D' (integrated over
z). All strings in the toplot variable will be plotted in the same
order given in the list.
cMap: string ['viridis']
Defines the colormap used to display the kernel. In order to get a
good contrast between the max and min as well as being useful in
comparison with MRSMatlab, 'viridis' is the default colormap. Any
colormap reachable by the plt.get_cmap(...) method can be chosen.
normed: bool [True]
A on the dimension based normalisation of the plot permits
a better assessment of the kernel distribution.
ax: plotting ax or list of axes [None]
Plot on a predefined ax and gives back the ax. A onedimensionla
list of axes is also accepted, if the number of items in 'toplot'
is the same as the available axes.
lut: None or int [None]
Number of colors for the colorbar.
If lut is not None it must be an integer giving the number of
entries desired in the lookup table, and name must be a standard
mpl colormap name.
Parameter 2D
------------
indices: list
By default one 2D plot is created for each pulsemoment. In order
to limit the number of plots the optional paramter indices can be
given as a list of indices referring to the pulse moments to be
shown.
cMap: string ['viridis']
See Parameter 1D.
normed: bool [True]
A on the dimension based normalisation of the plot permits
a better assessment of the kernel distribution.
show_marked_edges: boolean [ False ]
Whether or not marked edges gets drawn.
possible kwargs for matplotlib:
cMin, cMax for range of the colorbar.
All other kwargs are reaching matplotlib functions.
default label 2D:
'integrated kernel (2D) [nV/$m^2$] pulsemoment: {:.3f} As'
.format(self.pulses[i])
Parameter 3D
------------
A self-sufficient plot of the kernel without any integration would
result in a set of 3D Cubes and is not implemented for now.
Instead the kernel will be saved in vtk format which can be easily
handled.
savename: string
A String defining the relative path to the vtk-file the kernel will
be saved in. If not given the default savename will be flagged with
the string '_default_' and contain some information about the
kernel.
EXAMPLE
-------
2D:
>>> ax, cbar = kernel.show(indices=[16], cMin=-1,
>>> cMax=2, size=20, pad=0.7)
>>> ax.set_ylim(-50, 0)
"""
import matplotlib as mpl
from mpl_toolkits.axes_grid1 import make_axes_locatable
mpl.rcParams['axes.unicode_minus'] = False
if self.dimension == 0:
print('Kernel.show(0D): simple Graph')
_, ax = plt.subplots(1, 1)
ax.plot(self.pulses, 1e9 * np.absolute(self.K), 'ro--')
ax.set_xlabel('pulsemoments [As]')
ax.set_ylabel('magnitude of the integrated kernel [nV/m^3]')
elif self.dimension == 1:
# plotaxes
xachse_mitte = (self.pulses[1:] +
self.pulses[:-1])/2
xachse_erweitert = [2 * self.pulses[0] -
xachse_mitte[0]]
xachse_erweitert.extend(xachse_mitte)
xachse_erweitert.append(2 * self.pulses[-1] -
xachse_mitte[-1])
xachse = np.array(xachse_erweitert)
xx, zz = np.meshgrid(xachse, self._zvec)
# def
def Plot1D(xx, zz, kernel,
toplot=['real', 'imag', 'amp', 'phase', '0D'],
cMap='viridis',
ax=None,
pulse_in_log=True,
lut=None,
**kwargs):
log.debug('Plot1D kwargs: {}'.format(kwargs))
if ax is None:
_, ax = plt.subplots(1, len(toplot),
sharey=True, sharex=True)
if len(toplot) == 1:
ax = [ax] # or ax[0] will fail
num_normal = 0
# original input
size = kwargs.pop('size', 12)
clim_in = kwargs.pop('clim', None)
real_imag_0d = kwargs.pop('real_imag', False)
# initial values
clim = clim_in
cbar = None
for i, item in enumerate(toplot):
# optionally normalizing the plot to values per meter
# default = False
Z_weight = np.ones(len(self._zvec[1:]))
if normed is True and item.lower() != 'phase':
Z_weight = np.abs(self._zvec[1:] - self._zvec[:-1])
Z_weight = Z_weight[:, np.newaxis]
if item.lower() == '0d':
if len(toplot) > 1:
sep_ax = plt.subplot(1, len(toplot), i + 1)
else:
sep_ax = ax[0]
to_plot_0d = ['abs']
if real_imag_0d:
to_plot_0d = ['abs', 'real', 'imag']
misc.drawSoundingCurve(
sep_ax, self.getKernel(), self.pulses, size=size,
to_plot=to_plot_0d,
color=kwargs.pop('dot_color', 'r'))
continue
elif item.lower() == 'real':
item_plot = np.copy(kernel.real.T) / Z_weight
num_normal += 1
elif item.lower() == 'imag':
item_plot = np.copy(kernel.imag.T) / Z_weight
num_normal += 1
elif item.lower() in ('amp', 'abs'):
item_plot = np.absolute(kernel).T / Z_weight
num_normal += 1
elif item.lower() == 'phase':
item_plot = np.arctan2(kernel.imag,
kernel.real).T
num_normal += 1
else:
log.error(
'toplot variable "{}" cannot be identified. Abort.'
.format(item))
return
if clim_in is None:
cmap, clim = getCMapAndLim(item_plot[1:, :],
phase=item == 'phase',
lut=lut)
# clim = [quantile(item_plot[1:, :],
# perc=1. - cbar_percentage,
# add_rel=-0.1),
# quantile(item_plot[1:, :],
# perc=cbar_percentage)]
else:
cmap = cMap
im0 = ax[i].pcolormesh(
xx, zz, item_plot,
cmap=cmap)
ax[i].set_title(item)
divider = make_axes_locatable(ax[i])
cax = divider.append_axes("right", size="18%", pad=.06)
cbar = plt.colorbar(im0, cax=cax)
if item == 'phase':
im0.set_clim(-np.pi, np.pi)
cbar.set_ticks([-np.pi, 0, np.pi])
cbar.ax.set_yticklabels(['$-\pi$', ' $0$', ' $\pi$'],
size=size + 1)
else:
im0.set_clim(clim[0], clim[1])
cbar.ax.tick_params(labelsize=size)
ax[i].set_xlim(np.min(xx), np.max(xx))
ax[i].set_ylim(np.min(zz), np.max(zz))
if pulse_in_log:
ax[i].set_xscale('log')
ax[i].set_xlabel('pulse moments [As]')
if num_normal == 1:
# the counter ensures that this is only done once per
# plot and only if needed
ax[i].set_ylabel('depth [m]')
setAxSize(ax[i], size)
return ax, cbar
# plots
log.info('Kernel.show(1D): pcolormesh')
size = kwargs.get('size', 12)
dot_color = kwargs.get('dot_color', 'r')
cMap = kwargs.get('cMap', 'viridis')
ax, cbar = Plot1D(
xx, zz, self.getKernel(reduced=False), toplot=toplot,
cMap=cMap, ax=ax, pulse_in_log=pulse_in_log, size=size,
real_imag=kwargs.pop('real_imag', False),
clim=kwargs.get('clim', None),
lut=lut, dot_color=dot_color)
if suptitle is not None:
plt.suptitle(suptitle)
elif self.dimension == 2:
mpl.rcParams['mathtext.default'] = 'regular'
# orientation = kwargs.get('orientation', 'vertical')
if indices is None:
indices = np.arange(0, len(self.pulses), 5)
indices = np.atleast_1d(indices)
if ax is None:
ax = [None] * len(indices)
if not isinstance(ax, list):
ax = [ax]
cbar = [None] * len(indices)
xlim = kwargs.pop('xlim', None)
ylim = kwargs.pop('ylim', None)
size = kwargs.get('size', 12)
log.debug(kwargs)
log.info('Kernel.show(2D): triangle mesh: pg.show(), '
'for {} pulse(s).'.format(len(indices)))
toplot = np.atleast_1d(toplot)
if len(toplot) != 1:
log.info(f'Changes toplot from {toplot} to "abs". '
'Please use only one entry per 2d export.')
toplot = ['abs']
if toplot[0] == 'imag':
toshow = 1e9 * np.imag(self.K) /\
np.array(self.kernelMesh2D.cellSizes()) # MMP
elif toplot[0] == 'real':
toshow = 1e9 * np.real(self.K) /\
np.array(self.kernelMesh2D.cellSizes()) # MMP
elif toplot[0] in ('abs', 'amp'):
toshow = 1e9 * np.absolute(self.K) /\
np.array(self.kernelMesh2D.cellSizes()) # MMP
elif toplot[0] in ('phase', 'angle'):
toshow = np.angle(self.K)
elif toplot[0] in ('0d', '0D'):
dot_color = kwargs.get('dot_color', 'r')
if kwargs.pop('real_imag', True):
to_plot_0d = ['abs', 'real', 'imag']
else:
to_plot_0d = ['abs']
misc.drawSoundingCurve(
ax[0], self.getKernel(), self.pulses, size=size,
color=dot_color, to_plot=to_plot_0d)
# need to exit at this point
return ax, None
else:
raise Exception('{} not valid'.format(toplot))
for n, i in enumerate(indices):
log.log(16, 'plot ({}) for pulse moment {:.2f} As'
.format(toplot[0], self.pulses[i]))
dd = toshow[i]
assert np.all(np.isfinite(dd)), 'check cell sizes for zeros'
if toplot[0] not in ('phase', 'angle'):
if not fixed_cbar:
absmax = quantile(np.abs(dd), perc=cbar_percentage,
add_rel=0)
kwargs.setdefault(
'cMin', -absmax if not np.alltrue(dd >= 0) else 0)
kwargs.setdefault('cMax', absmax)
else:
if n == 0:
mm = quantile(np.abs(toshow), perc=cbar_percentage)
mm_neg = -mm if np.any(toshow < 0) else 0
kwargs.setdefault('cMax', mm)
kwargs.setdefault('cMin', mm_neg)
if np.isclose(kwargs.get('cMin'), -kwargs.get('cMax')):
cmap = kwargs.pop('cMap', plt.get_cmap('bwr'))
else:
cmap = kwargs.pop('cMap', plt.get_cmap('viridis'))
else:
kwargs.setdefault('cMin', -np.pi)
kwargs.setdefault('cMax', np.pi)
cmap = kwargs.pop('cMap', 'twilight_shifted')
cMin = kwargs.get('cMin')
cMax = kwargs.get('cMax')
log.debug('min/max data to plot: {} / {}'
.format(np.min(dd), np.max(dd)))
log.debug('min/max cbar range: {} / {}'
.format(cMin, cMax))
if not show_marked_edges:
# local copy to mask boundary markers
mesh = pg.Mesh(self.kernelMesh2D)
mesh.setBoundaryMarkers(np.zeros(mesh.boundaryCount(),
dtype=int))
else:
# just reference
mesh = self.kernelMesh2D
ax[n], cbar[n] = \
pg.show(mesh,
data=dd,
colorBar=kwargs.pop('colorBar', True),
cMap=cmap,
ax=ax[n],
logScale=kwargs.pop('logScale', False),
cMin=cMin,
cMax=cMax)
if cbar[n] is not None:
label = kwargs.pop(
'label',
'integrated kernel (2D) [nV/m²] pulsemoment: {:.3f} As'
.format(self.pulses[i]))
cbar[n].set_label(label=label,
size=size)
cbar[n].ax.tick_params(labelsize=size)
if suptitle is not None:
plt.suptitle(suptitle)
ax[n].set_xlabel('distance [m]')
ax[n].set_ylabel('depth [m]')
if xlim is not None:
ax[n].set_xlim(xlim[0], xlim[1])
if ylim is not None:
ax[n].set_ylim(ylim[0], ylim[1])
for item in ([ax[n].title, ax[n].xaxis.label,
ax[n].yaxis.label] + ax[n].get_xticklabels() +
ax[n].get_yticklabels()):
item.set_fontsize(size)
elif self.dimension == 3:
print('Kernel.show(3D): export vtk instead...')
self.exportVTK(savename)
ax = None
cbar = None
elif self.dimension == 4:
print('nice try')
return ax, cbar
def _debug_showKernelParts(self, ax=None, ind=0, real=True, showabs=True,
xlim=None, ylim=None, clim1=None, clim2=None,
clim3=None, clim4=None):
"""
Plots the three 2D Kernel parts according to Hertrich (2005, fig 9).
For debugging purpose only, works only in debug mode of kernel class.
"""
if self.debug is False:
raise Exception('"showKernelParts" only works in debug mode '
'in order to save memory in production mode.')
if ax is None:
fig, ax = plt.subplots(2, 2, figsize=(14, 14))
else:
fig = np.squeeze(ax)[0].figure
print('K_part1')
print(type(self.K_part1[ind][0]))
axes = ax.flatten()
if showabs:
toplot1 = np.abs(self.K_part1[ind])
cMap = plt.get_cmap('jet', 60)
# for comparison against hertrich et al. 2005
else:
if real:
toplot1 = np.real(self.K_part1[ind])
else:
toplot1 = np.imag(self.K_part1[ind])
cMap = plt.get_cmap('bwr', 31)
if clim1 is None:
absmax1 = quantile(toplot1, perc=1, add_rel=0)
absmin1 = quantile(toplot1, perc=0.005, add_rel=0)
if showabs:
clim1 = [absmin1, absmax1]
else:
clim1 = [-absmax1, absmax1]
# Integrated Transmitter Part of the kernel function
_, cbar1 = pg.show(self.kernelMesh2D,
data=toplot1,
colorBar=True,
cMap=cMap,
logScale=showabs,
ax=axes[0],
cMin=clim1[0],
cMax=clim1[1])
axes[0].set_title('part 1')
print('K_part2')
if showabs:
toplot2 = np.abs(np.atleast_2d(self.K_part2)[0]) # (1, n)
cMap = plt.get_cmap('jet', 60)
# for comparison against hertrich et al. 2005
else:
if real:
toplot2 = np.atleast_2d(self.K_part2)[0].real # (1, n)
else:
toplot2 = np.atleast_2d(self.K_part2)[0].imag # (1, n)
toplot2 = 1e9 * toplot2
if clim2 is None:
absmax2 = quantile(toplot2, perc=1, add_rel=0)
absmin2 = quantile(toplot2, perc=0.005, add_rel=0)
if showabs:
clim2 = [absmin2, absmax2]
else:
clim2 = [-absmax2, absmax2]
# Integrated Receiver Part of the kernel function
_, cbar2 = pg.show(self.kernelMesh2D,
data=np.squeeze(toplot2), # (n,)
colorBar=True,
cMap=cMap,
logScale=showabs,
cMin=clim2[0],
cMax=clim2[1],
ax=axes[1])
axes[1].set_title('part 2')
if self.coincident:
axes[2].text(0.5, 0.5, 'coincident, geometry factor = 1',
ha='center', va='center',
transform=ax[2].transAxes)
else:
print('K_part3')
if showabs:
toplot3 = np.abs(np.atleast_2d(self.K_part3)[0]) # (1, n)
cMap = plt.get_cmap('jet', 60)
# for comparison against hertrich et al. 2005
else:
if real:
toplot3 = np.atleast_2d(self.K_part3)[0].real # (1, n)
else:
toplot3 = np.atleast_2d(self.K_part3)[0].imag # (1, n)
if clim3 is None:
absmax3 = quantile(toplot3, perc=1, add_rel=0)
absmin3 = quantile(toplot3, perc=0.005, add_rel=0)
if showabs:
clim3 = [absmin3, absmax3]
else:
clim3 = [-absmax3, absmax3]
# Integrated Geometric Part of the Kernel
_, cbar3 = pg.show(self.kernelMesh2D,
data=np.squeeze(toplot3), # (n,)
colorBar=True,
cMap=cMap,
logScale=showabs,
ax=ax[2],
cMin=clim3[0],
cMax=clim3[1])
axes[2].set_title('part 3')
if clim4 is None:
kw = {}
else:
kw = {'cMin': clim4[0], 'cMax': clim4[1]}
_, cbar4 = self.show(
toplot=['abs'if showabs else 'real' if real else 'imag'],
ax=axes[3], indices=[ind],
cbar_percentage=1,
logScale=showabs, cMap=cMap, **kw)
axes[3].set_title('kern')
if xlim:
for axi in axes:
axi.set_xlim(xlim)
if ylim:
for axi in axes:
axi.set_ylim(ylim)
return ax, (cbar1, cbar2, cbar3, cbar4[0])
[docs] def export2DKernel(self, fig=None, ax=None, savename=None, png_dpi=300,
noYLabel=False, index=0, colorBar=True,
size=13, pdf=None, fixed_cbar=False, **kwargs):
""" Exports 2D Kernel for given pulse moment.
Kwargs are redirected to *show*.
"""
with cm.plt_ioff():
suptitle = kwargs.pop('suptitle', '')
if fig is None or ax is None:
if kwargs.get('toplot') in ('0d', '0D'):
figsize = (6, 9)
else:
figsize = None
fig, ax = plt.subplots(1, 1, figsize=figsize)
self.show(indices=[index], size=size, ax=ax, colorBar=colorBar,
fixed_cbar=fixed_cbar, **kwargs)
ax.set_ylabel('Z [m]')
ax.set_xlabel('X [m]')
if noYLabel is True:
ax.set_ylabel('')
ax.yaxis.set_major_locator(plt.NullLocator())
plt.suptitle(suptitle)
if pdf is not None:
pdf.savefig(fig, bbox_inches='tight')
plt.close(fig)
else:
if savename is not None:
fig.savefig(
savename, bbox_inches='tight',
dpi=png_dpi if savename.endswith('.png') else None)
plt.close(fig)
[docs] def export2DKernel2PDF(self, name, fixed_cbar=False, **kwargs):
""" Export 2D Kernel for all pulse moments as stiched pdf.
Kwargs are redirected to *export2DKernel*.
"""
from matplotlib.backends.backend_pdf import PdfPages
log.info('export 2D Kernel as pdf.')
with PdfPages(name) as pdf:
if kwargs.get('toplot') in ('0d', '0D'):
rp = [0]
else:
rp = range(len(self.pulses))
for ind in rp:
log.info('pulse: {}/{}'.format(ind + 1, len(self.pulses)))
self.export2DKernel(
pdf=pdf,
index=ind,
savename=None,
fixed_cbar=fixed_cbar,
**kwargs)
d = pdf.infodict()
d['Title'] = name
d['Subject'] = self.__repr__()
[docs] def exportVTK(self, savename, save=['abs'],
save_in_log=False):
if self.dimension not in [2, 3]:
print('Export as VTK only for 2D or 3D implemented.')
return False
tick = time.time()
checkDirectory(savename, filename=True)
mesh = pg.Mesh(self.kernelMesh2D)
pm_vec = self.pulses
for i in range(np.shape(self.K)[0]):
if isinstance(self.K[0, 0], complex):
if 'real' in save:
kernel_r = 1e9 * self.K[i].real / np.array(
self.kernelMesh2D.cellSizes())
if save_in_log:
kernel_r = np.log10(np.abs(kernel_r))
mesh.addData('kernel_%05.2f_real' % (pm_vec[i]),
pg.Vector(kernel_r))
if 'imag' in save:
kernel_i = 1e9 * self.K[i].imag / np.array(
self.kernelMesh2D.cellSizes())
if save_in_log:
kernel_i = np.log10(np.abs(kernel_i))
mesh.addData('kernel_%05.2f_imag' % (pm_vec[i]),
pg.Vector(kernel_i))
if 'abs' in save or 'amp' in save:
kernel_a = 1e9 * np.absolute(self.K[i]) / np.array(
self.kernelMesh2D.cellSizes())
if save_in_log:
kernel_a = np.log10(kernel_a)
mesh.addData('kernel_%05.2f_abs' % (pm_vec[i]),
pg.Vector(kernel_a))
# ABS = np.absolute(kernel[i]) * np.sign(kernel[i].real)
if 'phase' in save:
mesh.addData('kernel_%05.2f_phase' % (pm_vec[i]),
pg.Vector(np.angle(self.K[i])))
else:
mesh.addData('kernel_%05.2f' % (pm_vec[i]),
(pg.Vector(self.K[i])))
print('save kernel to "{}" ...'.format(savename), end='')
mesh.exportVTK(savename)
print(' %.3f sec' % (time.time() - tick))
return True
[docs] def export1D(self, savename, loop_layout=True, title='{0.survey.earth!r}'):
with cm.plt_ioff():
fig, ax = plt.subplots(1, 5, figsize=(16, 9))
ax, _ = self.show(ax=ax, size=8, cbar_percentage=0.99)
fig.suptitle(title.format(self))
ax[0].figure.savefig(savename + '.png')
log.info('saved "{}"'.format(savename + '.png'))
plt.close(fig)
if loop_layout:
fig, ax = plt.subplots(1, 1, figsize=(16, 9))
ax = self.showLoopLayout(ax=ax)
ax.figure.savefig(savename + '_layout.png')
log.info('saved "{}"'.format(savename + '_layout.png'))
plt.close(fig)
[docs]def checkForKernel(name, mkdir=False):
if not os.path.exists(name):
if mkdir is True:
path = os.path.dirname(name)
if path != '':
if not os.path.exists(path):
os.mkdir(path)
return False
else:
return True
[docs]def simpleZVec(numz, minz, reduced=False):
kern = Kernel()
kern.createZVector(numz, minz)
return kern.getZVector(reduced=reduced)
[docs]def integrateKernelH2(mat, array):
array = np.atleast_2d(array)
new_arr = np.zeros((array.shape[0], mat.nRows()), dtype=complex)
for ind, subarr in enumerate(array):
new_arr[ind] = (mat * subarr.real).array() + \
1j * (mat * subarr.imag).array()
return np.squeeze(new_arr)
def _2DSliceSummationWorker(queue_in, queue_sum, stop_queue, shape,
y_count, worker_count, parent_id):
""" Worker in the kernel-multicore-calculation approach. """
# first task
kern = np.zeros(shape, dtype=complex)
log.debug('started sum worker')
done = 0
if sys.platform == 'linux' and log.getEffectiveLevel() <= 20:
iterable = progressBar(range(worker_count), 'gather kernel parts: ')
else:
iterable = range(worker_count)
log.debug('Summation worker: start waiting for processes to finish.')
while queue_in.qsize() < worker_count:
time.sleep(0.2)
for done in iterable:
while True:
if os.getppid() != parent_id:
sys.exit(
'My main process died (%d != %d)! So... I\'m out too.' %
(os.getppid(), parent_id))
try:
newSlice = queue_in.get(timeout=0.1) # aquire queue
kern += newSlice
queue_in.task_done() # release queue
# done += 1
if sys.platform != 'linux':
log.log(13, f'shape: {newSlice.shape}, total: {done}'
.format(done + 1))
del newSlice # sometimes auto memory clean up isnt working
break
except (queue.Empty, IOError): # IOError = broken pipe
# don't worry the other workers are not finished yet: try again
# print('sleeping')
time.sleep(0.1)
# shutting process down
queue_sum.put(kern, block=False)
time.sleep(0.2)
stop_queue.put(True)
return
def _1DSliceSummationWorker(queue_in, queue_sum, parent_id, stop_queue, shape,
kern_cell_sizes):
""" Worker in the kernel-multicore-calculation approach. """
# first task
kern = np.zeros(shape, dtype=complex)
log.log(13, 'started sum worker')
done = 0
while done < shape[1]:
if os.getppid() != parent_id:
sys.exit('My main process died (%d != %d)! So... I\'m out too.' %
(os.getppid(), parent_id))
try:
newSlice, idx = queue_in.get(timeout=0.3) # aquire queue
# integration over 2D Mesh in x-y plane
newSlice = np.sum(newSlice * kern_cell_sizes, 1)
# already weighted with y vec, cell sizes comes later from outside
kern[:, idx] = newSlice
queue_in.task_done() # release queue
done += 1
log.debug(f'shape: {newSlice.shape}, index: {idx}, total: {done}')
except (queue.Empty, IOError): # IOError = broken pipe
# not to worry the other workers are not finished yet: try again
time.sleep(1)
# shutting process down
queue_sum.put(kern, block=False)
time.sleep(0.2)
log.log(13, 'sum worker finished')
stop_queue.put(True)
return
def _2DSliceKernelWorker(srcMeshName, outPos2D, indices, yvec,
txSrcField, rxSrcField, outQueue,
parent_id, workerID, earth, fid, slice_name):
""" Worker in the kernel-multicore-calculation approach"""
time_loadmesh = time.time()
if slice_name is None:
log.log(13, 'worker {}: import sourcemesh "{}"...'
.format(workerID, srcMeshName))
srcMesh = pg.load(srcMeshName)
log.debug(srcMesh)
# outPos2D = 3d contruct with y-values = 0
rxalpha = None
rxbeta = None
rxzeta = None
rxperpend = None
ythks = np.abs(np.diff(yvec))
ymids = (yvec[:-1] + yvec[1:]) / 2
coincident = fid.tx_index == fid.rx_index
if workerID == 0:
todo = progressBar(indices, 'processed slices per worker: ')
else:
todo = indices
for yi in todo:
if os.getppid() != parent_id:
sys.exit('My main process died ({} != {}) So... I\'m out too.'
.format(os.getppid(), parent_id))
# y midpoint, index of the pos of the y-value in the 3D mesh
log.debug('worker no{} y = {:.2f}'.format(workerID, ymids[yi]))
sys.stdout.flush()
if slice_name is None:
# outPos2D = 3d contruct with y-values = 0
slice3D = np.copy(outPos2D)
slice3D[:, 1] = np.ones(np.shape(slice3D)[0]) * ymids[yi]
# do interpolation from 3D FEM Mesh to 2D slice as matrix
iMat = srcMesh.interpolationMatrix(slice3D)
# use matrix to calc tx field
txfieldSlice = cm.interpolateField_Matrix(txSrcField, iMat)
if not coincident:
# optionally use matrix to calc rx field
rxfieldSlice = cm.interpolateField_Matrix(rxSrcField, iMat)
# next slice will be different, can delete rest to ensure clean up
# (this can otherwise be tricky in multiprocessing)
del slice3D
del iMat
else:
# load custEM results
log.debug('Worker {} load field slice {}'.format(
workerID, slice_name.format(fid.tx_index, yi)))
field_slice_tx = np.load(slice_name.format(fid.tx_index, yi))
# shape: (mesh, 6)
# 6: complex-field (x, y, z), coords (x, y, z)
# we only need the field -> [:, :3]
txfieldSlice = field_slice_tx[::-1, :3]
txfieldSlice = txfieldSlice.T * 4e-7 * np.pi
# (n, 3) -> (3, n) and H to B
del field_slice_tx
if not coincident:
field_slice_rx = np.load(slice_name.format(fid.rx_index, yi))
rxfieldSlice = field_slice_rx[::-1, :3]
rxfieldSlice = rxfieldSlice.T * 4e-7 * np.pi
# (n, 3) -> (3, n) and H to B
del field_slice_rx
# elliptical decomposition
txalpha, txbeta, txzeta, txperpend = \
Kernel.ellipticalDecomposition_multi(txfieldSlice,
earth)
if not coincident:
rxalpha, rxbeta, rxzeta, rxperpend = \
Kernel.ellipticalDecomposition_multi(rxfieldSlice,
earth)
# kernel integration
(K1, K2, K3) = Kernel.kernelCalculation_multi(
fid, earth, txalpha, txbeta, txzeta, txperpend,
rxalpha=rxalpha, rxbeta=rxbeta, rxzeta=rxzeta,
rxperpend=rxperpend, calc_theta=False,)
# log.warning('Export of kernel slice.')
# path = Path('kernel_slices')
# path.mkdir(exist_ok=True)
# np.save(path.joinpath('kernel_slice_2D_{}.npy'
# .format(yi)).as_posix(),
# K1 * K2 * K3)
# Old before Nov 2020: output single slices
# outQueue.put((K1 * K2 * K3 * ythks[yi])) # weighting with y vec thk
# Change Nov 2020: summation here: queue only gets summed part of the
# kernel as summation worker is actually the slowest part of the
# kernel calculation routine.
kern_part = K1 * K2 * K3 * ythks[yi] # weighting with y vec thk
# added Nov 2020: 180 degree phase flip: np.conjugate
kern_part = np.conjugate(kern_part)
# weighting over cells sizes in calling function
if yi == indices[0]:
summed_kern_parts = kern_part
else:
summed_kern_parts += kern_part
log.debug('worker {}: slice {}/{} finished.'.format(
workerID, yi, indices))
# general clean up
del txalpha
del txbeta
del txzeta
del txperpend
del txfieldSlice
if rxSrcField is not None:
del rxfieldSlice
del rxalpha
del rxbeta
del rxzeta
del rxperpend
del K1
del K2
del K3
outQueue.put((summed_kern_parts))
log.debug('exit of Worker {} after {:.2f} seconds'
.format(workerID, time.time() - time_loadmesh))
sys.stdout.flush()
return
def _1DSliceKernelWorker(srcMeshName, outPos2D,
txSrcField, rxSrcField, coincident, zvec, indices,
outQueue, parent_id, workerID, earth,
fid, tx_name, rx_name, slice_name,
tx_index, rx_index):
""" Worker in the kernel-multicore-calculation approach"""
# import logging
# log = logging.getLogger('comet')
tickstart = time.time()
if txSrcField is not None:
log.debug('worker {}: import sourcemesh "{}"...'
.format(workerID, srcMeshName))
srcMesh = pg.load(srcMeshName)
log.debug(srcMesh)
# outPos2D = 3d contruct with z-values = 0
rxalpha = None
rxbeta = None
rxzeta = None
rxperpend = None
# lengths of the cells = weighting (in y direction)
z_thk_all = np.abs(np.diff(zvec))
z_mid_all = (zvec[:-1] + zvec[1:]) / 2
if slice_name is None:
if txSrcField is None:
# setup loop classes
if tx_name is None:
raise Exception('Need tx field or tx.')
tx_class = Loop(tx_name[0], config=tx_name[1])
if not coincident:
if rx_name is None:
raise Exception('Need rx.')
rx_class = Loop(rx_name[0], config=rx_name[1])
for idx in indices:
z_thk = z_thk_all[idx]
z_mid = z_mid_all[idx]
if os.getppid() != parent_id:
sys.exit('My main process died (%d != %d)! So... I\'m out too.' %
(os.getppid(), parent_id))
log.debug('worker no{} starts interpolating slice at z = {:.2f}'
.format(workerID, z_mid))
slice3D = np.copy(outPos2D)
slice3D[:, 2] = np.ones(np.shape(slice3D)[0]) * z_mid
if slice_name is None:
if txSrcField is not None:
# b field case 1/2: b field is interpolated from loop mesh
# calculate interpolation matrix for actual slice
iMat = srcMesh.interpolationMatrix(slice3D)
# interpolate bfield for tx
txfieldSlice = cm.interpolateField_Matrix(txSrcField, iMat)
if not coincident:
# interpolate bfield for rx (same matrix)
rxfieldSlice = cm.interpolateField_Matrix(rxSrcField, iMat)
del iMat
else:
# b field case 2/2: b field is calculated directly on each slice
# need pos, phi, and ds, or a proper loop.
tx_class.setLoopMesh(slice3D)
tx_class.field = None
tx_class.calculate(num_cpu=1, matrix=False,
interpolate=False)
txfieldSlice = tx_class.field
if not coincident:
rx_class.setLoopMesh(slice3D)
rx_class.field = None
rx_class.calculate(num_cpu=1, matrix=False,
interpolate=False)
rxfieldSlice = rx_class.field
else:
field_slice_tx = np.load(slice_name.format(fid.tx_index, idx))
txfieldSlice = field_slice_tx[::-1, :3]
txfieldSlice = txfieldSlice.T * 4e-7 * np.pi
del field_slice_tx
if not coincident:
field_slice_rx = np.load(slice_name.format(fid.rx_index, idx))
rxfieldSlice = field_slice_rx[::-1, :3]
rxfieldSlice = rxfieldSlice.T * 4e-7 * np.pi
del field_slice_rx
del slice3D
# elliptical decomposition
txalpha, txbeta, txzeta, txperpend = \
Kernel.ellipticalDecomposition_multi(txfieldSlice, earth)
if not coincident:
rxalpha, rxbeta, rxzeta, rxperpend = \
Kernel.ellipticalDecomposition_multi(rxfieldSlice, earth)
# kernel integration
(K1, K2, K3) = Kernel.kernelCalculation_multi(
fid, earth, txalpha, txbeta, txzeta, txperpend,
rxalpha=rxalpha, rxbeta=rxbeta, rxzeta=rxzeta, rxperpend=rxperpend,
calc_theta=False,)
# log.warning('Export of kernel slice.')
# path = Path('kernel_slices')
# path.mkdir(exist_ok=True)
# np.save(path.joinpath('kernel_slice_1D_{}.npy'
# .format(idx)).as_posix(),
# K1 * K2 * K3)
log.debug('put kernel Slice {}: shape = {}'
.format(idx, np.shape(K2)))
outQueue.put((K1 * K2 * K3 * z_thk, idx)) # weighting with z vec
# weighting over cells sizes in calling function
del txalpha
del txbeta
del txzeta
del txperpend
del txfieldSlice
if not coincident:
del rxfieldSlice
del rxalpha
del rxbeta
del rxzeta
del rxperpend
del K1
del K2
del K3
del z_mid
del z_thk
log.debug('exit of Worker {} after {:.2f} seconds'
.format(workerID, time.time() - tickstart))
return
def _interpolationMatrixWorker(worker_id, src_mesh, target_pos, id_offset,
pipe, log_level):
from comet.pyhed.misc import vec
from comet.pyhed import log
log.setLevel(log_level)
log.debug('import source mesh')
src = pg.Mesh(src_mesh)
mat = src.interpolationMatrix(pg.R3Vector(target_pos))
pipe.put([vec.getRSparseValues(mat, getInCRS=False), id_offset,
src.nodeCount()])
mat.clear()
log.debug(f'exit of Worker {worker_id}')
return
[docs]def calcInterpolationMatrix_para(source_mesh, target_pos, num_cpu=8):
# input handling
import multiprocessing as mpc
temporary_mesh = False
if isinstance(source_mesh, str):
pass
elif isinstance(source_mesh, pg.Mesh):
temporary_mesh = True
import tempfile as temp
temp_mesh = temp.NamedTemporaryFile(suffix='.bms', delete=False)
source_mesh.save(temp_mesh.name)
source_mesh = temp_mesh.name
temp_mesh.close()
out_queue = mpc.Queue()
nop = np.min([mpc.cpu_count(), len(target_pos), num_cpu])
row_count = len(target_pos)
splitted = np.array_split(target_pos, nop, axis=0)
log.info(f'Start calculating matrices in {nop} parallel processes.')
processes = []
index_split = 0
for i in range(nop):
worker = mpc.Process(target=_interpolationMatrixWorker,
args=(i,
source_mesh,
splitted[i],
index_split,
out_queue,
log.getEffectiveLevel()))
index_split += len(splitted[i])
worker.daemon = True
processes.append(worker)
worker.start()
rows = []
cols = []
vals = []
number = 0
while number < nop:
try:
sub_mat, split, col_count = out_queue.get()
except queue.Empty:
time.sleep(2.0)
continue
rows.extend(np.array(sub_mat[0]) + split)
cols.extend(sub_mat[1])
vals.extend(sub_mat[2])
number += 1
IMatrix = pg.SparseMapMatrix(pg.IndexArray(np.array(rows)),
pg.IndexArray(np.array(cols)),
pg.RVector(vals))
IMatrix.setRows(row_count)
IMatrix.setCols(col_count)
log.debug('Building up matrix ... ')
if temporary_mesh:
os.unlink(source_mesh) # release temporary mesh file
for p in processes:
if p.is_alive():
p.terminate()
p.join()
return IMatrix
[docs]def create1DInterpolationSlices(kern):
z_offsets = kern._zvec
cc = kern.kernelMesh1D.cellCenters()
coords = np.repeat(np.reshape(cc, (1, *cc.array().shape)),
len(z_offsets), axis=0)
inputcoords = np.concatenate([coords[:, :, 0, np.newaxis],
coords[:, :, 1, np.newaxis],
coords[:, :, 2, np.newaxis]], axis=2)
for zi, zval in enumerate(z_offsets):
inputcoords[zi, :, 2] = zval
inputcoords
return
[docs]def create2DInterpolationSlices(kern):
y_offsets = kern.yvec
cc = kern.kernelMesh2D.cellCenters()
coords = np.repeat(np.reshape(cc, (1, *cc.array().shape)),
len(y_offsets), axis=0)
inputcoords = np.concatenate([coords[:, :, 0, np.newaxis],
coords[:, :, 2, np.newaxis],
coords[:, :, 1, np.newaxis]], axis=2)
for yi, yval in enumerate(y_offsets):
inputcoords[yi, :, 1] = yval
return inputcoords
[docs]def calculateKernelFromSlices(
survey_name, invmesh, cfgname, max_length=0.05, max_num=400,
path_name='kernel', num_cpu=48, h_order=1, json_name=None,
force_new_paths=True, kernel_name='kernel/kern_{}',
slice_export_name='kernel_slice'):
"""
Parameters:
-----------
survey: string
Filepath of the survey containing the FIDs.
invmesh: string
Filepath for the inversion mesh the kernel is calculated on.
cfgname: string
Filepath of the secondary config containingin formation for
custEM. The same file should have been used for the field
calculation.
max_length: float [ 0.05 ]
Minimum distance between two slices in y direction.
max_num: integer [ 400 ]
Number of slices for y discretization.
path_name: string [ 'kernel' ]
Final name for the slices will be
{mdir}/paths/{path_name}_{number}_path.xml
"mdir" is defined in the secondary config.
num_cpu: integer [ 48 ]
Number of cores used.
h_order: integer [ 1 ]
Order of h-refinement when setting the invmesh in the
kernelclass.
json_name: string [ None ]
Optional name for the json file. Alternatively a tempory file
is created.
force_new_path: boolean [ True ]
Deletes old pathfiles (slices) and forces the generation of
new ones.
kernel_name: string [ 'kernel/kern_{}' ]
Filepath of used for export of the kernel functions. Need to
contain a "{}" which is filled with the index of the
corresponding FID in the survey class.
slice_export_name: string [ 'kernel_slice' ]
Filepath of the interpolated magnetic fields for the
individual kernel slices.
Full slice path contains:
"{r_dir}/{approach}/{mesh_name}/{mod_name}_interpolated/
tx_{tx_number}_{slice_export_name}_imesh_{slice_number}.npy"
Returns:
--------
kernel_name (added a "_{}" if not in original string)
"""
from comet import snmr
from comet import pyhed as ph
jdict = {}
jdict['survey'] = survey_name
jdict['invmesh'] = invmesh
jdict['cfgname'] = cfgname
jdict['max_num'] = max_num
jdict['max_length'] = max_length
jdict['num_cpu'] = num_cpu
jdict['path_name'] = path_name
jdict['h_order'] = h_order
jdict['force_new_paths'] = force_new_paths
jdict['slice_export_name'] = slice_export_name
json_name = ph.misc.dump2Json(json_name=json_name, **jdict)
success = ph.misc.local_apps_bash('totalFieldInterpolation',
json_name,
str(num_cpu))
ph.log.info('success: {}'.format(success))
if success != 0:
ph.log.error('Mpirun aborted.')
raise SystemExit()
# import prepared survey from second script
survey = snmr.Survey()
survey.load(survey_name, load_meshes=False)
cfg = ph.SecondaryConfig(cfgname)
# only working approach for this is E-total, hardcoded
cfg.approach = 'E_t'
for fi, fid in enumerate(survey.fids):
kern = survey.createKernel(fi, dimension=2)
# 1200 slices instead of 300
kern.createYVec(max_length=max_length, max_num=max_num)
kern.set2DKernelMesh(invmesh, order=h_order)
slice_dir = '{}/{}/{}/{}_interpolated'.format(
cfg.r_dir, cfg.approach, cfg.mesh_name, cfg.mod_name)
slice_name = 'tx_{}_' + f'{slice_export_name}' + '_imesh_{}.npy'
slices = Path(slice_dir).joinpath(slice_name).as_posix()
kern.calculate(num_cpu=num_cpu,
slice_name=slices)
if "{" not in kernel_name and "}" not in kernel_name:
kernel_name += '_{}'
kern.save(kernel_name.format(fi),
kernelmesh_name=Path(invmesh).name)
return kernel_name
# The End