Source code for comet.snmr.kernel.kernel_bib

"""
Part of comet/snmr/kernel

This file contents parts of the MRSmatlab Kernel function part
"""
# COMET - COupled Magnetic resonance Electrical resistivity Tomography
# Copyright (C) 2019  Nico Skibbe

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import time
import os
import sys
import queue

import numpy as np
import pygimli as pg

from comet.pyhed import misc as cm
from comet.pyhed import log
from comet.pyhed.IO import checkDirectory, getItem, savefieldvtk
from comet.pyhed.loop import buildLoop, loadLoop, Loop
from comet.pyhed.plot import (showLoopLayout, setAxSize, cmap_phase, quantile,
                              getCMapAndLim)
from comet.pyhed.misc import sinhspace, progressBar

from comet.snmr.survey import Survey
from comet.snmr import misc

from pathlib import Path

import matplotlib.pyplot as plt


[docs]class Kernel(object): """ Basic class to solve the NMR kernel computation. Parameters ---------- name: string [ None ] If kernel is loaded from file. survey: survey class instance [ None ] Calls *setSurvey* to define underlaying survey class. Holds important attributes like pulse moments and the loops for tx and rx. tx: integer [ 0 ] Transmitter index in corresponding survey. rx: integer [ 0 ] Receiver index in corresponding survey. fid: interger [ 0 ] Sounding index in corresponding survey. dimension: integer [1] Defines the kernel integration. Example ------- >>> from comet.snmr import kernel as k >>> from comet.snmr import survey >>> site = survey.Survey() >>> kernel = k.kernel(site) >>> kernel.calculate() >>> kernel.save('savename') >>> kernel.show() """ def __init__(self, survey=None, fid=0, dimension=1, name=None): # attributes setted from initial call # ----------------------------------- # except for *name* # load(name) is called after init of other attributes # backup for pulse moments if not fid is prepared # mainly for plotting after loading a kernel class from harddisk self._pulses_backup = None # survey class self.survey = None # index of corresponding sounding in survey class self.fid_index = None self.setSurvey(survey, fid=fid) # 1D, 2D, or 3D self.dimension = dimension # attribute init for overview # --------------------------- # interpolation matrix from loop mesh to kernel mesh self.interpolationMatrix = None # b field copies (sometimes comes in handy) self.txBfield = None self.rxBfield = None # integrated kernel matrix: shape (pulses, cells) self.K = None # z discretization for dim = 1 self.kernelMesh1D = None self.min_thk = None self._zvec = None self._zvec_reduced = None # y discretization for dim = 2 self.yvec = None self.kernelMesh2D = None # for intergration self.kernelMeshCellVolume = None self.kernelMeshCellCenter = None # only used in debug mode self.K_part1 = None self.K_part2 = None self.K_part3 = None self.debug = False # elliptical decomposition self.txalpha = None self.txbeta = None self.txzeta = None self.txperpend = None self.rxalpha = None self.rxbeta = None self.rxzeta = None self.rxperpend = None # separate loopmesh for both loops self.seploopmesh = None self.slm_name = None self.seploopmesh_dipole = None self.slm_dipole_name = None self.num_cpu = 12 # internal h refinement of kernelmesh self.h_order = 0 self.h_mat = None # rotation angle and magnitude of magnetization after pulses self.theta = None self.magnetization = None # full 3D mesh for kernel (method: getMagnetizationMesh) self.mag_mesh = None # used only for magnetization calc and display if name is not None: self.load(name)
[docs] def release_memory(self): """ Calling this function is releasing some attributes that are using a fairly big amount of memory. Sets the following attributes back to None: - The interpolation matrix between the loop meshes and the kernel mesh *interpolationMatrix* - local copies of the magnetic fields (fields in tx and rx are not effected) *txBfield*, *rxBfield* - the 3D kernel mesh cell center and volumes *kernelMeshCellVolume*, *kernelMeshCellCenter* - the elliptical decomposition of the tx and rx bfields *txalpha*, *txbeta*, *txzeta*, *txperpend*, *rxalpha*, *rxbeta*, *rxzeta*, *rxperpend* Note: a recalculation of the kernel will take about the same amount of time as the first call, as all cached variables are gone, however apart from a recalculation, the other purposes of the kernel class (export, figures, inversion(without recalculation)) are not effected. Another note: If you want to use this method only for saving disk space in case you save the kernel class, then you might consider the *light* flag of the *.save* method instead. """ self.interpolationMatrix = None self.txBfield = None self.rxBfield = None self.kernelMeshCellVolume = None self.kernelMeshCellCenter = None self.txalpha = None self.txbeta = None self.txzeta = None self.txperpend = None self.rxalpha = None self.rxbeta = None self.rxzeta = None self.rxperpend = None
def __str__(self): str_ = f'survey: {self.survey!r}\n'\ f'fid: {self.fid!r}\n'\ f'earth: {self.survey.earth!r}\n'\ f'Tx: {self.tx!r}\n'\ if self.tx is not None: str_ += f'{self.tx.config!r}\n'\ if self.coincident: str_ += 'Rx: coincident\n' else: str_ += 'Rx: {0.rx!r}\n'\ '{0.rx.config!r}\n'.format(self) str_ += 'dimension: {0.dimension}'.format(self) if self.dimension == 1: if self.K is not None: str_ += '\nKernel Matrix pulse Moments: {0[0]} \nKernel \ Matrix layers: {0[1]}'.format(np.shape(self.K)) else: str_ += r'\nKernel Matrix not calculated yet.' return str_ def __repr__(self): str_ = 'kernel: {0.dimension}D, {1}'.format( self, 'coincident' if self.coincident else 'separated') return str_ @property def larmor(self): """ Larmor frequency [Hz] from earth defined in survey.""" larmor = self.survey.earth.larmor return larmor @property def magnetization_magnitude(self): magmag = self.fid.curie * misc.constants.gamma *\ (self.survey.earth.magnitude**2) return magmag @property def tx_area(self): """ Area of the transmitter loop.""" try: area = self.tx.area except AttributeError: # no tx as of yet area = None return area @property def rx_area(self): """ Area of the receiver loop.""" try: area = self.rx.area except AttributeError: # no rx as of yet area = None return area @property def tx(self): """ Reference to transmitter class instance in survey.""" if self.tx_index is None: tx = None else: try: tx = self.survey.loops[self.tx_index] except IndexError: # no tx as of yet tx = None return tx @property def rx(self): """ Reference to receiver class instance in survey.""" if self.rx_index is None: rx = None else: try: rx = self.survey.loops[self.rx_index] except IndexError: # no rx as of yet rx = None return rx @property def tx_index(self): try: index = self.fid.tx_index except AttributeError: # no fid as of yet index = None return index @property def rx_index(self): try: index = self.fid.rx_index except AttributeError: # no fid as of yet index = None return index @property def coincident(self): coi = self.tx_index == self.rx_index return coi @property def fid(self): """ Reference to sounding (FID) class instance in survey.""" try: fid = self.survey.fids[self.fid_index] except IndexError: # no fid as of yet fid = None return fid @property def zvec(self): """ z discretisation """ zvec = self.getZVector(reduced=True) return zvec @zvec.setter def zvec(self, vector): """ Convinience setter. For other *min_thk* use *setZVec*. """ if self.min_thk is None: self.min_thk = 0.5 self.setZVector(vector, min_thk=self.min_thk) @property def pulses(self): """ Reference to pulse moments from sounding (FID).""" try: pm_vec = self.fid.pulses if self._pulses_backup is not None: self._pulses_backup = None except AttributeError: # no fid as of yet pm_vec = self._pulses_backup if pm_vec is not None: log.debug('kernel.pulses: from backup') return pm_vec
[docs] def setPulsesDirectly(self, pulses): """ Set pulse moment vector manually if not supported by survey + fid. (This is called when loading a kernel from the harddisk, mainly for plotting reasons). For all calculation purposes a survey and fid class is recommended. """ if pulses is not None: self._pulses_backup = np.array(pulses)
def _updatePulses(self, disable_warning=False): """ Overrides the pulse moment vector of the kernel class with pulses found in fid. (Usually only called after setting a survey class in a loaded kernel.) Internal function, do not call from outside. If you are not sure if the kernel class has pulses, check the *.fid* or *.pulses*, otherwise load a kernel from harddisk or provide a survey class via *setSurvey*. If you change the pulses in the *fid*, the kernel recognizes them as changed without calling this function (which is the way the user controls the used pulse moments). """ if not disable_warning and self._pulses_backup is not None and not \ np.allclose(self._pulses_backup, self.pulses): log.warning( 'Overriding pulse moments ({}) with new pulse moment vector ' 'from fid ({}).'.format(self._pulses_backup, self.pulses)) self._checkPulses() # fid holds pulse, setting backup back to None to avoid interaction # (self._pulses_backup is only used after loading kernel from harddisk # without fid class instance provided) self._pulses_backup = None @property def shape(self): if self.dimension == 1: m_dim = len(self.zvec) - 1 else: if self.kernelMesh2D is not None: m_dim = self.kernelMesh2D.cellCount() else: m_dim = 0 if self.pulses is None: npulses = 0 else: npulses = len(self.pulses) # so (0, 0) indicates: no pulses, no mesh return (npulses, m_dim) @property def updatable(self): up = self.rx is not None and self.tx is not None up = up and self.pulses is not None return up
[docs] def setDebug(self, debug: bool): self.debug = debug
[docs] def setSurvey(self, survey, fid=0): """ Sets survey class containing necessary information for the kernel. Parameters ---------- survey: comet.snmr.survey.Survey or None Sets given survey class instance or create empty class instance. fid: integer [ 0 ] Index of corresponding sounding in the survey. """ if survey is None: self.survey = Survey() else: self.survey = survey self.fid_index = fid if self.fid is not None: self._updatePulses()
[docs] def setTx(self, tx, **kwargs): """ Sets initialized loop or pipe arg and kwargs to *loadLoop*. """ if isinstance(tx, str): self.tx = loadLoop(tx, **kwargs) else: self.tx = tx
[docs] def setRx(self, rx, **kwargs): """ Sets initialized loop or pipe arg and kwargs to *loadLoop*. """ if isinstance(rx, str): self.rx = loadLoop(rx, **kwargs) else: self.rx = rx
[docs] def setModel(self, *args, **kwargs): """ Pipes args and kwargs to *self.tx.setModel*. Same for rx.""" self.tx.setModel(*args, **kwargs) if self.rx is not None: self.rx.setModel(*args, **kwargs)
def _checkSrc(self): if self.tx is None: raise Exception('No transmitter found.') elif self.tx.type == 'Dummy': raise Exception('Only dummy transmitter found.') if self.rx is None: raise Exception('No receiver found.') elif self.rx.type == 'Dummy': raise Exception('Only dummy receiver found.') def _checkPulses(self): if self.fid is None: raise Exception( f'No sounding ({self.fid_index}) found in {self.survey}') if self.pulses is None: raise Exception( f'No pulses found in {self.fid}')
[docs] def createZVector(self, numz, minz, min_thk=0.5): """ Creates a sinus hyperbolicus shaped Z discretisation in numz steps between 0 and minz. """ vector = np.sinh( np.linspace(0.0, np.arcsinh(-np.abs(minz)), numz + 1)) self.min_thk = min_thk self.setZVector(vector, min_thk=self.min_thk)
[docs] def getZVector(self, reduced=True): if reduced is True: return self._zvec_reduced else: return self._zvec
[docs] def setZVector(self, vector, min_thk=0.5): """ Defines the attribute zvec. Sets the given vector as z discretization. Attention: the value for min_thk defines the minimum thickness of the discretization used in the end. For all thicknesses in vector smaller than min_thk, the Kernel is integrated to match the min_thk. For calulation of the kernel function the original given vector is used. Parameters ---------- vector: array_like Z discretization in m to be used for the kernel calculation. If a new vector is to be created, please also take a look at the method *createZVector*. min_thk: float Minimum thickness te kernel and zvec is integrated if returned. This leads to higher accuracy in the vicinity of the loop. """ if self.dimension != 1: log.warning('Setting zvec for kernel of Dim = {} is useless.' .format(self.dimension)) self._zvec = np.array(vector) self.min_thk = min_thk if self._zvec is not None: a, b = cm.vec.cumsumDepth(self._zvec, min_thk=self.min_thk) self.z_index, self._zvec_reduced = a, b else: self.z_index = None self._zvec_reduced = None
[docs] def setYVector(self, vector): if self.dimension != 2: log.warning('Setting yvec for kernel of Dim = {} is useless.' .format(self.dimension)) self.yvec = np.array(vector)
[docs] def getSliceCoords(self): """ Returns input coordinates for custEM Slice interpolation of magnetic fields to the kernel slices. """ slice_coords = self.kernelMeshCellCenter.T.reshape( (len(self.yvec) - 1), -1, 3) slice_list = [] for subslice in slice_coords: slice_list.append(subslice) return slice_list
[docs] def getKernel(self, reduced=True): if self.K is None: kern = None else: if self.dimension == 1 and reduced: kern = cm.vec.sumBetweenIndices(self.K, self.z_index, axis=1) else: kern = self.K return kern
[docs] def createSeperatedLoopMesh(self, name='SepLoopMesh', dipole=True, exportVTK=False, refinement_para=1.0, max_area_factor=1.0): """ Creates a mesh that contains the receiver and the transmitter loop. """ # tx and rx needed self._checkSrc() checkDirectory(name, filename=True) if self.coincident: pos = self.tx.pos else: pos = np.concatenate((self.tx.pos, self.rx.pos), axis=0) pos = pg.utils.unique_rows(pos) hlp = Loop((pos, np.ones(len(pos)), # fake phi np.ones(len(pos)), # fake ds False, # grounded 'arbitrary' )) hlp.setMeshParameters(refinement_para=refinement_para, max_area_factor=max_area_factor) self.seploopmesh = pg.Mesh(hlp.createLoopMesh(savename=name)) if exportVTK is True: self.seploopmesh.exportVTK('sepLoop.vtk') self.slm_name = (hlp.loop_mesh_name + '.')[:-1] # copy string if dipole is True: self.seploopmesh_dipole = \ pg.Mesh(hlp.createDipoleMesh(savename=name)) self.slm_dipole_name = (hlp.dipole_mesh_name + '.')[:-1] if exportVTK is True: self.seploopmesh_dipole.exportVTK('sepDipole.vtk')
[docs] def create1DKernelMesh(self, max_length=0.1, area=100., quality=32, zvec=None, size_factor=2.5, z_factor=2.5, export_xyplane=None, max_dipoles=2000, calc_3D_stats=True, xmin=None, xmax=None, ymin=None, ymax=None): """ In order to integrate the kernel to a 1D structure without interpolation errors, a special mesh consisting of triangular zylinders has to be defined. Parameters ---------- max_length: float [ 0.1 ] Defines the smallest edge length for the discretisation of the loop . In order to get admirable kernel results a value of 0.1 meters should be the maximum. area: float [ 100. ] Defines the maximum Area a triangle in the loop slice can have. quality: float [ 32. ] Defines the smallest angle inside a triangle. Be careful with values above 35. zvec: array_like [ None ] Usualy the zvec is defined automatically, this flag gives the user the optional possibility to give a zvec from outside the funktion. size_factor: float [ 2.5 ] Extension of the kernel mesh (and therefore integration volume) in the x and y direction. Should be at least 2 times the loop diameter or shortest edge length. This value defines the multipier. z_factor: float [ 2.5 ] Maximum depth of the Kernel. Should be at least 2 times the loop diameter or shortest edge length. This value defines the multipier. export_xyplane: string [ None ] Filename for the resulting kernel mesh plane in 2D can be exported for debugging or simply to check the mesh (vtk). max_dipoles: interger [ 2000 ] Fallback for high node density loops. This sets an overall maximum for the number of dipoles used for the loop discretization. However this only comes into account in rare cases. """ # input checks self._checkSrc() if self.tx.type == 'Dummy': raise Exception('need proper Loop for creating a KernelMesh.') # compute outer limits for loopmesh max_length = max(max(sum(self.tx.ds), sum(self.rx.ds)) / max_dipoles, max_length) log.debug(f'create1DKernelMesh: wire discretization: {max_length} m') txhlp = buildLoop(self.tx.pos, max_length=max_length) if not self.coincident: rxhlp = buildLoop(self.rx.pos, max_length=max_length) pos = np.concatenate((txhlp.pos, rxhlp.pos), axis=0) pos = pg.utils.unique_rows(pos) maxdis = min(txhlp._maxdistance, rxhlp._maxdistance) else: pos = txhlp.pos maxdis = txhlp._maxdistance minx = np.min(pos[:, 0]) - size_factor * maxdis maxx = np.max(pos[:, 0]) + size_factor * maxdis miny = np.min(pos[:, 1]) - size_factor * maxdis maxy = np.max(pos[:, 1]) + size_factor * maxdis minz = np.min(self.tx.pos[:, 2]) - z_factor * maxdis if xmin is not None: minx = min(minx, xmin) if xmax is not None: maxx = max(maxx, xmax) if ymin is not None: miny = min(miny, ymin) if ymax is not None: maxy = max(maxy, ymax) if len(pos) == 1: # just one dipole # 10 * 10 unit cube around the dipole minx = -10. + pos[0][0] maxx = 10. + pos[0][0] miny = -10. + pos[0][1] maxy = 10. + pos[0][1] minz = -10. + pos[0][2] area2D = (maxx - minx) * (maxy - miny) cell_aprox = int(area2D // area) if cell_aprox > 100000: log.warning( 'this will be a faily big 1D kernel mesh, you may consider ' 'using another value for area in *create1DKernelMesh*. This ' 'mesh will have roughly {} cells + source discretization.' .format(cell_aprox)) log.debug('cells due to area parameter: at least {} / {} ~ {} cells.' .format(area2D, area, cell_aprox)) if zvec is None: # create new if self.zvec is None: if self.pulses is None: numZ = 80 else: numZ = len(self.pulses) * 4 self.createZVector(numZ, minz) else: # set given self.setZVector(zvec) # build kernel mesh: outer mesh kernelpoly = pg.Mesh(2) ob0 = kernelpoly.createNode(minx, miny, 0) ob1 = kernelpoly.createNode(maxx, miny, 0) ob2 = kernelpoly.createNode(maxx, maxy, 0) ob3 = kernelpoly.createNode(minx, maxy, 0) kernelpoly.createEdge(ob0, ob1) kernelpoly.createEdge(ob1, ob2) kernelpoly.createEdge(ob2, ob3) kernelpoly.createEdge(ob3, ob0) # build kernel mesh: sources for position in pos: kernelpoly.createNode(position) # build kernel mesh: create triangle mesh kernelmesh = pg.meshtools.createMesh( kernelpoly, quality=quality, area=area, smooth=[2, 1]) if export_xyplane is not None: kernelmesh.exportVTK(export_xyplane) self.set1DKernelMesh(kernelmesh, calc_3D_stats=calc_3D_stats)
[docs] def set1DKernelMesh(self, mesh, calc_3D_stats=True): """ Sets the 1D kernel mesh. Parameters ---------- mesh: stirng or pygimli.Mesh Filename or mesh instance of a 2D mesh in the x-y plane. Need ---- z discretization: Can be setted via *createZVector*, *setZVector* or direct use of *create1DKernelMesh*. However the needed information to do that may not be available on the fly, therefore no default z vector is created. """ # optionally load mesh if isinstance(mesh, str): mesh = pg.Mesh(mesh) assert mesh.dim() == 2, f'need 2D Mesh, got {mesh.dim()}D Mesh' if self.zvec is None: raise Exception('Need z vector. Please use eiterh createZVector or' ' setZVector method.') self.kernelMesh1D = mesh self._evalKernelMesh(mesh, calc_3D_stats=calc_3D_stats)
def _evalKernelMesh(self, mesh, calc_3D_stats=True): """ Internal function to evalualte the kernel mesh cell center and volume. Midpoints of the virtual cells (virtual because there is no actual 3D Mesh) """ # midpoints of the virtual cells (virtual because # there is no actual 3D Mesh) if self.dimension == 1: if calc_3D_stats: zmid = (self._zvec[:-1] + self._zvec[1:]) / 2. zheight = -(self._zvec[1:] - self._zvec[:-1]) cc = np.array(mesh.cellCenters()) self.kernelMeshCellCenter = np.tile(cc, [len(zmid), 1]).T self.kernelMeshCellCenter[2] = \ np.repeat(zmid, mesh.cellCount()) ca = np.array(mesh.cellSizes()) self.kernelMeshCellVolume = \ np.tile(ca, len(zheight)) * \ np.repeat(zheight, mesh.cellCount()) self.interpolationMatrix = None elif self.dimension == 2: if self.yvec is None or len(self.yvec) == 0: raise Exception('Cannot calculate cell centers without y vec.') if calc_3D_stats: # midpoints of the virtual cells (virtual because # there is no actual 3D Mesh) ymid = (self.yvec[:-1] + self.yvec[1:]) / 2 # lengths of the cells ylength = np.abs(self.yvec[1:] - self.yvec[:-1]) kernelCellCenter2D = np.array(mesh.cellCenters()) if np.allclose(kernelCellCenter2D[:, 2], 0): kernelCellCenter2D = np.column_stack( (kernelCellCenter2D[:, 0], kernelCellCenter2D[:, 2], kernelCellCenter2D[:, 1])) self.kernelMeshCellCenter = np.tile(kernelCellCenter2D, [len(ymid), 1]).T self.kernelMeshCellCenter[1] = np.repeat( ymid, mesh.cellCount()) kernelCellAreas2D = np.array(mesh.cellSizes()) self.kernelMeshCellVolume = np.tile( kernelCellAreas2D, len(ylength)) * \ np.repeat(ylength, mesh.cellCount()) self.interpolationMatrix = None else: # not needed for 0D and 3D pass
[docs] def create2DKernelMesh(self, area=15., quality=34, yvec=None, x_factor=5, z_factor=2, savename=None, export_xzplane=None, calc_3D_stats=True, order=0): """ Similary to the mesh in the 1D case a special mesh consisting of triangluar zylinders is generated. The Zylinders are pointing in the y direction to allow a perfect integration to the x-z plane. Parameters ---------- area: float [15.] Affects the maximum area a triangle in the 2D slice is allowed to have. Higher Values lead to bigger cells. quality: float [34] Defines the smallest angle inside a triangle. Be careful with values above 34.5. Higher values = more cells. yvec: ndarray, list [None] Usualy the y vector is defined automatically, this flag gives the user the optional possibility to give a YVec from outside the function. x_factor: float [2] Extension of the kernel mesh (and therefore integration volume) in the x direction. Should be at least 2 times the loop diameter or shortest edge length. This value defines the multipier. z_factor: float [2] Extension of the kernel mesh (and therefore integration volume) in the z direction. Should be at least 2 times the loop diameter or shortest edge length. This value defines the multipier. savename: string [None] If a savename is given, the resulting 2D Mesh is saved in the .bms format for later use. export_xyplane: string [ None ] Filename for the resulting kernel mesh plane in 2D can be exported for debugging or simply to check the mesh (vtk). """ # compute inner limits for kernelmesh self._evalOuterDims() kernelpoly = pg.meshtools.createWorld( (self.minx, float(self.minz)), (self.maxx, float(self.maxz))) log.info('create2DKernelMesh: discretisation inner mesh: {:0.4f} m²' .format(self.min_dis**2)) innermesh = pg.meshtools.createMesh( kernelpoly, quality=quality, area=self.min_dis**2, smooth=[2, 1]) # ax, cbar = pg.viewer.showMesh(innermesh, # innermesh.cellMarkers(), # cMap='summer', # label='region marker') # inner is ready innermesh.setCellMarkers(np.ones(innermesh.cellCount(), dtype=int)) endSizeX = self.max_dis * x_factor endSizeZ = self.max_dis * z_factor log.debug(f'Append boundary: {endSizeX} x {endSizeZ}') log.info(f'create2DKernelMesh: discretisation \ outer mesh: {area:0.2f} m²') kernelmesh = pg.meshtools.appendTriangleBoundary( innermesh, xbound=(endSizeX - self.max_dis) / 2, ybound=endSizeZ, marker=2, area=area, quality=quality, smooth=True, isSubSurface=True, addNodes=5) # print(innermesh) # print(kernelmesh) # ax, cbar = pg.viewer.showMesh(kernelmesh, # kernelmesh.cellMarkers(), # cMap='summer', # label='region marker') if savename is not None: checkDirectory(savename, filename=True) kernelmesh.save(savename + '.bms') print(f'2DKernelmesh saved as {savename}.bms') if export_xzplane is not None: checkDirectory(export_xzplane, filename=True) kernelmesh.exportVTK(export_xzplane + '.vtk') log.info(f'2DKernelmesh exported as {export_xzplane}.vtk') if yvec is None: self.createYVec() else: self.yvec = np.array(yvec) self.set2DKernelMesh(kernelmesh, calc_3D_stats=calc_3D_stats, order=order)
def _evalOuterDims(self): """ Internal function. Calculates the outer dims of the mesh. """ self._checkSrc() if self.coincident: pos = self.tx.pos self.min_dis = self.tx._mindistance self.max_dis = self.tx._maxdistance else: pos = np.concatenate((self.tx.pos, self.rx.pos), axis=0) self.min_dis = np.min([self.tx._mindistance, self.rx._mindistance]) self.max_dis = np.min([self.tx._maxdistance, self.rx._maxdistance]) self.minz = np.min([-self.min_dis, -0.5]) self.minx = np.min(pos[:, 0]) + self.minz self.maxx = np.max(pos[:, 0]) - self.minz self.miny = np.min(pos[:, 1]) + self.minz self.maxy = np.max(pos[:, 1]) - self.minz self.maxz = 0. if self.tx.type == 'line' or self.rx.type == 'line': self.minx = np.min([self.minx, self.miny]) self.miny = np.min([self.minx, self.miny]) self.maxx = np.max([self.maxx, self.maxy]) self.maxy = np.max([self.maxx, self.maxy]) if len(pos) == 1: # just one dipole self.minx = -5. + pos[0][0] self.maxx = 5. + pos[0][0] self.miny = -5. + pos[0][1] self.maxy = 5. + pos[0][1] self.minz = -1. + pos[0][2]
[docs] def create1DInterpolationSlices(self): coords = create1DInterpolationSlices(self) return coords
[docs] def create2DInterpolationSlices(self): coords = create2DInterpolationSlices(self) return coords
[docs] def createYVec(self, max_length=0.2, max_num=300, y_factor=2., calc_3D_stats=True): """ Creates the y vector discretization for the 2D kernel mesh. The y vector represents the y values of the 3D Kernel mesh before the integration to 2D. Parameters ---------- max_length: float [ 0.2 ] Maximum distance between two slices inbetween the source dipoles. max_num: integer [ 300 ] Maximum number of slices. Overrides max_length if they conflict. y_factor: float [ 2. ] Extension of the kernel mesh (and therefore integration volume) in the y direction. Should be at least 2 times the loop diameter or shortest edge length. This value defines the multipier. """ # hyperbolic sine self._evalOuterDims() pos = self.tx.pos if not self.coincident: pos = np.concatenate((pos, self.rx.pos), axis=0) midYVec = np.arange(self.miny, self.maxy, max_length) if len(midYVec) > max_num//3: midYVec = np.linspace(self.miny, self.maxy, max_num//3) leftYVec = sinhspace(self.miny, self.miny - self.max_dis * y_factor, len(midYVec))[::-1] rightYVec = sinhspace(self.maxy, self.maxy + self.max_dis * y_factor, len(midYVec)) self.setYVector(np.unique( np.concatenate([leftYVec, midYVec, rightYVec]))) if self.kernelMesh2D is not None and calc_3D_stats: # new y vec triggers new evaluation of kernelmesh self.set2DKernelMesh(self.kernelMesh2D, yvec=None, order=self.h_order, integration_mat=self.h_mat, calc_3D_stats=True)
[docs] def set2DKernelMesh(self, inmesh, yvec=None, order=0, integration_mat=None, calc_3D_stats=True): """ kwargs to createYVec if YVec is None """ log.info('set2DKernelMesh: {!s}, order={}'.format(inmesh, order)) self.kernelMesh2D = pg.Mesh(inmesh) self.h_order = order if self.h_order > 0: # for i in range(self.h_order): if integration_mat is None: # case 1/2: create h refinement and calc int mat kernelmesh, self.h_mat = cm.createH2(self.kernelMesh2D, order=self.h_order, integration_mat=True) else: # case 2/2: set integration mat from outside kernelmesh = cm.createH2(self.kernelMesh2D, order=self.h_order, integration_mat=False) self.h_mat = integration_mat else: kernelmesh = pg.Mesh(self.kernelMesh2D) self.interpolationMatrix = None # for interpolation purposes of magnetic fields self.slice_positions = kernelmesh.cellCenters() # y-vector: 2D -> 3D kernelmesh.setDimension(3) kernelmesh.rotate(pg.RVector3(np.pi/2., 0, 0)) # x, y --> x, z # validation test for rotation # kernelmesh.exportVTK('Kernel2D_XZSlice.vtk') if yvec is not None: self.setYVector(yvec) elif self.yvec is None: self.createYVec() else: # midpoints of the virtual cells (virtual because there is no actual # 3D Mesh) self._evalKernelMesh(kernelmesh, calc_3D_stats=calc_3D_stats)
def _checkInterpolation(self, slices=False): """Internal function to evalate if interpolation can be done. To prevent cpp error messages after very long calculation time. """ if self.dimension not in (1, 2): # shortcut return if slices: if self.dimension == 1: # case 1.1: slice interpolation for 1D if self.kernelMesh1D is None: raise Exception( 'Check Interpolation: No kernel mesh found.') if self.zvec is None: raise Exception( 'Check Interpolation: No z vector found.') kern_minx = self.kernelMesh1D.xmin() kern_maxx = self.kernelMesh1D.xmax() kern_miny = self.kernelMesh1D.ymin() kern_maxy = self.kernelMesh1D.ymax() kern_minz = np.min(self.getZVector(reduced=False)) kern_maxz = np.max(self.getZVector(reduced=False)) elif self.dimension == 2: # case 1.2: slice interpolation for 2D if self.kernelMesh2D is None: raise Exception( 'Check Interpolation: No kernel mesh found.') if self.yvec is None: raise Exception( 'Check Interpolation: No y vector found.') kern_cc = np.array(self.kernelMesh2D.cellCenters()).T kern_minx = np.min(kern_cc[0]) kern_maxx = np.max(kern_cc[0]) mid_y = self.yvec[1:] - self.yvec[:-1] kern_miny = np.min(mid_y) kern_maxy = np.max(mid_y) kern_minz = np.min(kern_cc[2]) # self.kernelMesh2D.zmin() kern_maxz = np.max(kern_cc[2]) # self.kernelMesh2D.zmax() else: if self.dimension == 1: # case 2.1: direct interpolation for 1D if self.kernelMeshCellCenter is None: if self.kernelMesh1D is not None: self._evalKernelMesh() else: raise Exception( 'Check Interpolation: No kernel mesh found.') elif self.dimension == 2: # case 2.2: direct interpolation for 2D if self.kernelMeshCellCenter is None: if self.kernelMesh2D is not None: self._evalKernelMesh() else: raise Exception( 'Check Interpolation: No kernel mesh found.') # case 2: direct interpolation for 1D/2D coords_k = self.kernelMeshCellCenter kern_minx = np.min(coords_k[0]) kern_maxx = np.max(coords_k[0]) kern_miny = np.min(coords_k[1]) kern_maxy = np.max(coords_k[1]) kern_minz = np.min(coords_k[2]) kern_maxz = np.max(coords_k[2]) # all cases: loop discretization coords_l = None if self.seploopmesh is not None: coords_l = np.array(self.seploopmesh.positions()).T elif self.rx is not None and self.rx is not None: if self.tx.loopmesh is not None: coords_l = np.array(self.tx.loopmesh.positions()).T if self.rx.loopmesh is not None: coords_l = \ np.concatenate((np.array( self.tx.loopmesh.positions()), np.array( self.tx.loopmesh.positions())), axis=0).T else: pass if coords_l is None: raise Exception('Check Interpolation: No src mesh found.') k1 = np.array([-kern_minx, kern_maxx, -kern_miny, kern_maxy, -kern_minz, kern_maxz]) l1 = np.array([-np.min(coords_l[0]), np.max(coords_l[0]), -np.min(coords_l[1]), np.max(coords_l[1]), -np.min(coords_l[2]), np.max(coords_l[2])]) out = k1 > l1 raiseit = False if np.any(out): print(np.allclose(k1[out], l1[out])) if not np.allclose(k1[out], l1[out]): raiseit = True if raiseit: raise Exception('Kernelmesh is greater than loopmesh, ' 'interpolation of Bfield will fail. ' 'Change one of the meshes')
[docs] def calcInterpolationMatrix(self): # , num_cpu=8): self._checkInterpolation() # self.interpolationMatrix = \ # calcInterpolationMatrix_para(self.tx.loopmesh, # self.kernelMeshCellCenter.T, # num_cpu=num_cpu) # unparallelized self.interpolationMatrix = \ self.tx.calculateInterpolationMatrix( self.kernelMeshCellCenter.T)
[docs] def BFieldCalculation(self, loop_mesh=None, dipole_mesh=None, interpolate=False, just_loop_fields=False, recalc_loop_fields=False, recalc_primary=False, num_cpu=12, **kwargs): """ Calculates the Bfield for the kernel function for tx and rx. internal call of `loop.calculate()` including decision if cell based or node based Bfield is needed. All optional parameters are piped to the `loop.calculate()` call. Based on the desired dimension of the kernel a specialised mesh may be automatically generated for the calculation. Part 1/3 of the kernel calculation. Called automatically if `kernel.calculate` is called. """ self._checkSrc() if recalc_primary and self.dimension == 2: if self.tx.secondary is None: raise Exception('No secondary field found on tx: {!r}' .format(self.tx)) if self.rx is not None: if self.rx.secondary is None: raise Exception('No secondary field found on rx: {!r}' .format(self.rx)) if isinstance(loop_mesh, str): loop_mesh = pg.Mesh(loop_mesh) # no loomesh given, search in tx (and rx) if loop_mesh is None: loop_mesh = self.tx.loopmesh # ok take tx mesh log.debug(f'BFieldCalculation: found loopmesh: ' f'{self.tx.loop_mesh_name}') if loop_mesh is not None: # if found if not self.coincident: # check for rx too same_mesh = False if self.rx.loopmesh is self.tx.loopmesh: same_mesh = True else: if self.rx.loopmesh.nodeCount() ==\ self.tx.loopmesh.nodeCount(): if np.allclose( self.rx.loopmesh.positions().array(), self.tx.loopmesh.positions().array()): same_mesh = True log.debug( f'BFieldCalculation: compare to rx mesh: {same_mesh}') if not same_mesh: # same ? loop_mesh = None # no = new mesh will be generated log.debug('BFieldCalculation: loop_mesh: {}'.format(loop_mesh)) cell_center = self.dimension in (0, 3) str_ = 'cell centers.' if cell_center else 'nodes.' # override transmitter loop config with larmor frequency of M field. self.tx.config.f = self.larmor # case 1: seperated loop if not self.coincident: self.rx.config.f = self.larmor # case 1.1: calc new if loop_mesh is None or (dipole_mesh is None and interpolate): if self.seploopmesh is None: self.createSeperatedLoopMesh(dipole=interpolate) else: # case 1.2: use given meshs (for example for roll along) self.seploopmesh = loop_mesh if interpolate: self.seploopmesh_dipole = dipole_mesh if self.seploopmesh is not self.tx.loopmesh: log.debug(f'BFieldCalculation: set new mesh to tx: ' f'{self.slm_name}') self.tx.setLoopMesh(self.seploopmesh, savename=self.slm_name) if self.seploopmesh is not self.rx.loopmesh: log.debug(f'BFieldCalculation: set new mesh to rx: ' f'{self.slm_name}') self.rx.setLoopMesh(self.seploopmesh, savename=self.slm_name) if self.seploopmesh_dipole is not None: if self.seploopmesh_dipole is not self.tx.dipolemesh: log.debug(f'BFieldCalculation: set newdipole mesh to tx: ' f'{self.slm_dipole_name}') self.tx.setDipoleMesh(self.seploopmesh_dipole, savename=self.slm_dipole_name) if self.seploopmesh_dipole is not self.rx.dipolemesh: log.debug(f'BFieldCalculation: set new dipole mesh to rx: ' f'{self.slm_dipole_name}') self.rx.setDipoleMesh(self.seploopmesh_dipole, savename=self.slm_dipole_name) if self.tx.field is None or recalc_loop_fields: log.info('Calculating magnetic field of tx for {}D-kernel on ' '{} ...'.format(self.dimension, str_)) # shared loops doesnt need to be calced again self.tx.calculate(loop_mesh=self.seploopmesh, dipole_mesh=self.seploopmesh_dipole, interpolate=interpolate, cell_center=cell_center, num_cpu=num_cpu, **kwargs) if self.rx.field is None or recalc_loop_fields: log.info('Calculating magnetic field of rx for {}D-kernel on ' '{} ...'.format(self.dimension, str_)) self.rx.calculate(loop_mesh=self.seploopmesh, dipole_mesh=self.seploopmesh_dipole, interpolate=interpolate, cell_center=cell_center, num_cpu=num_cpu, **kwargs) # case 2: coincident else: if loop_mesh is not None: if loop_mesh is not self.tx.loopmesh: log.debug(f'BFieldCalculation: set new mesh to tx: ' f'{self.slm_name}') self.tx.setLoopMesh(loop_mesh, savename=self.slm_name) if dipole_mesh is not None: if dipole_mesh is not self.tx.dipolemesh: log.debug(f'BFieldCalculation: set new dipole mesh to ' f'tx: {self.slm_dipole_name}') self.tx.setDipoleMesh(dipole_mesh, savename=self.slm_dipole_name) if self.tx.field is None or recalc_loop_fields: log.info('Calculating magnetic field of tx for {}D-kernel on ' '{} ...'.format(self.dimension, str_)) self.tx.calculate(loop_mesh=loop_mesh, dipole_mesh=dipole_mesh, interpolate=interpolate, cell_center=cell_center, num_cpu=num_cpu, **kwargs) # tx.field -> self.txField (interpolation or directly) if just_loop_fields is False: self.interpolateBFieldToKernel(recalc_primary=recalc_primary, num_cpu=num_cpu)
[docs] def interpolateBFieldToKernel(self, recalc_prim_on_kernel=False, recalc_primary=False, num_cpu=32, calc_3D_stats=True): ''' Takes the rx Bfield and interpolates it to the kernel mesh. ''' if recalc_prim_on_kernel: raise Exception('to be integrated') if self.dimension in (1, 2): if self.kernelMeshCellCenter is None: if self.dimension == 1: if self.kernelMesh1D is None: log.log(16, 'Creating new kernel mesh.') self.create1DKernelMesh(calc_3D_stats=calc_3D_stats) else: if calc_3D_stats: self._evalKernelMesh(self.kernelMesh1D) elif self.dimension == 2: if self.kernelMesh2D is None: log.log(16, 'Creating new kernel mesh.') self.create2DKernelMesh(calc_3D_stats=calc_3D_stats) else: if calc_3D_stats: self._evalKernelMesh(self.kernelMesh2D) ticklI = time.time() log.log(16, 'Start interpolation of Bfield for ' '{}D-kernel calculation ({} points)...' .format(self.dimension, len(self.kernelMeshCellVolume))) if self.interpolationMatrix is None: self.calcInterpolationMatrix() # num_cpu=num_cpu) if recalc_primary: tx_mesh = self.tx.loopmesh tx_mesh_name = self.tx.loop_mesh_name tx_total = np.copy(self.tx.field) self.txBfield = cm.interpolateField_Matrix( self.tx.secondary, self.interpolationMatrix) log.log(13, 'calculating primary field on kernel mesh ' 'directly.') self.tx.setLoopMesh(self.kernelMeshCellCenter.T) self.tx.config.ftype = 'B' self.tx.calculate(num_cpu=num_cpu) self.tx_primary = self.tx.field self.txBfield += self.tx_primary self.tx.field = np.copy(tx_total) # reset old tx field self.tx.setLoopMesh(tx_mesh, savename=tx_mesh_name) if not self.coincident: rx_mesh = self.rx.loopmesh rx_mesh_name = self.rx.loop_mesh_name rx_total = np.copy(self.rx.field) self.rxBfield = cm.interpolateField_Matrix( self.rx.secondary, self.interpolationMatrix) log.log(13, 'calculating primary field on ' 'kernel mesh directly.') self.rx.setLoopMesh(self.kernelMeshCellCenter.T) self.rx.config.ftype = 'B' self.rx.calculate(num_cpu=num_cpu) self.rx_primary = np.copy(self.rx.field) self.rxBfield += self.rx_primary self.rx.field = rx_total self.rx.setLoopMesh(rx_mesh, savename=rx_mesh_name) else: mat_rows = self.interpolationMatrix.rows() mat_cols = self.interpolationMatrix.cols() if mat_cols != self.tx.field.shape[1]: raise Exception( 'interpolateBFieldToKernel(): Dimension mismatch ' 'between interpolation matrix ({} x {}) and tx field ' 'shape ({})' .format(mat_cols, mat_rows, self.tx.field.shape)) if mat_rows != len(self.kernelMeshCellVolume): raise Exception( 'interpolateBFieldToKernel(): Dimension mismatch ' 'between interpolation matrix ({} x {}) and kernel ' 'mesh cell count in 3D ({})' .format(mat_cols, mat_rows, len(self.kernelMeshCellVolume))) self.txBfield = cm.interpolateField_Matrix( self.tx.field, self.interpolationMatrix) if not self.coincident: if mat_cols != self.rx.field.shape[1]: raise Exception( 'interpolateBFieldToKernel(): Dimension mismatch ' 'between interpolation matrix ({} x {}) and rx ' 'field shape ({})' .format(mat_cols, mat_rows, self.rx.field.shape)) self.rxBfield = cm.interpolateField_Matrix( self.rx.field, self.interpolationMatrix) log.log(16, 'B field interpolation in {:.2f} sec' .format(time.time() - ticklI)) else: self.txBfield = self.tx.field if not self.coincident: self.rxBfield = self.rx.field
[docs] def ellipticalDecomposition(self): """ Computes the counter and corotating parts of the given magnetic fields with respect to a given earth magnetic field. Parameters ---------- Bfield: complex field [3, n] or string Optional. Possibility to insert a pre calculated field. Inclination: float Inclination of the earth magnetic field at the loop site in rad [0... 2pi] Declination: float Declination of the magnetic field at the loop site in rad [0... 2pi] B: np.array of shape (3, n) Magnetic field of the loop Second part of the kernel calculation. - mainly from Weichman et al. (2000) """ if self.txBfield is None: log.info( 'Elliptical decomposition triggered new B-field calculation.') self.BFieldCalculation() self.txalpha, self.txbeta, self.txzeta, self.txperpend = \ self.ellipticalDecomposition_multi(self.txBfield, self.survey.earth) if not self.coincident: self.rxalpha, self.rxbeta, self.rxzeta, self.rxperpend = \ self.ellipticalDecomposition_multi(self.rxBfield, self.survey.earth) return (self.txalpha, self.txbeta, self.txzeta),\ (self.rxalpha, self.rxbeta, self.rxzeta)
[docs] @staticmethod def ellipticalDecomposition_multi(Bfield, earth): """ Computes the counter and corotating parts of the given magnetic fields with respect to a given earth magnetic field. Parameters ---------- Bfield: complex field [3, n] or string Optional. Possibility to insert a pre calculated field. Inclination: float Inclination of the earth magnetic field at the loop site in rad [0... 2pi] Declination: float Declination of the magnetic field at the loop site in rad [0... 2pi] B: np.array of shape (3, n) Magnetic field of the loop Second part of the kernel calculation. Literature ---------- - Weichman et al. (2000) - Hertrich (2005, Appendix) - Hertrich (2008, eq. 6 ff.) """ tickeD = time.time() # Nov 2020: Finally figuring out where the conjugated kernel origins # this is the final fix to ensure the phase of the kernel is finally # consistent with pulications of Hertrich, Braun and Jiang. # Explaination: The negative values of the larmor frequency # (representing the orientation of the spins) manifests as a 180 degree # phase in the magneitc fields for both transmitter and receiver. # This has not been implemented in the magnetic field calcualtion, # which means it is now condidered right here. log.debug('Using conjugate complex of input field due to larmor sign.') bfield = np.conjugate(Bfield) # earth magnetic field: B0 # z negativ downwards (by convention) ! # Norm! (b0 in Hertrich 2008) B0 = earth.field # Bperp: perpendicular projection of B = Bfield vertical on B0 # vertical components from B pertaining to B0 # Hertrich, 2008 (eq. 4) scalarproduct = np.sum(B0 * bfield, 0) Bperp = bfield - scalarproduct * B0 crossproduct = np.cross(Bperp, np.conj(Bperp), axis=0) B0_cross = B0 * crossproduct tmp = 1j * np.sum(B0_cross, 0) # 09.02.2019 switched sign from - to + signum = np.sign(tmp).real # 11.03.2020 supervised by T. Hiller, same as above! # signum = np.zeros_like(tmp, dtype=float) # signum[np.logical_or(tmp.real > 0, # np.logical_and(np.isclose(tmp.real, 0), # tmp.imag > 0))] = 1 # signum[np.logical_or(tmp.real < 0, # np.logical_and(np.isclose(tmp.real, 0), # tmp.imag < 0))] = -1 # alpha, beta, zeta # always real B_v_conj_B_v = np.sum(Bperp * np.conj(Bperp), 0).real B_vert_B_vert = np.sum(Bperp * Bperp, 0) # calc alpha and beta # weichman et al. 2000: eq 4.5 alpha = np.sqrt(0.5 * (B_v_conj_B_v + np.abs(B_vert_B_vert))) beta_hlp = B_v_conj_B_v - np.abs(B_vert_B_vert) log.debug('negative beta inbound') log.debug('B_v_conj_B_v, {}'.format(B_v_conj_B_v[beta_hlp < 0])) log.debug('np.abs(B_vert_B_vert), {}'.format( np.abs(B_vert_B_vert)[beta_hlp < 0])) log.debug(beta_hlp[beta_hlp < 0]) beta = np.real(np.lib.scimath.sqrt(beta_hlp * 0.5) * signum) # alpha and beta are both real, # however the sqrt in beta can recieve neg values, in which case the # result gets imaginary components. They should be near zero and can be # ommitted. Also zeta in the same time should be near (1 + 0j), further # reducing the weight of imaginary parts in beta # calc exp(j*zeta_trans (== phase)) # weichman et al. 2000: eq 4.4 zeta = np.sqrt(B_vert_B_vert / np.abs(B_vert_B_vert)) # calc norm (was called perpendicular before 06.02.19) # weichman et al. 2000: eq 4.6 # unit vector b_norm = np.real(Bperp / zeta) / alpha # log.debug('bnorm') # bnorm_mag = np.sqrt(np.sum(b_norm * np.conj(b_norm), axis=0)) # log.debug((np.min(bnorm_mag), np.max(bnorm_mag))) log.debug('elliptical decomposition: {:.2f} sec' .format(time.time() - tickeD)) return alpha, beta, zeta, b_norm
[docs] def kernelIntegration(self, calc_theta=False): """ Computes the integration of the kernel with respect to the desired dimension. Parameters ---------- decomposition: (alpha, beta, zeta) Bfield_part essentially consists of the output from the elliptical decomposition of the magnetic field. measurement: class An instance of a measurement class has to be given in order to keep the number of input arguments manageable. earthmagnitude: float Magnitude of the earth magnetic field [Tesla]. Aproximatly about 30000 to 65000 nT (1 nT = 1e-9 Tesla). Third part of the kernel calculation. """ self._checkPulses() tickkI = time.time() if self.txalpha is None: # alpha, beta, zeta and rest of the formulations # from Weichman et al. (2000) # from Hertrich (2008, eq. 6-10) self.ellipticalDecomposition() if self.coincident: self.rxalpha = None self.rxbeta = None self.rxzeta = None self.rxperpend = None # note: this function needs more memory than cpu ret = self.kernelCalculation_multi( self.fid, self.survey.earth, self.txalpha, self.txbeta, self.txzeta, self.txperpend, rxalpha=self.rxalpha, rxbeta=self.rxbeta, rxzeta=self.rxzeta, rxperpend=self.rxperpend, calc_theta=calc_theta) if calc_theta: K1, K2, K3, self.theta = ret else: K1, K2, K3 = ret self.K = K1 * K2 * K3 np.save('KernMat_dim{}.npy'.format(self.dimension), self.K) if self.debug: if self.dimension != 2: pass else: self.K_part1 = K1 self.K_part2 = K2 self.K_part3 = K3 kp12D = np.sum(np.array( K1 * self.kernelMeshCellVolume).reshape( len(self.pulses), len(self.yvec) - 1, -1), 1) kp22D = np.sum(np.array( K2 * self.kernelMeshCellVolume).reshape( 1, len(self.yvec) - 1, -1), 1) kp32D = np.sum(np.array( K3 * self.kernelMeshCellVolume).reshape( 1, len(self.yvec) - 1, -1), 1) if self.h_order >= 1: kp12D = integrateKernelH2(self.h_mat, kp12D) kp22D = integrateKernelH2(self.h_mat, kp22D) kp32D = integrateKernelH2(self.h_mat, kp32D) self.K_part1 = kp12D /\ np.array(self.kernelMesh2D.cellSizes()) self.K_part2 = kp22D /\ np.array(self.kernelMesh2D.cellSizes()) self.K_part3 = kp32D /\ np.array(self.kernelMesh2D.cellSizes()) nk1 = 'kern_part1_3D.npy' np.save(nk1, self.K_part1) print('Kernel: DEBUG: save {}'.format(nk1)) nk2 = 'kern_part2_3D.npy' np.save(nk2, self.K_part2) print('Kernel: DEBUG: save {}'.format(nk2)) nk3 = 'kern_part3_3D.npy' np.save(nk3, self.K_part3) print('Kernel: DEBUG: save {}'.format(nk3)) nk4 = 'kern_full_3D.npy' if self.h_order >= 1: kern3D = np.ones((len(self.pulses), len(self.yvec) - 1, self.h_mat.rows()), dtype=np.complex) raw = self.K.reshape( len(self.pulses), len(self.yvec) - 1, -1) for ip, pulse in enumerate(raw): # pulses for ik, kslice in enumerate(pulse): kern3D[ip, ik] = integrateKernelH2(self.h_mat, kslice) kern3D = kern3D.reshape(len(self.pulses), -1) else: kern3D = self.K np.save(nk4, kern3D) print('Kernel: DEBUG: save {}'.format(nk4)) # Integration with respect to the desired dimension if self.dimension == 0: # direct sum over the volume + weighting self.K = np.sum( self.K * np.array(self.tx.loopmesh.cellSizes()), 1) if self.dimension == 1: # 1D Kernel over a given z-vector # integration over x-y-plane to z <- 1D # Dimension before sum: (len(pm_vec), Z, X*Z), X*Z = dim 2 self.K = np.sum(np.array( self.K * self.kernelMeshCellVolume).reshape( len(self.pulses), len(self._zvec) - 1, -1), 2) if self.dimension == 2: # 2D Kernel over a given y-vector # integration over y to x-z-plane <- 2D # Dimension before sum: (len(pm_vec), Y, X*Z), y = dim 1 self.K = np.sum(np.array( self.K * self.kernelMeshCellVolume).reshape( len(self.pulses), len(self.yvec) - 1, -1), 1) # self.K = K_full if self.h_order > 0: self.K = integrateKernelH2(self.h_mat, self.K) # Attention: self.K *= 4**self.h_order # nico: due to the weighting with self.kernelMeshCellVolume # of the refined mesh, the weigthting functions are too small. # The actual cell sizes of the unrefined 2D mesh are bigger # by a factor of 4**order. # Or in other words: you cannot integrate twice with weighting. # The second integration doesnt need another weighting. if self.dimension == 3: # just adding weight (Volume), no integration self.K *= np.array(self.tx.loopmesh.cellSizes()) log.info('kernel integration {:.2f} sec'.format(time.time() - tickkI))
[docs] @staticmethod def kernelCalculation_multi( fid, earth, txalpha, txbeta, txzeta, txperpend, rxalpha=None, rxbeta=None, rxzeta=None, rxperpend=None, calc_theta=False): """ """ def scalar(vec1, vec2): """ Scalar product of two (3, n)-vectors -> (n,)-vector. """ return np.sum(vec1 * vec2, 0) def cross(vec1, vec2): """ Cross product of two (3, n)-vectors -> (3, n)-vector. """ return np.cross(vec1, vec2, axis=0) # common stuff pulsevector = fid.pulses[:, np.newaxis] * \ fid.tx_turns gam = misc.constants.gamma B0 = earth.field # magnetization if np.allclose(fid.df, 0): log.debug('Kernel calculation without frequency offset') # magnetization: no frequeny offset theta = 0.5 * gam * pulsevector * (txalpha - txbeta) magnetization = np.sin(theta) # different coordinate system # rotated reference frame # theta = flipwinkel = 0.5 * gam * pulsevector * # (txalpha - txbeta) # task: drehe B0 vector um txperp um den winkel theta # das is dann der winkel in den die magnetisierung zeigt # (im carthesischen coord system, nicht mehr für kernel z # gebrauchen) else: log.debug('Kernel calculation with df: {}'.format( np.array(fid.df))) # magnetization with frequency offset theta = np.arctan2( 0.5 * gam * pulsevector / fid.taup1 * (txalpha - txbeta), (2 * np.pi * fid.df)[:, np.newaxis]) flip_eff = np.sqrt( (0.5 * gam * pulsevector * (txalpha - txbeta))**2 + (2 * np.pi * fid.df[:, np.newaxis] * fid.taup1)**2) magnetization = np.sin(flip_eff) * np.sin(theta)\ - 1j * np.sin(theta) * np.cos(theta) * (np.cos(flip_eff) - 1) # transmitter part # changed tx_turns -> rx_turns (12.03.19) Aaron said yes K_part1 =\ txzeta * magnetization * fid.curie * gam * \ (earth.magnitude**2) * fid.rx_turns # receiver part + geometry if rxalpha is None: # coincident, normally rx, in this case rx=tx # not txzeta**2, txzeta is also in k_part1 K_part2 = txzeta * (txalpha + txbeta) # K_part3: coincident: geometric factor = 1 K_part3 = np.ones_like(K_part2) else: # separated case # not rxzeta * txzeta, txzeta is in k_part1 K_part2 = rxzeta * (rxalpha + rxbeta) # K_part3: geometric factor K_part3 = scalar(rxperpend, txperpend) + \ 1j * scalar(B0, cross(rxperpend, txperpend)) ret = (K_part1, K_part2, K_part3,) if calc_theta: ret += (theta,) return ret
[docs] def sliceKernel2D(self, savename=None, forceNew=False, loopSaveName=None, num_cpu=None, new_bfield=False, loop_mesh=None, slice_name=None, **kwargs): """ 2D Kernel in a memory saving parallel computation approach. """ import multiprocessing as mpc import tempfile as temp # see if tx and rx is available self._checkSrc() if self.kernelMesh2D is None: self.create2DKernelMesh(calc_3D_stats=False) tick_sI = time.time() if num_cpu is None: num_cpu = self.num_cpu N = len(self.yvec) - 1 number_of_processes = min(min(mpc.cpu_count(), N), num_cpu) if slice_name is None: # Preparation of interpoaltion mesh and loops self.BFieldCalculation(just_loop_fields=True, loop_mesh=loop_mesh, recalc_loop_fields=new_bfield, num_cpu=num_cpu) if self.coincident: rxfield = None mesh = self.tx.loopmesh else: rxfield = self.rx.field mesh = self.seploopmesh if os.path.exists(self.tx.loop_mesh_name): temporary_mesh = False meshName = self.tx.loop_mesh_name log.debug('loop mesh for multiprocessing: {}'.format(meshName)) else: # create temporary filename temporary_mesh = True tempFile = temp.NamedTemporaryFile(suffix='.bms', delete=False) meshName = tempFile.name tempFile.close() # pygimli needs write access, hence delete=False log.debug('save mesh for multiprocessing: {}'.format(meshName)) log.debug('loopmeshname: {}'.format(self.tx.loop_mesh_name)) log.debug('exists: {}'.format( os.path.exists(self.tx.loop_mesh_name))) mesh.save(meshName) self._checkInterpolation(slices=True) else: # Check if slices are available forcheck = Path(slice_name.format(self.fid.tx_index, 0)) if not forcheck.exists(): raise Exception('No slice found under :"{}"' .format(forcheck)) # no source mesh or fields are needed meshName = None rxfield = None temporary_mesh = False self.kernelMeshCellVolume = None self.kernelMeshCellCenter = None slice_queue = mpc.JoinableQueue() # output for worker, input for summation sum_slice_queue = mpc.Queue() # output for summation stop_queue = mpc.JoinableQueue() # end signal for summation parent_id = os.getpid() log.info('Get {} slices... start interpolating in {} parallel ' 'processes.'.format(N, number_of_processes)) processes = [] if self.h_order >= 0: kernelMesh2D_rotated = cm.createH2(self.kernelMesh2D, order=self.h_order, integration_mat=False) else: kernelMesh2D_rotated = pg.Mesh(self.kernelMesh2D) kernelMesh2D_rotated.setDimension(3) kernelMesh2D_rotated.rotate(pg.RVector3(np.pi/2., 0, 0)) out_pos_2d = np.array(kernelMesh2D_rotated.cellCenters()) indices = np.array_split(range(N), number_of_processes) # begin different distribution, to ensure equal workload per processor # saves about 3-4 % calculation time - yay... log.debug('test different array distribution') slices_per_core = len(indices[0]) indices2 = np.zeros(N, dtype=int) complete = 0 for i in range(slices_per_core): temp = indices2[i::slices_per_core] indices2[i::slices_per_core] =\ np.arange(complete, complete + len(temp), dtype=int) complete += len(temp) assert np.allclose(np.sort(indices2), np.array(range(N))) # some checks just prove to be worth the time indices = np.array_split(indices2, number_of_processes) # print(indices) # end different distribution for i in range(number_of_processes): log.debug('index: {}'.format(i)) worker = mpc.Process(target=_2DSliceKernelWorker, args=(meshName, out_pos_2d, indices[i], self.yvec, self.tx.field, rxfield, slice_queue, parent_id, i, # worker ID self.survey.earth, self.fid, slice_name, )) log.debug('created worker') worker.daemon = True processes.append(worker) worker.start() log.debug('started worker: {}'.format(worker.is_alive())) for p in processes: log.debug('still alive: {}'.format(p.is_alive())) # lengths of the cells = weighting (in y direction) sum_worker = mpc.Process( target=_2DSliceSummationWorker, args=(slice_queue, sum_slice_queue, stop_queue, (len(self.pulses), self.kernelMesh2D.cellCount()*(4**self.h_order)), len(self.yvec) - 1, number_of_processes, parent_id)) sum_worker.daemon = True sum_worker.start() for p in processes: log.debug('still alive: {}'.format(p.is_alive())) log.debug('started additional worker for summation: {}' .format(sum_worker.is_alive())) isInterrupt = False log.debug('sum worker still alive: {}'.format(sum_worker.is_alive())) while True: try: if stop_queue.empty(): if not sum_worker.is_alive(): isInterrupt = True if isInterrupt: log.error( 'Sum Worker died, shutting ' 'down the rest as well and abort run.') sum_worker.terminate() break time.sleep(0.5) else: # waits till all slices are summarized, queue empty stop_queue.get() stop_queue.task_done() log.debug('join slice queue') slice_queue.join() log.debug('finished, end of multiprocessing, cleaning up') break except KeyboardInterrupt: isInterrupt = True log.critical( 'detect Keyboard Interrupt. Shutting processes down.') sum_worker.terminate() break for p in processes: p.terminate() # tells the process to terminate time.sleep(0.1) if p.is_alive(): p.join() # waits till termination of p if temporary_mesh: os.unlink(meshName) # release temporary mesh file self.K = sum_slice_queue.get() # final calculated kernel, normalized if self.h_order > 0: self.K = integrateKernelH2(self.h_mat, self.K) log.debug('final Kern: {}'.format(self.K.shape)) sum_worker.terminate() # tells sumWorker to terminate if sum_worker.is_alive(): log.debug('sum_worker not terminated, join()') sum_worker.join() # waits till termination of p # weighting over cell sizes of the actual 2D Mesh (y vec already done) self.K *= np.array(self.kernelMesh2D.cellSizes()) log.info('overall interpolation time: %.4f seconds' % (time.time() - tick_sI)) if temporary_mesh: log.debug('temporary file deleted?: {} (supposed to be True)' .format(not os.path.exists(meshName))) return self.K
[docs] def save(self, savename=None, save_interpolation_mat=False, save_loopmesh=False, light=True, kernelmesh_name=None): """ Save the basic information to restore the Kernel class later. """ if savename is None: savename = Path(f'_default_Kernel_{self.dimension}D') else: savename = Path(savename).with_suffix('') savename.parent.mkdir(exist_ok=True, parents=True) log.info('Save Kernel: {}'.format(savename)) # flip some switches if light is True: # no matrices or meshs and no b fields, just basic NMR stuff # no recalculation possible after load ! save_interpolation_mat = False save_loopmesh = False rx_bfield = None tx_bfield = None else: rx_bfield = self.rxBfield tx_bfield = self.txBfield # separated loop mesh if save_loopmesh and self.seploopmesh is not None: # meshsave managed by kernel class, the loops dont know this if self.slm_name is None: self.slm_name = '{}_SepLoopMesh.bms'.format(savename.stem) log.info(f'save separated loop mesh: {self.slm_name}') self.seploopmesh.save(savename.with_name(self.slm_name).as_posix()) # kernelmesh 1D if self.kernelMesh1D is not None: if kernelmesh_name is not None: kernelmesh1d = kernelmesh_name else: kernelmesh1d = '{}_kern1D.bms'.format(savename.stem) log.info(f'save kernelmesh1D: {kernelmesh1d}') self.kernelMesh1D.save(savename.with_name(kernelmesh1d).as_posix()) else: kernelmesh1d = None # kernelmesh 2D if self.kernelMesh2D is not None: if kernelmesh_name is not None: kernelmesh2d = Path( kernelmesh_name).with_suffix('.bms').as_posix() else: kernelmesh2d = '{}_kern2D.bms'.format(savename.stem) log.info(f'save kernelmesh2D: {kernelmesh2d}') self.kernelMesh2D.save(savename.with_name(kernelmesh2d).as_posix()) else: kernelmesh2d = None # save interpolation matrix if save_interpolation_mat and self.interpolationMatrix is not None: iM = True # True is enough, name is fix iMName = '{}_interpolationMatrix.imat'.format(savename.stem) log.info(f'save interpolation matrix: {iMName}') self.interpolationMatrix.save( savename.with_name(iMName).as_posix()) else: iM = False if self.dimension in (0, 3): # basic np.savez(savename.with_suffix('.npz').as_posix(), kernel=self.K, dimension=self.dimension, fid_index=self.fid_index, sepm_name=self.slm_name, txBfield=tx_bfield, rxBfield=rx_bfield, pulses=self.pulses,) if self.dimension == 3: self.exportVTK(f'{savename.stem}_3DMesh.vtk') elif self.dimension == 1: # + zvec, min_thk, kernelmesh1d, interpolation matrix np.savez(savename.with_suffix('.npz').as_posix(), kernel=self.K, dimension=self.dimension, zvec=self._zvec, fid_index=self.fid_index, min_thk=np.array((self.min_thk),), sepm_name=self.slm_name, kernelmesh1d=kernelmesh1d, txBfield=tx_bfield, rxBfield=rx_bfield, interpolationMatrix=iM, pulses=self.pulses,) elif self.dimension == 2: # + yvec, order, interpolation matrix np.savez(savename.with_suffix('.npz').as_posix(), kernel=self.K, dimension=self.dimension, fid_index=self.fid_index, sepm_name=self.slm_name, kernelmesh2d=kernelmesh2d, yvec=self.yvec, order=self.h_order, txBfield=tx_bfield, rxBfield=rx_bfield, interpolationMatrix=iM, pulses=self.pulses, kern_part1=self.K_part1, kern_part2=self.K_part2, kern_part3=self.K_part3,) else: raise Exception(f'Dim = {self.dimension} ?')
[docs] def load(self, savename, load_loopmesh=True, kernelmesh2d=None, load_kernelmesh=True, use_order_refinement=True): """ Load a previously saved kernel (.npz-format). """ savename = Path(savename).with_suffix('') # found file ? if not checkForKernel(savename.with_suffix('.npz').as_posix()): raise FileNotFoundError( 'cannot found kernel at given path: "{}"' .format(os.path.abspath(savename.with_suffix('.npz')))) log.info('load file: {}'.format( savename.with_suffix('.npz').absolute())) # open main container with np.load(savename.with_suffix('.npz').as_posix(), allow_pickle=True) as content: self.K = getItem(content, 'kernel') self.fid_index = getItem(content, 'fid_index') self.txBfield = getItem(content, 'txBfield') self.rxBfield = getItem(content, 'rxBfield') self.dimension = getItem(content, 'dimension') self.slm_name = getItem(content, 'sepm_name') # temporary uses saved pulses until a survey class is provided # (pulses are then used from the corresponding fid class) self.setPulsesDirectly(getItem(content, 'pulses', None)) if self.dimension == 1: min_thk = getItem(content, 'min_thk', 0.5) zvec = getItem(content, 'zvec') self.setZVector(zvec, min_thk=min_thk) kernelmesh1d = getItem(content, 'kernelmesh1d', None) if kernelmesh1d is not None: name1 = savename.with_name(kernelmesh1d).as_posix() if os.path.exists(name1): self.set1DKernelMesh(name1) else: log.warning(f'Mesh not found: {name1}') # mesh not found, continue elif self.dimension == 2: self.h_order = getItem(content, 'order', 0) self.yvec = getItem(content, 'yvec') if load_kernelmesh: if kernelmesh2d is None: # not given, search in file kernelmesh2d = getItem(content, 'kernelmesh2d', None) else: kernelmesh2d = None if kernelmesh2d is not None: # either given or found in file if not use_order_refinement: self.h_order = 0 if isinstance(kernelmesh2d, pg.Mesh): self.set2DKernelMesh(kernelmesh2d, order=self.h_order) else: name2 = savename.with_name(kernelmesh2d).as_posix() if os.path.exists(name2): self.set2DKernelMesh(name2, order=self.h_order) else: log.warning(f'Mesh not found: {name2}') self.K_part1 = getItem(content, 'kern_part1', None) self.K_part2 = getItem(content, 'kern_part2', None) self.K_part3 = getItem(content, 'kern_part3', None) if self.K_part1 is not None: self.debug = True if load_loopmesh: if self.slm_name is not None: name3 = savename.with_name(self.slm_name).as_posix() if os.path.exists(name3): self.seploopmesh = pg.Mesh(name3) else: log.warning(f'Mesh not found: {name3}') if getItem(content, 'interpolationMatrix', False) is True: self.interpolationMatrix = pg.SparseMapMatrix() hlp = '{}_interpolationMatrix.imat'.format(savename.stem) name4 = savename.with_name(hlp).as_posix() try: self.interpolationMatrix.load(name4) except RuntimeError: log.warning('Interpolation Matrix not found: ' f'{name4}_interpolationMatrix.imat')
[docs] def calculate(self, loop_mesh=None, dipole_mesh=None, interpolate=False, savename=None, forceNew=False, slices=True, slice_name=None, **kwargs): """ All three parts of the kernel calulation are called here. All given kwargs are directed to BfieldCalculation(), see function info for details about possible keyword arguments. called functions ---------------- >>> self.BFieldCalculation(**kwargs) >>> self.ellipticalDecomposition() >>> self.kernelIntegration() >>> if savename is not None: self.save(savename) keyword arguments ----------------- destinations: none for now, with exception of "num_cpu", [12] which is directed to BfieldCalculation and/or sliceKernel """ if not self.updatable: raise Exception( f'Cannot calculate Kernel. Check tx: {self.tx}, rx: {self.rx} ' f'or pulses: {self.pulses}') self.num_cpu = kwargs.pop('num_cpu', self.num_cpu) if slices is True and self.dimension == 2: self.sliceKernel2D(slice_name=slice_name) elif slices is True and self.dimension == 1: self.sliceKernel1D(loop_mesh=loop_mesh, new_bfield=forceNew, slice_name=slice_name) else: self.BFieldCalculation( loop_mesh=loop_mesh, dipole_mesh=dipole_mesh, interpolate=interpolate, recalc_loop_fields=forceNew, num_cpu=self.num_cpu, **kwargs) self.ellipticalDecomposition() self.kernelIntegration(calc_theta=True) if savename is not None: self.save(savename) return
[docs] def createMagnetizationMesh(self): """ Creates full 3D mesh for display and calcualtion of magnetization vectors. Not needed for normal kernel calculation routine and big, therefore separate. """ # 2D mesh -> 3D triangle-prism-mesh log.info('getMagnetization: prepare 3D mesh...') self.mag_mesh = pg.meshtools.extrudeMesh(self.kernelMesh2D, self.yvec) # inplace function working in mesh cpp background, no return value # x-z + y = x-z-y input mesh to x-y-z output mesh self.mag_mesh.swapCoordinates(1, 2)
[docs] def calcMagnetization(self): """ Creates 3D mesh and calcualtes magnetization vector after excitation. Returns magnetization vector of shape (num_pulses, num_cells_3d, 3) """ assert self.dimension == 2, 'getMagnetization: Only working for 2D' if self.theta is None: if self.txalpha is None: self.ellipticalDecomposition() pulsevector = self.fid.pulses[:, np.newaxis] * \ self.fid.tx_turns gam = misc.constants.gamma if np.allclose(self.fid.df, 0): self.theta = 0.5 * gam * pulsevector *\ (self.txalpha - self.txbeta) else: self.theta = np.arctan2( 0.5 * gam * pulsevector / self.fid.taup1 * (self.txalpha - self.txbeta), (2 * np.pi * self.fid.df)[:, np.newaxis]) self.createMagnetizationMesh() vvec = np.tile(self.survey.earth.field, (self.theta.shape[1], 1, 1)) self.magnetization = np.zeros((len(self.pulses), self.theta.shape[1], 3)) for pi in range(len(self.pulses)): tick = time.time() rotmats = cm.rotationMatrix(self.txperpend, self.theta[pi, :]) m2 = np.squeeze(np.einsum('ijk,ikl->ijl', rotmats.T.swapaxes(1, 2), vvec)) magnetization = m2 * self.magnetization_magnitude self.magnetization[pi] = magnetization log.log(13, 'time for one pulse: {:.4f} sec' .format(time.time() - tick)) return self.magnetization
[docs] def exportMagnetization(self, name, vtk_export=False, pulse=0): """ Export a previously calculated magnetization vector as numpy vector and optionally vtk file. """ # has vector? if self.magnetization is None: raise Exception('No magnetization vector found. Use ' 'getMagnetization to calculate the vector or ' 'import and set kern.magnetization manually.') # has mesh? if self.mag_mesh is None: self.createMagnetizationMesh() # export numpy matrix np.save(Path(name).with_suffix('.npy'), self.magnetization) # export vtk if vtk_export: vtk_name = Path(name).with_suffix('.vtk') # # magnetization is always real # xr = pg.meshtools.cellDataToNodeData( # self.mag_mesh, # self.magnetization[pulse, :, 0].real) # yr = pg.meshtools.cellDataToNodeData( # self.mag_mesh, # self.magnetization[pulse, :, 1].real) # zr = pg.meshtools.cellDataToNodeData( # self.mag_mesh, # self.magnetization[pulse, :, 2].real) # vtk_mag = np.array((xr, yr, zr)) # theta from -pi to pi for nice display with cyclic colorbar self.mag_mesh.addData('theta', self.theta[pulse] % (np.pi*2) - np.pi) self.mag_mesh.exportVTK(vtk_name.as_posix(), self.theta) savefieldvtk(vtk_name.as_posix(), self.mag_mesh, self.magnetization[pulse])
[docs] def sliceKernel1D(self, num_cpu=None, loop_mesh=None, new_bfield=False, interpolate_bfield=True, slice_name=None): import multiprocessing as mpc import tempfile as temp self._checkSrc() if self.kernelMesh1D is None: self.create1DKernelMesh(calc_3D_stats=False) tick_sI = time.time() if num_cpu is None: num_cpu = self.num_cpu N = len(self._zvec) - 1 number_of_processes = min(min(mpc.cpu_count(), N), num_cpu) tx_name = None rx_name = None if slice_name is None: tx_index = None rx_index = None if interpolate_bfield: # b field case 1/2: b field is interpolated from loop mesh self.BFieldCalculation(just_loop_fields=True, loop_mesh=loop_mesh, recalc_loop_fields=new_bfield, num_cpu=number_of_processes) tx_field = self.tx.field if self.coincident: rx_field = None mesh = self.tx.loopmesh else: rx_field = self.rx.field mesh = self.seploopmesh tx_name = None rx_name = None if os.path.exists(self.tx.loop_mesh_name): temporary_mesh = False meshName = self.tx.loop_mesh_name log.debug('loop mesh for multiprocessing: {}'.format(meshName)) else: # create temporary filename temporary_mesh = True tempFile = temp.NamedTemporaryFile(suffix='.bms', delete=False) meshName = tempFile.name tempFile.close() # pygimli needs write access log.debug('save mesh for multiprocessing: {}'.format(meshName)) log.debug('loopmeshname: {}'.format(self.tx.loop_mesh_name)) log.debug('exists: {}'.format( os.path.exists(self.tx.loop_mesh_name))) mesh.save(meshName) self._checkInterpolation(slices=True) else: # b field case 2/2: b field is calculated directly on each slice temporary_mesh = False meshName = None tx_field = None rx_field = None tx_name = self.tx._dump2Temp() if self.coincident: rx_name = None else: rx_name = self.rx._dump2Temp() else: # case 3 import fields from custEM slices temporary_mesh = False meshName = None tx_index = self.tx_index rx_index = self.rx_index tx_field = None rx_field = None self.kernelMeshCellVolume = None self.kernelMeshCellCenter = None slice_queue = mpc.JoinableQueue() # output for worker, input for summation sum_slice_queue = mpc.Queue() # output for summation stop_queue = mpc.JoinableQueue() # end signal for summation parent_id = os.getpid() log.debug('parent_id: %d', parent_id) log.info( 'Get %d slices... start calculating in %d parallel processes.', N, number_of_processes) processes = [] kernel_mesh_1d = pg.Mesh(self.kernelMesh1D) out_pos_2d = np.array(kernel_mesh_1d.cellCenters()) indices = np.array_split(range(N), number_of_processes) log.debug('indices: {}'.format(indices)) for i in range(number_of_processes): log.debug('index: {}'.format(i)) worker = mpc.Process(target=_1DSliceKernelWorker, args=(meshName, out_pos_2d, tx_field, rx_field, self.coincident, self._zvec, indices[i], slice_queue, parent_id, i, # worker ID self.survey.earth, self.fid, tx_name, # tx loop name rx_name, # rx loop name slice_name, tx_index, rx_index, )) log.debug('created worker') worker.daemon = True processes.append(worker) worker.start() log.debug('started worker: {}'.format(worker.is_alive())) for p in processes: log.debug('still alive: {}'.format(p.is_alive())) sum_worker = mpc.Process(target=_1DSliceSummationWorker, args=(slice_queue, sum_slice_queue, parent_id, stop_queue, (len(self.pulses), len(self._zvec) - 1), np.array(self.kernelMesh1D.cellSizes()), )) sum_worker.daemon = True sum_worker.start() for p in processes: log.debug('still alive: {}'.format(p.is_alive())) log.debug('started additional worker for summation: {}' .format(sum_worker.is_alive())) # sum_worker.join() isInterrupt = False log.debug('sum worker still alive: {}'.format(sum_worker.is_alive())) while True: try: if stop_queue.empty(): if not sum_worker.is_alive(): isInterrupt = True if isInterrupt: log.error( 'Sum Worker died, shutting ' 'down the rest as well and abort run.') sum_worker.terminate() break time.sleep(0.5) else: # waits till all slices are summarized, queue empty stop_queue.get() stop_queue.task_done() log.debug('join slice queue') slice_queue.join() log.debug('finished, end of multiprocessing, cleaning up') break except KeyboardInterrupt: isInterrupt = True log.critical( 'detect Keyboard Interrupt. Shutting processes down.') sum_worker.terminate() break for p in processes: p.terminate() # tells the process to terminate time.sleep(0.1) if p.is_alive(): log.debug('proc not terminated, join()') p.join() # waits till termination of p if temporary_mesh: os.unlink(meshName) # release temporary mesh file if isInterrupt: raise SystemExit self.K = sum_slice_queue.get() # final calculated kernel, normalized log.debug('final Kern: {}'.format(self.K.shape)) sum_worker.terminate() # tells sumWorker to terminate if sum_worker.is_alive(): log.debug('sum_worker not terminated, join()') sum_worker.join() # waits till termination of p log.info('overall calculation time: %.4f seconds' % (time.time() - tick_sI)) if temporary_mesh: log.debug('temporary file deleted?: {} (supposed to be True)' .format(not os.path.exists(meshName))) return self.K
[docs] def show2DMesh(self): if self.dimension == 2 and self.kernelMesh2D is not None: try: # pg.__version__ < 1.1 from pygimli.mplviewer import (showMesh, drawMesh) except ImportError: # pg.__version__ >= 1.1 from pygimli.viewer.mpl import (showMesh, drawMesh) ax, _ = showMesh(self.kernelMesh2D, self.kernelMesh2D.cellMarkers(), cMap='summer', label='region marker', hold=True) drawMesh(ax, self.kernelMesh2D) plt.show() else: raise Exception('No 2D Mesh available') return ax
[docs] def showLoopLayout(self, ax=None, **kwargs): if self.tx is None or self.rx is None: raise Exception( 'Cannot show loop layout if loops are not defined') loops = [self.tx] if not self.coincident: loops.append(self.rx) kwargs.setdefault('label', [f'tx: {self.tx!r}', f'rx: {self.rx!r}']) else: kwargs.setdefault('label', [f'tx/rx: {self.tx!r}']) showLoopLayout(*loops, ax=ax, **kwargs) return ax
[docs] def show(self, toplot=['real', 'imag', 'amp', 'phase', '0D'], indices=None, savename=None, normed=False, suptitle=None, ax=None, pulse_in_log=False, kernel_absolute_values=False, cbar_percentage=0.99, fixed_cbar=False, lut=33, show_marked_edges=False, **kwargs): # sended to plotting subroutines """ Visualise the Kernel with respect to the desired dimension. Cases ----- Automatically defined within the kernel class via the parameter kernel.dimension = [0...3]. Plotting of a kernel in the desired dimension is only possible if the kernel is also calculated with respect to that dimension. It's not possible to calculate the kernel with kernel.dimension = 1 and then plot the kernel with kernel.dimension = 2. 0D : Simple Graph plotting kernel-values over pulsemoments 1D : Graph with 1D integrated kernels over the depth of the model 2D : Slice of the x-z-plane with triangle mesh containing the 2D 3D : Export of the kernel in vtk format for visualising. Parameter 0D ------------ none so far Parameter 1D ------------ Plots the 1D integrated Kernel with a given z discretisation over the measured pulse sequences. toplot: list [ ['real', 'imag', 'amp', 'phase', '1D'] ] There are different possibilities to plot the kernel. This parameter defines which part of the kernel is shown. Possible options are: 'real', 'imag', 'amp', 'phase', '0D' (integrated over z). All strings in the toplot variable will be plotted in the same order given in the list. cMap: string ['viridis'] Defines the colormap used to display the kernel. In order to get a good contrast between the max and min as well as being useful in comparison with MRSMatlab, 'viridis' is the default colormap. Any colormap reachable by the plt.get_cmap(...) method can be chosen. normed: bool [True] A on the dimension based normalisation of the plot permits a better assessment of the kernel distribution. ax: plotting ax or list of axes [None] Plot on a predefined ax and gives back the ax. A onedimensionla list of axes is also accepted, if the number of items in 'toplot' is the same as the available axes. lut: None or int [None] Number of colors for the colorbar. If lut is not None it must be an integer giving the number of entries desired in the lookup table, and name must be a standard mpl colormap name. Parameter 2D ------------ indices: list By default one 2D plot is created for each pulsemoment. In order to limit the number of plots the optional paramter indices can be given as a list of indices referring to the pulse moments to be shown. cMap: string ['viridis'] See Parameter 1D. normed: bool [True] A on the dimension based normalisation of the plot permits a better assessment of the kernel distribution. show_marked_edges: boolean [ False ] Whether or not marked edges gets drawn. possible kwargs for matplotlib: cMin, cMax for range of the colorbar. All other kwargs are reaching matplotlib functions. default label 2D: 'integrated kernel (2D) [nV/$m^2$] pulsemoment: {:.3f} As' .format(self.pulses[i]) Parameter 3D ------------ A self-sufficient plot of the kernel without any integration would result in a set of 3D Cubes and is not implemented for now. Instead the kernel will be saved in vtk format which can be easily handled. savename: string A String defining the relative path to the vtk-file the kernel will be saved in. If not given the default savename will be flagged with the string '_default_' and contain some information about the kernel. EXAMPLE ------- 2D: >>> ax, cbar = kernel.show(indices=[16], cMin=-1, >>> cMax=2, size=20, pad=0.7) >>> ax.set_ylim(-50, 0) """ import matplotlib as mpl from mpl_toolkits.axes_grid1 import make_axes_locatable mpl.rcParams['axes.unicode_minus'] = False if self.dimension == 0: print('Kernel.show(0D): simple Graph') _, ax = plt.subplots(1, 1) ax.plot(self.pulses, 1e9 * np.absolute(self.K), 'ro--') ax.set_xlabel('pulsemoments [As]') ax.set_ylabel('magnitude of the integrated kernel [nV/m^3]') elif self.dimension == 1: # plotaxes xachse_mitte = (self.pulses[1:] + self.pulses[:-1])/2 xachse_erweitert = [2 * self.pulses[0] - xachse_mitte[0]] xachse_erweitert.extend(xachse_mitte) xachse_erweitert.append(2 * self.pulses[-1] - xachse_mitte[-1]) xachse = np.array(xachse_erweitert) xx, zz = np.meshgrid(xachse, self._zvec) # def def Plot1D(xx, zz, kernel, toplot=['real', 'imag', 'amp', 'phase', '0D'], cMap='viridis', ax=None, pulse_in_log=True, lut=None, **kwargs): log.debug('Plot1D kwargs: {}'.format(kwargs)) if ax is None: _, ax = plt.subplots(1, len(toplot), sharey=True, sharex=True) if len(toplot) == 1: ax = [ax] # or ax[0] will fail num_normal = 0 # original input size = kwargs.pop('size', 12) clim_in = kwargs.pop('clim', None) real_imag_0d = kwargs.pop('real_imag', False) # initial values clim = clim_in cbar = None for i, item in enumerate(toplot): # optionally normalizing the plot to values per meter # default = False Z_weight = np.ones(len(self._zvec[1:])) if normed is True and item.lower() != 'phase': Z_weight = np.abs(self._zvec[1:] - self._zvec[:-1]) Z_weight = Z_weight[:, np.newaxis] if item.lower() == '0d': if len(toplot) > 1: sep_ax = plt.subplot(1, len(toplot), i + 1) else: sep_ax = ax[0] to_plot_0d = ['abs'] if real_imag_0d: to_plot_0d = ['abs', 'real', 'imag'] misc.drawSoundingCurve( sep_ax, self.getKernel(), self.pulses, size=size, to_plot=to_plot_0d, color=kwargs.pop('dot_color', 'r')) continue elif item.lower() == 'real': item_plot = np.copy(kernel.real.T) / Z_weight num_normal += 1 elif item.lower() == 'imag': item_plot = np.copy(kernel.imag.T) / Z_weight num_normal += 1 elif item.lower() in ('amp', 'abs'): item_plot = np.absolute(kernel).T / Z_weight num_normal += 1 elif item.lower() == 'phase': item_plot = np.arctan2(kernel.imag, kernel.real).T num_normal += 1 else: log.error( 'toplot variable "{}" cannot be identified. Abort.' .format(item)) return if clim_in is None: cmap, clim = getCMapAndLim(item_plot[1:, :], phase=item == 'phase', lut=lut) # clim = [quantile(item_plot[1:, :], # perc=1. - cbar_percentage, # add_rel=-0.1), # quantile(item_plot[1:, :], # perc=cbar_percentage)] else: cmap = cMap im0 = ax[i].pcolormesh( xx, zz, item_plot, cmap=cmap) ax[i].set_title(item) divider = make_axes_locatable(ax[i]) cax = divider.append_axes("right", size="18%", pad=.06) cbar = plt.colorbar(im0, cax=cax) if item == 'phase': im0.set_clim(-np.pi, np.pi) cbar.set_ticks([-np.pi, 0, np.pi]) cbar.ax.set_yticklabels(['$-\pi$', ' $0$', ' $\pi$'], size=size + 1) else: im0.set_clim(clim[0], clim[1]) cbar.ax.tick_params(labelsize=size) ax[i].set_xlim(np.min(xx), np.max(xx)) ax[i].set_ylim(np.min(zz), np.max(zz)) if pulse_in_log: ax[i].set_xscale('log') ax[i].set_xlabel('pulse moments [As]') if num_normal == 1: # the counter ensures that this is only done once per # plot and only if needed ax[i].set_ylabel('depth [m]') setAxSize(ax[i], size) return ax, cbar # plots log.info('Kernel.show(1D): pcolormesh') size = kwargs.get('size', 12) dot_color = kwargs.get('dot_color', 'r') cMap = kwargs.get('cMap', 'viridis') ax, cbar = Plot1D( xx, zz, self.getKernel(reduced=False), toplot=toplot, cMap=cMap, ax=ax, pulse_in_log=pulse_in_log, size=size, real_imag=kwargs.pop('real_imag', False), clim=kwargs.get('clim', None), lut=lut, dot_color=dot_color) if suptitle is not None: plt.suptitle(suptitle) elif self.dimension == 2: mpl.rcParams['mathtext.default'] = 'regular' # orientation = kwargs.get('orientation', 'vertical') if indices is None: indices = np.arange(0, len(self.pulses), 5) indices = np.atleast_1d(indices) if ax is None: ax = [None] * len(indices) if not isinstance(ax, list): ax = [ax] cbar = [None] * len(indices) xlim = kwargs.pop('xlim', None) ylim = kwargs.pop('ylim', None) size = kwargs.get('size', 12) log.debug(kwargs) log.info('Kernel.show(2D): triangle mesh: pg.show(), ' 'for {} pulse(s).'.format(len(indices))) toplot = np.atleast_1d(toplot) if len(toplot) != 1: log.info(f'Changes toplot from {toplot} to "abs". ' 'Please use only one entry per 2d export.') toplot = ['abs'] if toplot[0] == 'imag': toshow = 1e9 * np.imag(self.K) /\ np.array(self.kernelMesh2D.cellSizes()) # MMP elif toplot[0] == 'real': toshow = 1e9 * np.real(self.K) /\ np.array(self.kernelMesh2D.cellSizes()) # MMP elif toplot[0] in ('abs', 'amp'): toshow = 1e9 * np.absolute(self.K) /\ np.array(self.kernelMesh2D.cellSizes()) # MMP elif toplot[0] in ('phase', 'angle'): toshow = np.angle(self.K) elif toplot[0] in ('0d', '0D'): dot_color = kwargs.get('dot_color', 'r') if kwargs.pop('real_imag', True): to_plot_0d = ['abs', 'real', 'imag'] else: to_plot_0d = ['abs'] misc.drawSoundingCurve( ax[0], self.getKernel(), self.pulses, size=size, color=dot_color, to_plot=to_plot_0d) # need to exit at this point return ax, None else: raise Exception('{} not valid'.format(toplot)) for n, i in enumerate(indices): log.log(16, 'plot ({}) for pulse moment {:.2f} As' .format(toplot[0], self.pulses[i])) dd = toshow[i] assert np.all(np.isfinite(dd)), 'check cell sizes for zeros' if toplot[0] not in ('phase', 'angle'): if not fixed_cbar: absmax = quantile(np.abs(dd), perc=cbar_percentage, add_rel=0) kwargs.setdefault( 'cMin', -absmax if not np.alltrue(dd >= 0) else 0) kwargs.setdefault('cMax', absmax) else: if n == 0: mm = quantile(np.abs(toshow), perc=cbar_percentage) mm_neg = -mm if np.any(toshow < 0) else 0 kwargs.setdefault('cMax', mm) kwargs.setdefault('cMin', mm_neg) if np.isclose(kwargs.get('cMin'), -kwargs.get('cMax')): cmap = kwargs.pop('cMap', plt.get_cmap('bwr')) else: cmap = kwargs.pop('cMap', plt.get_cmap('viridis')) else: kwargs.setdefault('cMin', -np.pi) kwargs.setdefault('cMax', np.pi) cmap = kwargs.pop('cMap', 'twilight_shifted') cMin = kwargs.get('cMin') cMax = kwargs.get('cMax') log.debug('min/max data to plot: {} / {}' .format(np.min(dd), np.max(dd))) log.debug('min/max cbar range: {} / {}' .format(cMin, cMax)) if not show_marked_edges: # local copy to mask boundary markers mesh = pg.Mesh(self.kernelMesh2D) mesh.setBoundaryMarkers(np.zeros(mesh.boundaryCount(), dtype=int)) else: # just reference mesh = self.kernelMesh2D ax[n], cbar[n] = \ pg.show(mesh, data=dd, colorBar=kwargs.pop('colorBar', True), cMap=cmap, ax=ax[n], logScale=kwargs.pop('logScale', False), cMin=cMin, cMax=cMax) if cbar[n] is not None: label = kwargs.pop( 'label', 'integrated kernel (2D) [nV/m²] pulsemoment: {:.3f} As' .format(self.pulses[i])) cbar[n].set_label(label=label, size=size) cbar[n].ax.tick_params(labelsize=size) if suptitle is not None: plt.suptitle(suptitle) ax[n].set_xlabel('distance [m]') ax[n].set_ylabel('depth [m]') if xlim is not None: ax[n].set_xlim(xlim[0], xlim[1]) if ylim is not None: ax[n].set_ylim(ylim[0], ylim[1]) for item in ([ax[n].title, ax[n].xaxis.label, ax[n].yaxis.label] + ax[n].get_xticklabels() + ax[n].get_yticklabels()): item.set_fontsize(size) elif self.dimension == 3: print('Kernel.show(3D): export vtk instead...') self.exportVTK(savename) ax = None cbar = None elif self.dimension == 4: print('nice try') return ax, cbar
def _debug_showKernelParts(self, ax=None, ind=0, real=True, showabs=True, xlim=None, ylim=None, clim1=None, clim2=None, clim3=None, clim4=None): """ Plots the three 2D Kernel parts according to Hertrich (2005, fig 9). For debugging purpose only, works only in debug mode of kernel class. """ if self.debug is False: raise Exception('"showKernelParts" only works in debug mode ' 'in order to save memory in production mode.') if ax is None: fig, ax = plt.subplots(2, 2, figsize=(14, 14)) else: fig = np.squeeze(ax)[0].figure print('K_part1') print(type(self.K_part1[ind][0])) axes = ax.flatten() if showabs: toplot1 = np.abs(self.K_part1[ind]) cMap = plt.get_cmap('jet', 60) # for comparison against hertrich et al. 2005 else: if real: toplot1 = np.real(self.K_part1[ind]) else: toplot1 = np.imag(self.K_part1[ind]) cMap = plt.get_cmap('bwr', 31) if clim1 is None: absmax1 = quantile(toplot1, perc=1, add_rel=0) absmin1 = quantile(toplot1, perc=0.005, add_rel=0) if showabs: clim1 = [absmin1, absmax1] else: clim1 = [-absmax1, absmax1] # Integrated Transmitter Part of the kernel function _, cbar1 = pg.show(self.kernelMesh2D, data=toplot1, colorBar=True, cMap=cMap, logScale=showabs, ax=axes[0], cMin=clim1[0], cMax=clim1[1]) axes[0].set_title('part 1') print('K_part2') if showabs: toplot2 = np.abs(np.atleast_2d(self.K_part2)[0]) # (1, n) cMap = plt.get_cmap('jet', 60) # for comparison against hertrich et al. 2005 else: if real: toplot2 = np.atleast_2d(self.K_part2)[0].real # (1, n) else: toplot2 = np.atleast_2d(self.K_part2)[0].imag # (1, n) toplot2 = 1e9 * toplot2 if clim2 is None: absmax2 = quantile(toplot2, perc=1, add_rel=0) absmin2 = quantile(toplot2, perc=0.005, add_rel=0) if showabs: clim2 = [absmin2, absmax2] else: clim2 = [-absmax2, absmax2] # Integrated Receiver Part of the kernel function _, cbar2 = pg.show(self.kernelMesh2D, data=np.squeeze(toplot2), # (n,) colorBar=True, cMap=cMap, logScale=showabs, cMin=clim2[0], cMax=clim2[1], ax=axes[1]) axes[1].set_title('part 2') if self.coincident: axes[2].text(0.5, 0.5, 'coincident, geometry factor = 1', ha='center', va='center', transform=ax[2].transAxes) else: print('K_part3') if showabs: toplot3 = np.abs(np.atleast_2d(self.K_part3)[0]) # (1, n) cMap = plt.get_cmap('jet', 60) # for comparison against hertrich et al. 2005 else: if real: toplot3 = np.atleast_2d(self.K_part3)[0].real # (1, n) else: toplot3 = np.atleast_2d(self.K_part3)[0].imag # (1, n) if clim3 is None: absmax3 = quantile(toplot3, perc=1, add_rel=0) absmin3 = quantile(toplot3, perc=0.005, add_rel=0) if showabs: clim3 = [absmin3, absmax3] else: clim3 = [-absmax3, absmax3] # Integrated Geometric Part of the Kernel _, cbar3 = pg.show(self.kernelMesh2D, data=np.squeeze(toplot3), # (n,) colorBar=True, cMap=cMap, logScale=showabs, ax=ax[2], cMin=clim3[0], cMax=clim3[1]) axes[2].set_title('part 3') if clim4 is None: kw = {} else: kw = {'cMin': clim4[0], 'cMax': clim4[1]} _, cbar4 = self.show( toplot=['abs'if showabs else 'real' if real else 'imag'], ax=axes[3], indices=[ind], cbar_percentage=1, logScale=showabs, cMap=cMap, **kw) axes[3].set_title('kern') if xlim: for axi in axes: axi.set_xlim(xlim) if ylim: for axi in axes: axi.set_ylim(ylim) return ax, (cbar1, cbar2, cbar3, cbar4[0])
[docs] def export2DKernel(self, fig=None, ax=None, savename=None, png_dpi=300, noYLabel=False, index=0, colorBar=True, size=13, pdf=None, fixed_cbar=False, **kwargs): """ Exports 2D Kernel for given pulse moment. Kwargs are redirected to *show*. """ with cm.plt_ioff(): suptitle = kwargs.pop('suptitle', '') if fig is None or ax is None: if kwargs.get('toplot') in ('0d', '0D'): figsize = (6, 9) else: figsize = None fig, ax = plt.subplots(1, 1, figsize=figsize) self.show(indices=[index], size=size, ax=ax, colorBar=colorBar, fixed_cbar=fixed_cbar, **kwargs) ax.set_ylabel('Z [m]') ax.set_xlabel('X [m]') if noYLabel is True: ax.set_ylabel('') ax.yaxis.set_major_locator(plt.NullLocator()) plt.suptitle(suptitle) if pdf is not None: pdf.savefig(fig, bbox_inches='tight') plt.close(fig) else: if savename is not None: fig.savefig( savename, bbox_inches='tight', dpi=png_dpi if savename.endswith('.png') else None) plt.close(fig)
[docs] def export2DKernel2PDF(self, name, fixed_cbar=False, **kwargs): """ Export 2D Kernel for all pulse moments as stiched pdf. Kwargs are redirected to *export2DKernel*. """ from matplotlib.backends.backend_pdf import PdfPages log.info('export 2D Kernel as pdf.') with PdfPages(name) as pdf: if kwargs.get('toplot') in ('0d', '0D'): rp = [0] else: rp = range(len(self.pulses)) for ind in rp: log.info('pulse: {}/{}'.format(ind + 1, len(self.pulses))) self.export2DKernel( pdf=pdf, index=ind, savename=None, fixed_cbar=fixed_cbar, **kwargs) d = pdf.infodict() d['Title'] = name d['Subject'] = self.__repr__()
[docs] def exportVTK(self, savename, save=['abs'], save_in_log=False): if self.dimension not in [2, 3]: print('Export as VTK only for 2D or 3D implemented.') return False tick = time.time() checkDirectory(savename, filename=True) mesh = pg.Mesh(self.kernelMesh2D) pm_vec = self.pulses for i in range(np.shape(self.K)[0]): if isinstance(self.K[0, 0], complex): if 'real' in save: kernel_r = 1e9 * self.K[i].real / np.array( self.kernelMesh2D.cellSizes()) if save_in_log: kernel_r = np.log10(np.abs(kernel_r)) mesh.addData('kernel_%05.2f_real' % (pm_vec[i]), pg.Vector(kernel_r)) if 'imag' in save: kernel_i = 1e9 * self.K[i].imag / np.array( self.kernelMesh2D.cellSizes()) if save_in_log: kernel_i = np.log10(np.abs(kernel_i)) mesh.addData('kernel_%05.2f_imag' % (pm_vec[i]), pg.Vector(kernel_i)) if 'abs' in save or 'amp' in save: kernel_a = 1e9 * np.absolute(self.K[i]) / np.array( self.kernelMesh2D.cellSizes()) if save_in_log: kernel_a = np.log10(kernel_a) mesh.addData('kernel_%05.2f_abs' % (pm_vec[i]), pg.Vector(kernel_a)) # ABS = np.absolute(kernel[i]) * np.sign(kernel[i].real) if 'phase' in save: mesh.addData('kernel_%05.2f_phase' % (pm_vec[i]), pg.Vector(np.angle(self.K[i]))) else: mesh.addData('kernel_%05.2f' % (pm_vec[i]), (pg.Vector(self.K[i]))) print('save kernel to "{}" ...'.format(savename), end='') mesh.exportVTK(savename) print(' %.3f sec' % (time.time() - tick)) return True
[docs] def export1D(self, savename, loop_layout=True, title='{0.survey.earth!r}'): with cm.plt_ioff(): fig, ax = plt.subplots(1, 5, figsize=(16, 9)) ax, _ = self.show(ax=ax, size=8, cbar_percentage=0.99) fig.suptitle(title.format(self)) ax[0].figure.savefig(savename + '.png') log.info('saved "{}"'.format(savename + '.png')) plt.close(fig) if loop_layout: fig, ax = plt.subplots(1, 1, figsize=(16, 9)) ax = self.showLoopLayout(ax=ax) ax.figure.savefig(savename + '_layout.png') log.info('saved "{}"'.format(savename + '_layout.png')) plt.close(fig)
[docs]def checkForKernel(name, mkdir=False): if not os.path.exists(name): if mkdir is True: path = os.path.dirname(name) if path != '': if not os.path.exists(path): os.mkdir(path) return False else: return True
[docs]def simpleZVec(numz, minz, reduced=False): kern = Kernel() kern.createZVector(numz, minz) return kern.getZVector(reduced=reduced)
[docs]def integrateKernelH2(mat, array): array = np.atleast_2d(array) new_arr = np.zeros((array.shape[0], mat.nRows()), dtype=complex) for ind, subarr in enumerate(array): new_arr[ind] = (mat * subarr.real).array() + \ 1j * (mat * subarr.imag).array() return np.squeeze(new_arr)
def _2DSliceSummationWorker(queue_in, queue_sum, stop_queue, shape, y_count, worker_count, parent_id): """ Worker in the kernel-multicore-calculation approach. """ # first task kern = np.zeros(shape, dtype=complex) log.debug('started sum worker') done = 0 if sys.platform == 'linux' and log.getEffectiveLevel() <= 20: iterable = progressBar(range(worker_count), 'gather kernel parts: ') else: iterable = range(worker_count) log.debug('Summation worker: start waiting for processes to finish.') while queue_in.qsize() < worker_count: time.sleep(0.2) for done in iterable: while True: if os.getppid() != parent_id: sys.exit( 'My main process died (%d != %d)! So... I\'m out too.' % (os.getppid(), parent_id)) try: newSlice = queue_in.get(timeout=0.1) # aquire queue kern += newSlice queue_in.task_done() # release queue # done += 1 if sys.platform != 'linux': log.log(13, f'shape: {newSlice.shape}, total: {done}' .format(done + 1)) del newSlice # sometimes auto memory clean up isnt working break except (queue.Empty, IOError): # IOError = broken pipe # don't worry the other workers are not finished yet: try again # print('sleeping') time.sleep(0.1) # shutting process down queue_sum.put(kern, block=False) time.sleep(0.2) stop_queue.put(True) return def _1DSliceSummationWorker(queue_in, queue_sum, parent_id, stop_queue, shape, kern_cell_sizes): """ Worker in the kernel-multicore-calculation approach. """ # first task kern = np.zeros(shape, dtype=complex) log.log(13, 'started sum worker') done = 0 while done < shape[1]: if os.getppid() != parent_id: sys.exit('My main process died (%d != %d)! So... I\'m out too.' % (os.getppid(), parent_id)) try: newSlice, idx = queue_in.get(timeout=0.3) # aquire queue # integration over 2D Mesh in x-y plane newSlice = np.sum(newSlice * kern_cell_sizes, 1) # already weighted with y vec, cell sizes comes later from outside kern[:, idx] = newSlice queue_in.task_done() # release queue done += 1 log.debug(f'shape: {newSlice.shape}, index: {idx}, total: {done}') except (queue.Empty, IOError): # IOError = broken pipe # not to worry the other workers are not finished yet: try again time.sleep(1) # shutting process down queue_sum.put(kern, block=False) time.sleep(0.2) log.log(13, 'sum worker finished') stop_queue.put(True) return def _2DSliceKernelWorker(srcMeshName, outPos2D, indices, yvec, txSrcField, rxSrcField, outQueue, parent_id, workerID, earth, fid, slice_name): """ Worker in the kernel-multicore-calculation approach""" time_loadmesh = time.time() if slice_name is None: log.log(13, 'worker {}: import sourcemesh "{}"...' .format(workerID, srcMeshName)) srcMesh = pg.load(srcMeshName) log.debug(srcMesh) # outPos2D = 3d contruct with y-values = 0 rxalpha = None rxbeta = None rxzeta = None rxperpend = None ythks = np.abs(np.diff(yvec)) ymids = (yvec[:-1] + yvec[1:]) / 2 coincident = fid.tx_index == fid.rx_index if workerID == 0: todo = progressBar(indices, 'processed slices per worker: ') else: todo = indices for yi in todo: if os.getppid() != parent_id: sys.exit('My main process died ({} != {}) So... I\'m out too.' .format(os.getppid(), parent_id)) # y midpoint, index of the pos of the y-value in the 3D mesh log.debug('worker no{} y = {:.2f}'.format(workerID, ymids[yi])) sys.stdout.flush() if slice_name is None: # outPos2D = 3d contruct with y-values = 0 slice3D = np.copy(outPos2D) slice3D[:, 1] = np.ones(np.shape(slice3D)[0]) * ymids[yi] # do interpolation from 3D FEM Mesh to 2D slice as matrix iMat = srcMesh.interpolationMatrix(slice3D) # use matrix to calc tx field txfieldSlice = cm.interpolateField_Matrix(txSrcField, iMat) if not coincident: # optionally use matrix to calc rx field rxfieldSlice = cm.interpolateField_Matrix(rxSrcField, iMat) # next slice will be different, can delete rest to ensure clean up # (this can otherwise be tricky in multiprocessing) del slice3D del iMat else: # load custEM results log.debug('Worker {} load field slice {}'.format( workerID, slice_name.format(fid.tx_index, yi))) field_slice_tx = np.load(slice_name.format(fid.tx_index, yi)) # shape: (mesh, 6) # 6: complex-field (x, y, z), coords (x, y, z) # we only need the field -> [:, :3] txfieldSlice = field_slice_tx[::-1, :3] txfieldSlice = txfieldSlice.T * 4e-7 * np.pi # (n, 3) -> (3, n) and H to B del field_slice_tx if not coincident: field_slice_rx = np.load(slice_name.format(fid.rx_index, yi)) rxfieldSlice = field_slice_rx[::-1, :3] rxfieldSlice = rxfieldSlice.T * 4e-7 * np.pi # (n, 3) -> (3, n) and H to B del field_slice_rx # elliptical decomposition txalpha, txbeta, txzeta, txperpend = \ Kernel.ellipticalDecomposition_multi(txfieldSlice, earth) if not coincident: rxalpha, rxbeta, rxzeta, rxperpend = \ Kernel.ellipticalDecomposition_multi(rxfieldSlice, earth) # kernel integration (K1, K2, K3) = Kernel.kernelCalculation_multi( fid, earth, txalpha, txbeta, txzeta, txperpend, rxalpha=rxalpha, rxbeta=rxbeta, rxzeta=rxzeta, rxperpend=rxperpend, calc_theta=False,) # log.warning('Export of kernel slice.') # path = Path('kernel_slices') # path.mkdir(exist_ok=True) # np.save(path.joinpath('kernel_slice_2D_{}.npy' # .format(yi)).as_posix(), # K1 * K2 * K3) # Old before Nov 2020: output single slices # outQueue.put((K1 * K2 * K3 * ythks[yi])) # weighting with y vec thk # Change Nov 2020: summation here: queue only gets summed part of the # kernel as summation worker is actually the slowest part of the # kernel calculation routine. kern_part = K1 * K2 * K3 * ythks[yi] # weighting with y vec thk # added Nov 2020: 180 degree phase flip: np.conjugate kern_part = np.conjugate(kern_part) # weighting over cells sizes in calling function if yi == indices[0]: summed_kern_parts = kern_part else: summed_kern_parts += kern_part log.debug('worker {}: slice {}/{} finished.'.format( workerID, yi, indices)) # general clean up del txalpha del txbeta del txzeta del txperpend del txfieldSlice if rxSrcField is not None: del rxfieldSlice del rxalpha del rxbeta del rxzeta del rxperpend del K1 del K2 del K3 outQueue.put((summed_kern_parts)) log.debug('exit of Worker {} after {:.2f} seconds' .format(workerID, time.time() - time_loadmesh)) sys.stdout.flush() return def _1DSliceKernelWorker(srcMeshName, outPos2D, txSrcField, rxSrcField, coincident, zvec, indices, outQueue, parent_id, workerID, earth, fid, tx_name, rx_name, slice_name, tx_index, rx_index): """ Worker in the kernel-multicore-calculation approach""" # import logging # log = logging.getLogger('comet') tickstart = time.time() if txSrcField is not None: log.debug('worker {}: import sourcemesh "{}"...' .format(workerID, srcMeshName)) srcMesh = pg.load(srcMeshName) log.debug(srcMesh) # outPos2D = 3d contruct with z-values = 0 rxalpha = None rxbeta = None rxzeta = None rxperpend = None # lengths of the cells = weighting (in y direction) z_thk_all = np.abs(np.diff(zvec)) z_mid_all = (zvec[:-1] + zvec[1:]) / 2 if slice_name is None: if txSrcField is None: # setup loop classes if tx_name is None: raise Exception('Need tx field or tx.') tx_class = Loop(tx_name[0], config=tx_name[1]) if not coincident: if rx_name is None: raise Exception('Need rx.') rx_class = Loop(rx_name[0], config=rx_name[1]) for idx in indices: z_thk = z_thk_all[idx] z_mid = z_mid_all[idx] if os.getppid() != parent_id: sys.exit('My main process died (%d != %d)! So... I\'m out too.' % (os.getppid(), parent_id)) log.debug('worker no{} starts interpolating slice at z = {:.2f}' .format(workerID, z_mid)) slice3D = np.copy(outPos2D) slice3D[:, 2] = np.ones(np.shape(slice3D)[0]) * z_mid if slice_name is None: if txSrcField is not None: # b field case 1/2: b field is interpolated from loop mesh # calculate interpolation matrix for actual slice iMat = srcMesh.interpolationMatrix(slice3D) # interpolate bfield for tx txfieldSlice = cm.interpolateField_Matrix(txSrcField, iMat) if not coincident: # interpolate bfield for rx (same matrix) rxfieldSlice = cm.interpolateField_Matrix(rxSrcField, iMat) del iMat else: # b field case 2/2: b field is calculated directly on each slice # need pos, phi, and ds, or a proper loop. tx_class.setLoopMesh(slice3D) tx_class.field = None tx_class.calculate(num_cpu=1, matrix=False, interpolate=False) txfieldSlice = tx_class.field if not coincident: rx_class.setLoopMesh(slice3D) rx_class.field = None rx_class.calculate(num_cpu=1, matrix=False, interpolate=False) rxfieldSlice = rx_class.field else: field_slice_tx = np.load(slice_name.format(fid.tx_index, idx)) txfieldSlice = field_slice_tx[::-1, :3] txfieldSlice = txfieldSlice.T * 4e-7 * np.pi del field_slice_tx if not coincident: field_slice_rx = np.load(slice_name.format(fid.rx_index, idx)) rxfieldSlice = field_slice_rx[::-1, :3] rxfieldSlice = rxfieldSlice.T * 4e-7 * np.pi del field_slice_rx del slice3D # elliptical decomposition txalpha, txbeta, txzeta, txperpend = \ Kernel.ellipticalDecomposition_multi(txfieldSlice, earth) if not coincident: rxalpha, rxbeta, rxzeta, rxperpend = \ Kernel.ellipticalDecomposition_multi(rxfieldSlice, earth) # kernel integration (K1, K2, K3) = Kernel.kernelCalculation_multi( fid, earth, txalpha, txbeta, txzeta, txperpend, rxalpha=rxalpha, rxbeta=rxbeta, rxzeta=rxzeta, rxperpend=rxperpend, calc_theta=False,) # log.warning('Export of kernel slice.') # path = Path('kernel_slices') # path.mkdir(exist_ok=True) # np.save(path.joinpath('kernel_slice_1D_{}.npy' # .format(idx)).as_posix(), # K1 * K2 * K3) log.debug('put kernel Slice {}: shape = {}' .format(idx, np.shape(K2))) outQueue.put((K1 * K2 * K3 * z_thk, idx)) # weighting with z vec # weighting over cells sizes in calling function del txalpha del txbeta del txzeta del txperpend del txfieldSlice if not coincident: del rxfieldSlice del rxalpha del rxbeta del rxzeta del rxperpend del K1 del K2 del K3 del z_mid del z_thk log.debug('exit of Worker {} after {:.2f} seconds' .format(workerID, time.time() - tickstart)) return def _interpolationMatrixWorker(worker_id, src_mesh, target_pos, id_offset, pipe, log_level): from comet.pyhed.misc import vec from comet.pyhed import log log.setLevel(log_level) log.debug('import source mesh') src = pg.Mesh(src_mesh) mat = src.interpolationMatrix(pg.R3Vector(target_pos)) pipe.put([vec.getRSparseValues(mat, getInCRS=False), id_offset, src.nodeCount()]) mat.clear() log.debug(f'exit of Worker {worker_id}') return
[docs]def calcInterpolationMatrix_para(source_mesh, target_pos, num_cpu=8): # input handling import multiprocessing as mpc temporary_mesh = False if isinstance(source_mesh, str): pass elif isinstance(source_mesh, pg.Mesh): temporary_mesh = True import tempfile as temp temp_mesh = temp.NamedTemporaryFile(suffix='.bms', delete=False) source_mesh.save(temp_mesh.name) source_mesh = temp_mesh.name temp_mesh.close() out_queue = mpc.Queue() nop = np.min([mpc.cpu_count(), len(target_pos), num_cpu]) row_count = len(target_pos) splitted = np.array_split(target_pos, nop, axis=0) log.info(f'Start calculating matrices in {nop} parallel processes.') processes = [] index_split = 0 for i in range(nop): worker = mpc.Process(target=_interpolationMatrixWorker, args=(i, source_mesh, splitted[i], index_split, out_queue, log.getEffectiveLevel())) index_split += len(splitted[i]) worker.daemon = True processes.append(worker) worker.start() rows = [] cols = [] vals = [] number = 0 while number < nop: try: sub_mat, split, col_count = out_queue.get() except queue.Empty: time.sleep(2.0) continue rows.extend(np.array(sub_mat[0]) + split) cols.extend(sub_mat[1]) vals.extend(sub_mat[2]) number += 1 IMatrix = pg.SparseMapMatrix(pg.IndexArray(np.array(rows)), pg.IndexArray(np.array(cols)), pg.RVector(vals)) IMatrix.setRows(row_count) IMatrix.setCols(col_count) log.debug('Building up matrix ... ') if temporary_mesh: os.unlink(source_mesh) # release temporary mesh file for p in processes: if p.is_alive(): p.terminate() p.join() return IMatrix
[docs]def create1DInterpolationSlices(kern): z_offsets = kern._zvec cc = kern.kernelMesh1D.cellCenters() coords = np.repeat(np.reshape(cc, (1, *cc.array().shape)), len(z_offsets), axis=0) inputcoords = np.concatenate([coords[:, :, 0, np.newaxis], coords[:, :, 1, np.newaxis], coords[:, :, 2, np.newaxis]], axis=2) for zi, zval in enumerate(z_offsets): inputcoords[zi, :, 2] = zval inputcoords return
[docs]def create2DInterpolationSlices(kern): y_offsets = kern.yvec cc = kern.kernelMesh2D.cellCenters() coords = np.repeat(np.reshape(cc, (1, *cc.array().shape)), len(y_offsets), axis=0) inputcoords = np.concatenate([coords[:, :, 0, np.newaxis], coords[:, :, 2, np.newaxis], coords[:, :, 1, np.newaxis]], axis=2) for yi, yval in enumerate(y_offsets): inputcoords[yi, :, 1] = yval return inputcoords
[docs]def calculateKernelFromSlices( survey_name, invmesh, cfgname, max_length=0.05, max_num=400, path_name='kernel', num_cpu=48, h_order=1, json_name=None, force_new_paths=True, kernel_name='kernel/kern_{}', slice_export_name='kernel_slice'): """ Parameters: ----------- survey: string Filepath of the survey containing the FIDs. invmesh: string Filepath for the inversion mesh the kernel is calculated on. cfgname: string Filepath of the secondary config containingin formation for custEM. The same file should have been used for the field calculation. max_length: float [ 0.05 ] Minimum distance between two slices in y direction. max_num: integer [ 400 ] Number of slices for y discretization. path_name: string [ 'kernel' ] Final name for the slices will be {mdir}/paths/{path_name}_{number}_path.xml "mdir" is defined in the secondary config. num_cpu: integer [ 48 ] Number of cores used. h_order: integer [ 1 ] Order of h-refinement when setting the invmesh in the kernelclass. json_name: string [ None ] Optional name for the json file. Alternatively a tempory file is created. force_new_path: boolean [ True ] Deletes old pathfiles (slices) and forces the generation of new ones. kernel_name: string [ 'kernel/kern_{}' ] Filepath of used for export of the kernel functions. Need to contain a "{}" which is filled with the index of the corresponding FID in the survey class. slice_export_name: string [ 'kernel_slice' ] Filepath of the interpolated magnetic fields for the individual kernel slices. Full slice path contains: "{r_dir}/{approach}/{mesh_name}/{mod_name}_interpolated/ tx_{tx_number}_{slice_export_name}_imesh_{slice_number}.npy" Returns: -------- kernel_name (added a "_{}" if not in original string) """ from comet import snmr from comet import pyhed as ph jdict = {} jdict['survey'] = survey_name jdict['invmesh'] = invmesh jdict['cfgname'] = cfgname jdict['max_num'] = max_num jdict['max_length'] = max_length jdict['num_cpu'] = num_cpu jdict['path_name'] = path_name jdict['h_order'] = h_order jdict['force_new_paths'] = force_new_paths jdict['slice_export_name'] = slice_export_name json_name = ph.misc.dump2Json(json_name=json_name, **jdict) success = ph.misc.local_apps_bash('totalFieldInterpolation', json_name, str(num_cpu)) ph.log.info('success: {}'.format(success)) if success != 0: ph.log.error('Mpirun aborted.') raise SystemExit() # import prepared survey from second script survey = snmr.Survey() survey.load(survey_name, load_meshes=False) cfg = ph.SecondaryConfig(cfgname) # only working approach for this is E-total, hardcoded cfg.approach = 'E_t' for fi, fid in enumerate(survey.fids): kern = survey.createKernel(fi, dimension=2) # 1200 slices instead of 300 kern.createYVec(max_length=max_length, max_num=max_num) kern.set2DKernelMesh(invmesh, order=h_order) slice_dir = '{}/{}/{}/{}_interpolated'.format( cfg.r_dir, cfg.approach, cfg.mesh_name, cfg.mod_name) slice_name = 'tx_{}_' + f'{slice_export_name}' + '_imesh_{}.npy' slices = Path(slice_dir).joinpath(slice_name).as_posix() kern.calculate(num_cpu=num_cpu, slice_name=slices) if "{" not in kernel_name and "}" not in kernel_name: kernel_name += '_{}' kern.save(kernel_name.format(fi), kernelmesh_name=Path(invmesh).name) return kernel_name
# The End