"""
Part of comet/pyhed/loop
This script contains the main class for the sources as well as several scripts
for the initialization of loop classes (build... ).
"""
# COMET - COupled Magnetic resonance Electrical resistivity Tomography
# Copyright (C) 2019 Nico Skibbe
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import os
import time
import numpy as np
import pygimli as pg
from pathlib import Path, PosixPath
from comet.pyhed.loop.loop_para import (loopCalculation, loopInterpolation,
calcFieldMatrix_para)
from comet.pyhed import config as Config
from comet.pyhed import SecondaryConfig, log
from comet.pyhed.IO import getItem
from comet.pyhed.misc import (
tetgen151, R3VtoNumpy, vec, createPolyBoxWithHalfspace,
Timer, cleanUpTetgenFiles, setNearestMarkers,
convertCoordinates, local_apps_bash, temporal_printoptions)
[docs]class Loop:
"""
Class for the computation of arbitrary shaped polygon loops. Some functions
automatically return this loopclass as result. It is recommended to use
these (you may take a look at the example)
Parameters
----------
Input: string or raw loop class
Filename of a prior saved loopfile (recommended). Alternatively the
output of the function **computeLoopPositions** (not recommended).
For the latter case plenty of convenience functions are found in
the the *loop* submodule of *pyhed* starting with "build"....
config: string or pyhed.config
Defines the configuration file for the loop.
Example
-------
>>> # example: import
>>> loopclass = Loop('path/to/loopfile')
>>> # example: create circular loop
>>> loopclass = buildCircle(10, 12) # 10 m radius, 12 dipoles
"""
def __init__(self, Input, config=None, verbose=False):
self.loopmesh = None
self.loop_mesh_name = None
self.field = None # later Bfield of the loop
self.field_on_nodes = None
# field can either be calculated on Nodes
# or cellmidpoints, None for undecided yet
self.dipolemesh = None
self.dipole_mesh_name = None
self.d_field = None
if isinstance(config, str):
self.config = Config(config) # config name
elif config is None:
self.config = Config() # default config
else:
self.config = config # config class
self.verbose = verbose
self.setMeshParameters()
self.name = None
self.cfg2_savename = None
self.secondary_config = None
if isinstance(Input, str): # load loop file
self.load(Input, config, verbose=self.verbose)
else: # direct input
try:
self.type = Input[4]
self.grounded = Input[3]
self.pos = np.array(Input[0])
self.phi = np.array(Input[1])
self.ds = np.array(Input[2])
if self.type not in ('dipole', 'Dummy') and len(self.pos) > 1:
self._handleNonUniqueDipoles()
self._setDistance()
except:
# I know it's a bare exception, however I only raise
# another exception after that, so no harm done
raise Exception('do not call Loop-Class directly. Please use'
' the build-functions provided by the loop '
'sub package of pyhed.')
# field matrices if calcualtion with matrix=True
self.field_matrix = None
self.field_matrix_sin = None
self.field_matrix_cos = None
# index of the loop (for later usage inside the kernel class)
self.idx = 0
# 2d
self.secMOD = None
self._para_mesh_2d = None
self.has_boundary = False
self.orig_cell_count_2d = None
self.orig_cells = None
self.valid_marker = None
self.paramesh_sort = False # is parameter only filled with unique vals
self.anom_idx = None # value indices if sorted=True
self.para_limits_x = None
self.para_limits_y = None
self.para_limits_z = None
self.preserved_edge_centers = None
self.area = 0.0 # area of loop, if closed polygon
self.turns = 1 # turns of loop, if closed polygon
self.secondary = None
# and at last a placeholder: this will turn into something else someday
self.geometry = None
def __str__(self):
""" Readible representive of the class. """
from comet.pyhed.misc import temporal_printoptions
with temporal_printoptions(threshold=5):
string = '### {} ###\n{}\npos:\n{} {}\nphi:\n{} {}\nds:\n{} {}'\
.format(self.__class__.__name__.upper(),
'looptype: ' + str(self.type),
self.pos, np.shape(self.pos),
self.phi, np.shape(self.phi),
self.ds, np.shape(self.ds))
return string
def __repr__(self):
""" Unambiguous representive of the class. used by other class __str__
"""
if self.type.lower() == 'dummy':
return 'dummy loop'
mean = np.mean(self.pos, axis=0)
if self.type.lower() == 'circle':
end = ', radius: {:.1f} m'.format(
np.linalg.norm(self.pos[0] - mean))
coord = 'at ({:.1f}, {:.1f}, {:.1f})'.format(*mean)
elif self.type.lower() == 'rectangle':
range_ = np.max(self.pos, axis=0) - np.min(self.pos, axis=0)
end = ', {:.1f} x {:.1f} m'.format(*range_[:2])
coord = 'at ({:.1f}, {:.1f}, {:.1f})'.format(*mean)
elif self.type.lower() == 'line':
end = ', length: {:.1f} m'.format(np.sum(self.ds))
coord = 'between ({:.1f}, {:.1f}, {:.1f})'.format(*self.pos[0])
coord += ' and ({:.1f}, {:.1f}, {:.1f})'.format(*self.pos[-1])
else:
end = ''
coord = 'at ({:.1f}, {:.1f}, {:.1f})'.format(*mean)
return ('{} {}, {} dipoles {}{}'
.format('open' if self.grounded else 'closed',
self.type, len(self.pos), coord,
end))
@property
def para_mesh_2d(self):
return self.getParaMesh2D()
[docs] def setPrimaryConfig(self, config):
""" Sets the primary config which handles the resistivity distribution
as well as the frequency of the primary field.
For setting the 1D model directly see *setModel*.
Parameters
----------
config: path or comet.pyhed.config.Config instance
Configuration class instance or file path.
"""
if isinstance(config, str):
self.config.load(config)
else:
self.config = config
[docs] def setSecondaryConfig(self, secondary_config):
""" Sets class attribute with secondary config or loads file.
Parameters
----------
secondary_config: string or pyhed.SecondaryConfig
Seondary config class instance or file path.
"""
# checking secondary configuration file
if isinstance(secondary_config, str):
self.loadSecondaryConfig(secondary_config)
else: # assuming valid sec config?
print('setting secondary config...')
self.secondary_config = secondary_config
[docs] def setFrequency(self, frequency):
"""
Sets the frequency, not angular frequency for the field calculation.
"""
self.config.f = float(frequency)
[docs] def setFType(self, ftype):
assert ftype.lower() in ('h', 'e', 'b'),\
'field type has to be "H", "B" or "E".'
self.config.ftype = ftype.upper()
[docs] def setModel(self, rho, d=None, thickness=True, resistivity=True):
"""
Sets the synthetic 1D layered earth model for dipole calculation.
Parameters
----------
rho: float or array_like
Resistivity/conductivity distribution for a layered earth.
d: float or array_like or None [None]
Thickness or layer depth. Empty (None, 0, or []) for halfspace.
thickness: boolean [True]
The parameter d is used as thickness (True, len(rho) - 1) or depth
(False, len(rho)), respectively.
resistivity: boolean [True]
The parameter rho is used as Resistivity (True) or conductivity
(False), respectively.
"""
d = np.atleast_1d(d)
if d.size == 0 or d[0] == 0 or d[0] is None:
d = np.array([])
rho = np.atleast_1d(rho)
if thickness is False:
d = np.diff(np.abs(d))
if resistivity is False:
rho = 1./rho
assert_message = 'found {} thicknesses for {} layer'\
.format(len(d), len(rho))
assert len(d) + 1 == len(rho), assert_message
self.config.rho = rho
self.config.d = d
if self.secondary_config is not None:
self.secondary_config.sigma_ground = \
(1./np.atleast_1d(self.config.rho)).tolist()
@property
def model(self):
model = np.append(self.config.rho, self.config.d)
return model
[docs] def setMeshParameters(self, refinement_para=1.,
max_area_factor=1.,
tetgen_quality=1.2):
"""
Alters the Parameter responsible for the quality and size used during
automatic mesh generation.
Parameters
----------
refinement_para: float [1]
An increase of refinement_para decreases the size of the
smallest cell at the dipoles and therefore incrreases the total
number of refinement cells around the dipole.
Omitts refinement if value is negative.
max_area_factor: positive float [1]
The max_area_para lineary affects the maximum volume of a cell.
An increase of the parameter allows for greater cells and
therefore decreases the total number of
cells outside of the refined section of the mesh. Set to 0.5 for a
fine mesh and anywhere near 2 for a coarse mesh. Highly affects the
total number of nodes/cells in the mesh.
tetgen_quality: float [1.2]
The tetgen_quality parameter is directly piped to the corresponding
tetgen call in the meshgeneration process. Decrease this parameter
(e.g. to 1.12) to increase the homogeneity of the triangles. Be
careful with this one, tetgen very easy starts to split cells in
smaller and smaller pieces and therefore increase the total
cellcount to very high values (millions and more).
"""
self.refinement_para = 400. * refinement_para
self.max_area_para = 0.004 * max_area_factor
self.tetgen_quality = tetgen_quality
[docs] def setParaMesh2D(self, para_mesh_2d, limits=None, append_boundary=False,
preserve_edges=False, anomaly=None, sort=True,
**kwargs):
""" Sets 2D parameter mesh for secondary field calculation.
Parameters
----------
para_mesh_2d: string or pg.Mesh
2D parameter mesh or path to mesh.
limits: [float, float] or None
Minimum and maximum values for y of the area where 2D parameters
are to be transferred to the 3D FEM mesh. Default are the x
extension of the 2D parameter mesh.
append_boundary: boolean [ False ]
Fills in an additional boundary with prolongated resistivity values
around the transferred 2D values. This is useful as it reduces
artifacts at the edge of the 2D domain oin the FEM mesh.
anomaly: None or np.ndarray [ None ]
Optionally. Alternatively use *setAnomalies*. Anomaly vector
(conductivity vector) with values for each cell in the 2D
parameter domain. Attention: conductivity is used, not
resistivity!
sort: boolean [ False ]
Optionally. Alternatively use *setAnomalies*. If True, set the
same marker for double values in anomaly vector.
This is for blocky 2d structures, where only a few different
regions are required. Use default False if dealing with smooth
inversion results, for example in a structural coupling.
kwargs to *appendTriangleBoundary*
Calls *setAnomalies* of anomaly is given.
Furthermore calls *updateFEMAnomaly* if FEMMesh has been set already.
"""
log.info('setParaMesh2D: setting 2D paramter mesh: {}'
.format(para_mesh_2d))
# string or copy: always copy of mesh, never a reference
self._para_mesh_2d = pg.Mesh(para_mesh_2d)
self.orig_cell_count_2d = self._para_mesh_2d.cellCount()
log.debug('Original paramesh cell count (no boundary): {}'
.format(self.orig_cell_count_2d))
if limits is not None:
self.para_limits_y = limits
else:
val_min = np.min(self.pos[:, 1])
val_max = np.max(self.pos[:, 1])
val_thk = val_max - val_min
self.para_limits_y = [val_min - 3 * val_thk, # leave at 3 * !!
val_max + 3 * val_thk] # leave at 3 * !!
log.info('Paramesh y limits: {}'.format(self.para_limits_y))
self.para_limits_x = [self.para_mesh_2d.xmin(),
self.para_mesh_2d.xmax()]
self.para_limits_z = [self.para_mesh_2d.ymin(),
self.para_mesh_2d.ymax()]
self.preserved_edge_centers = None
log.debug('preserve edges = {}'.format(preserve_edges))
if preserve_edges:
# UNTESTED preserve edges!
topreserve = self.para_mesh_2d.boundaryMarkers() > 0
if np.sum(topreserve) >= 0:
# found marker > 0
edge_ids = np.where(self.para_mesh_2d.boundaryMarkers() > 0)[0]
self.preserved_edge_centers = []
for edge_id in edge_ids:
# store edge center in order to find edge again later
self.preserved_edge_centers.append(
np.array(self.para_mesh_2d.boundaryCenters()[edge_id]))
# append triangle boundary
self.has_boundary = False
# pg.show(self._para_mesh_2d)
log.debug('append boundary = {}'.format(append_boundary))
if append_boundary:
# add new boundary with special cell markers to identify orig cells
kwargs['marker'] = -42
# self._para_mesh_2d_with_bounds
self._para_mesh_2d = pg.meshtools.appendTriangleBoundary(
self._para_mesh_2d, **kwargs)
# pg.show(self._para_mesh_2d)
cm = np.array(self._para_mesh_2d.cellMarkers())
self.orig_cells = cm != -42
self.has_boundary = True
para_limits_x = [self.para_mesh_2d.xmin(),
self.para_mesh_2d.xmax()]
log.debug('append boundary: increase x limits: {} -> {}'
.format(self.para_limits_x, para_limits_x))
para_limits_z = [self.para_mesh_2d.ymin(),
self.para_mesh_2d.ymax()]
log.debug('append boundary: increase z limits: {} -> {}'
.format(self.para_limits_z, para_limits_z))
log.debug('new paramesh cell count (with boundary): {}'
.format(self.para_mesh_2d.cellCount()))
# have secondary config
if self.secondary_config is None:
self.createDefaultSecondaryConfig()
if anomaly is not None:
# case 1/4: anomaly given, set paramesh attributes and marker
self.setAnomalies(anomaly=anomaly, sort=sort)
# sets self.secondary_config.sigma_paramesh
if self.loopmesh is not None:
# case 2/4: also have FE mesh, set FE attributes (anomalies)
# and marker
self.updateFEMAnomaly()
else:
# case 3/4: already got paramesh anomalies and got FE mesh,
# also set FE mesh marker and anomlies
if self.loopmesh is not None and\
self.secondary_config.sigma_paramesh != []:
self.updateFEMAnomaly()
# case 4/4: got neither anomalies nor nor FE mesh: call it a day
[docs] def setDipoleMesh(self, mesh, savename='_default_dipole_mesh',
verbose=True):
""" Sets the dipolemesh and saves it under savename.
Parameters
----------
mesh: string or mesh instance
Pygimli mesh instance or file path to pygimli mesh.
savename: string [ None ]
Used savename for mesh, if mesh is already a mesh instance.
verbose: boolean [ False ]
Turn on verbose mode.
"""
if isinstance(mesh, pg.Mesh):
self.dipolemesh = mesh
self.dipole_mesh_name = savename
self.dipolemesh.save(self.dipole_mesh_name)
elif isinstance(mesh, str):
if not mesh.endswith('.bms'):
if not os.path.exists(mesh + '.bms'):
mesh += '_dipole_mesh'
if verbose:
print('load dipole mesh "{}": '.format(mesh), end='',
flush=True)
self.dipolemesh = pg.Mesh(mesh)
if verbose:
print(self.dipolemesh)
self.dipole_mesh_name = mesh
[docs] def setLoopMesh(self, mesh, savename=None):
""" Sets the loopmesh.
Parameters
----------
mesh: string or mesh instance
Pygimli mesh instance or file path to pygimli mesh.
savename: string [ None ]
Used savename for mesh, if mesh is already a mesh instance.
Alternatively a default name is generated with
*getDefaultLoopMeshBaseName*.
"""
if isinstance(mesh, pg.Mesh):
self.loopmesh = mesh
elif isinstance(mesh, str):
log.info('load loop mesh "{}"... '.format(mesh))
self.loopmesh = pg.Mesh(mesh)
savename = mesh
elif isinstance(mesh, PosixPath):
log.info('load loop mesh "{}"... '.format(mesh.as_posix()))
self.loopmesh = pg.Mesh(mesh.as_posix())
savename = mesh.as_posix()
else: # gets coordinate Points
mesh = np.array(mesh, dtype=np.float64)
self.loopmesh = pg.Mesh(3)
shape_in = np.shape(mesh)
if shape_in[0] == 3 and shape_in[1] != 3:
mesh = mesh.T
for pt in mesh:
self.loopmesh.createNode(pg.Pos(pt[0], pt[1], pt[2]))
self.setLoopMeshName(savename=savename)
# reset field if new mesh
# new mesh = other cell and node count (its not a perfect comparison
# but it should do 99.9% of the time)
# A thorough test is not possible.
if self.field is not None:
if self.field.shape[1] != self.loopmesh.nodeCount() and \
self.field.shape[1] != self.loopmesh.cellCount():
self.field = None
# Known issues: switching from .h5 mesh (custem) to .bms mesh (pygimli)
# This test does not detect the difference, however the meshs will have
# different sortings, which has to be corrected for the fields.
# This, for example, is done in prepareSecondaryFieldCalculation
log.debug('set loopmesh: {}'.format(self.loopmesh))
[docs] def setParaMeshMarkerAndVals(self, anomaly=None, sort=True):
""" Handle anomaly vector and marker of the 2d parameter mesh.
Parameters
----------
anomaly: array_like [ None ]
Vector with conductivities in S/m. Expect one entry for each
cell in parameter mesh. If not given, and sort is True an error
is raised.
sort: boolean [ False ]
If True, set the same marker for double values in anomaly vector.
This is for blocky 2d structures, where only a few different
regions are required. Use default False if dealing with smooth
inversion results, for example in a structural coupling.
"""
raise DeprecationWarning('Use setParaMesh2D(..., anom=...) instead')
if anomaly is None:
if sort is True:
raise Exception('Need anomaly vector for sort option.')
else:
self.paramesh_sort = False
anomaly = self._checkAnomaly(anomaly)
self._checkParaMesh()
# reserve marker for air and background
offset = len(self.secondary_config.sigma_ground) + 1
if sort:
self.paramesh_sort = True
# set only marker for unique resistivity values
# This way marker and resistivities are still matching!
unique1, anom_idx, marks = np.unique(
anomaly.round(decimals=4), return_inverse=True,
return_index=True)
self.anom_idx = anom_idx
log.debug(f'unique old: {unique1}')
unique = anomaly[self.anom_idx]
log.debug(f'unique: {unique}')
self.secondary_config.setAnomalies(unique)
log.info('sort: ({} -> {} marker)'.format(
self.para_mesh_2d.cellCount(), len(unique)))
log.info('sec config anomaly: {} ({})'.format(
unique, len(unique)))
else:
self.anom_idx = None
# set one marker per entry in anomaly, no matter the value
marks = np.arange(self.para_mesh_2d.cellCount())
if anomaly is not None:
# for calculation
self.secondary_config.setAnomalies(anomaly)
log.info('sec config anomaly: {} ({})'.format(
anomaly, len(anomaly)))
# set marker for transfer to FEM Mesh and attributes for debug
self.para_mesh_2d.setCellMarkers(marks + offset)
log.debug('paramesh2d marker count after setParaMeshMarkerAndVals: {}'
.format(len(np.unique(self.para_mesh_2d.cellMarkers()))))
[docs] def setAnomalies(self, anomaly, sort=True):
""" Handle anomaly vector and marker of the 2d parameter mesh.
Parameters
----------
anomaly: array_like [ None ]
Vector with conductivities in S/m. Expect one entry for each
cell in parameter mesh.
sort: boolean [ False ]
If True, set the same marker for double values in anomaly vector.
This is for blocky 2d structures, where only a few different
regions are required. Use default False if dealing with smooth
inversion results, for example in a structural coupling.
"""
# import check
self._checkParaMesh()
# store optional input parameter
self.paramesh_sort = sort
# some more checks
if anomaly is None:
raise Exception('Need anomaly vector with length of paramesh '
'cell count.')
if len(anomaly) != self.orig_cell_count_2d:
raise Exception('Dimension mismatch between original paramesh '
'({} cells) and anomaly vector ({} values).'
.format(self.orig_cell_count_2d, len(anomaly)))
# store raw values in sec config
self.secondary_config.sigma_raw = np.array(anomaly).tolist()
# prepare anomaly vector with respect to parameter mesh prolongation
if self.has_boundary is False:
anom = np.array(self.secondary_config.sigma_raw)
else:
anom = np.zeros(self.para_mesh_2d.cellCount())
anom[self.orig_cells] = anomaly
anom = pg.Vector(anom)
# inplace prolongation needs reference
# therefore explicit pg.Vector call is needed before giving anom
# to prolongateEmptyCellsValues
self.para_mesh_2d.prolongateEmptyCellsValues(anom)
log.debug('setAnomalies: detect boundary around paramesh: '
'prolongate empty cells values.')
with temporal_printoptions():
log.debug('setAnomalies: prolonged anomaly: {} ({})'
.format(np.array(anom), len(anom)))
# set cell attributes, this will help some cache stuff in the future
self.para_mesh_2d.setCellAttributes(anom)
# pygimli to numpy
anom = np.array(anom)
# reserve marker for air and background
offset = len(self.secondary_config.sigma_ground) + 1
# set one marker per entry in anomaly as all values are considered
# unique, regardless of the actual value
# Note: need continuous array with 0 being air, 1...n for n layers
# of the background field and n+1...n+1+m for m anomalies.
# theres no air here, however the marker is still reserved as are
# the markers from 1...n hence *offset* has been introduced.
if self.paramesh_sort:
# sorted cells: anomaly vector has unique values
# self.anom_idx -> from 2d mesh with bounds to unique vals
# This way marker and resistivities are still matching!
unique1, anom_idx, marks = np.unique(
anom.round(decimals=4), return_inverse=True,
return_index=True)
self.anom_idx = anom_idx
log.debug(f'unique old: {unique1}')
unique = anom[self.anom_idx]
log.debug(f'unique: {unique}')
log.info('sort: ({} -> {} marker)'.format(
self.para_mesh_2d.cellCount(), len(unique)))
# store in secondary config
self.secondary_config.sigma_paramesh = unique.tolist()
else:
# just take all values as they are
self.anom_idx = None
marks = np.arange(self.para_mesh_2d.cellCount())
# store in secondary config
self.secondary_config.sigma_paramesh = anom.tolist()
with temporal_printoptions():
log.info('setAnomalies: sec config anomaly (paramesh): {} ({})'
.format(
np.array(self.secondary_config.sigma_paramesh),
len(self.secondary_config.sigma_paramesh)))
# set marker for transfer to FEM Mesh and attributes for debug
self.para_mesh_2d.setCellMarkers(marks + offset)
log.debug('paramesh2d marker count after setAnomalies: {}'
.format(len(np.unique(self.para_mesh_2d.cellMarkers()))))
# OPTIONALLY
# [x] set anomalies in secondary config, raw + paramesh
# add marker to FE mesh and sigma_anom
[docs] def setFEMMesh_old(self, mesh, valid_marker=None, savename=None):
""" Sets the FEM mesh as loopmesh and handles the domain markers.
Parameters
----------
mesh: string or mesh instance
Pygimli mesh instance or file path to pygimli mesh.
valid_marker: array_like [ None ]
If None, checks which domains of the 2D mesh are actually
transferred to the 3D FEM mesh.
The markers are saved in the *valid_marker* attribute.
If given, sets vector directly after some checks.
savename: string [ None ]
Useful if multiple loops are using the same mesh (saves diskspace).
Ignored if *mesh* is a string already.
"""
self._checkParaMesh()
if isinstance(mesh, str):
self.setLoopMesh(savename)
else:
self.setLoopMesh(mesh, savename=savename)
self.setFEMMarker(valid_marker=valid_marker)
[docs] def setFEMMesh(self, mesh, valid_marker=None, savename=None):
""" Sets the FEM mesh as loopmesh and handles the domain markers.
Parameters
----------
mesh: string or mesh instance
Pygimli mesh instance or file path to pygimli mesh.
valid_marker: array_like [ None ]
If None, checks which domains of the 2D mesh are actually
transferred to the 3D FEM mesh.
The markers are saved in the *valid_marker* attribute.
If given, sets vector directly after some checks.
savename: string [ None ]
Useful if multiple loops are using the same mesh (saves diskspace).
Ignored if *mesh* is a string already.
Calls *_setFEMMarker* is paramesh has been set.
Furthermore calls *updateFEMAnomaly* if anomaly has been set through
either *setParamesh2D* or *setAnomaly*
Produces error message if valid_marker array is given, but no paramesh
is found
"""
if isinstance(mesh, str):
self.setLoopMesh(savename)
else:
self.setLoopMesh(mesh, savename=savename)
if self._checkParaMesh(raise_it=False):
# if paramesh
self._setFEMMarker(valid_marker=valid_marker)
if self.secondary_config.sigma_paramesh != []:
# if paramesh + if anomaly set before
self.updateFEMAnomaly(set_marker=False)
else:
# if neither paramesh nor anomaly
if valid_marker is not None:
log.error('setFEMMesh: got valid_marker array, however '
'cannot deal with it if no paramesh is set before. '
'Skip settin of FEMesh markers.')
def _setFEMMarker(self, valid_marker=None):
""" Sets and checks the domain marker of the 3D FEM mesh.
Parameters
----------
valid_marker: array_like [ None ]
If None, checks which domains of the 2D mesh are actually
transferred to the 3D FEM mesh.
The markers are saved in the *valid_marker* attribute.
If given, sets vector directly after some checks.
"""
self._checkParaMesh()
old_marker = np.array(self.loopmesh.cellMarkers())
# domains in para mesh
num_para = len(np.unique(self.para_mesh_2d.cellMarkers()))
# domains in fem mesh - air - ground background
num_ground = len(self.secondary_config.sigma_ground)
num_marker = len(np.unique(self.loopmesh.cellMarkers())
) - 1 - num_ground
if valid_marker is not None:
# which of the para domains are mapped to the FEM mesh
if len(valid_marker) != num_para:
raise Exception(
'Mismatch between para mesh domain count ({}) and len'
' of given valid marker array ({}).'
.format(num_para, len(valid_marker)))
num_marker2 = len(np.where(valid_marker)[0])
if num_marker2 != num_marker:
raise Exception('Mismatch between number of valid domains in'
' given marker vector ({}) and number of '
'anomaly regions found in fem mesh ({}).'
.format(num_marker2, num_marker))
else:
# at this point we varified that given marker vector is valid.
self.valid_marker = valid_marker
else:
# set marker for inner area and return not used marker
not_set = setNearestMarkers(self.loopmesh, self.para_mesh_2d,
self.para_limits_y,
fill_air_ground=True)
pmcc = len(self.para_mesh_2d.cellMarkers())
log.info('setFEMMarker(): Tranferred {}/{} parameter mesh '
'markers to FE mesh.'
.format(pmcc - len(not_set), pmcc))
if len(not_set) / pmcc >= 0.25 and pmcc > 100:
log.warning(
'More than 25 % of the values in the parameter mesh will '
'not be transferred to the FE mesh. This either means '
'your parameter mesh is very dense, or your FE mesh is '
'too coarse.')
if len(not_set) != 0:
# searching for not used markers and performing index shift
# reducing sigma_anom accordingly
# index shift: -air, ground
# VERY SLOW!!!!
excl_ids = np.array(not_set) - 1 - num_ground
valid = np.ones(num_para, dtype=bool)
valid[excl_ids] = False
self.valid_marker = valid
new_marker = np.array(self.loopmesh.cellMarkers())
for ind in np.sort(not_set)[::-1]:
new_marker[new_marker > ind] -= 1
self.loopmesh.setCellMarkers(new_marker)
else:
self.valid_marker = np.ones(num_para, dtype=bool)
# Quick check to due to errors in the past. This stays!
import time
timer_start = time.time()
un = np.unique(self.loopmesh.cellMarkers())
for mn in range(np.max(self.loopmesh.cellMarkers()) + 1):
if mn not in un:
raise Exception('Did not found cell with marker {}'
.format(mn))
log.debug('_setMarker(): error test: {:.4f}'
.format(time.time() - timer_start))
if not np.allclose(old_marker, np.array(self.loopmesh.cellMarkers())):
h5path = Path(self.loop_mesh_name).with_suffix('.h5').as_posix()
log.info('_setFEMMarker(): Marker changed, Export .h5 ({})'
.format(h5path))
self.exportFenicsHDF5Mesh(h5path)
[docs] def setFEMMarker_old(self, valid_marker=None):
""" Sets and checks the domain marker of the 3D FEM mesh.
Parameters
----------
valid_marker: array_like [ None ]
If None, checks which domains of the 2D mesh are actually
transferred to the 3D FEM mesh.
The markers are saved in the *valid_marker* attribute.
If given, sets vector directly after some checks.
"""
log.error('Old function has been used. Abort.')
raise Exception('setFEMMarker_old should not be in use anymore.')
self._checkParaMesh()
# domains in para mesh
num_para = len(np.unique(self.para_mesh_2d.cellMarkers()))
# domains in fem mesh - air - ground background
num_ground = len(self.secondary_config.sigma_ground)
num_marker = len(np.unique(self.loopmesh.cellMarkers())
) - 1 - num_ground
if valid_marker is not None:
# which of the para domains are mapped to the FEM mesh
if len(valid_marker) != num_para:
raise Exception(
'Mismatch between para mesh domain count ({}) and len'
' of given valid marker array ({}).'
.format(num_para, len(valid_marker)))
num_marker2 = len(np.where(valid_marker)[0])
if num_marker2 != num_marker:
raise Exception('Mismatch between number of valid domains in'
' given marker vector ({}) and number of '
'anomaly regions found in fem mesh ({}).'
.format(num_marker2, num_marker))
else:
# at this point we varified that given marker vector is valid.
self.valid_marker = valid_marker
else:
# set marker for inner area and return not used marker
not_set = setNearestMarkers(self.loopmesh, self.para_mesh_2d,
self.para_limits_y,
fill_air_ground=True)
pmcc = len(self.para_mesh_2d.cellMarkers())
log.info('setFEMMarker(): Tranferred {}/{} parameter mesh '
'markers to FE mesh.'
.format(pmcc - len(not_set), pmcc))
if pmcc - len(not_set) / pmcc >= 0.25 and pmcc > 100:
log.warning(
'More than 25 % of the values in the parameter mesh will '
'not be transferred to the FE mesh. This either means '
'your parameter mesh is very dense, or your FE mesh is '
'too coarse.')
if len(not_set) != 0:
# searching for not used markers and performing index shift
# reducing sigma_anom accordingly
# index shift: -air, ground
excl_ids = np.array(not_set) - 1 - num_ground
valid = np.ones(num_para, dtype=bool)
valid[excl_ids] = False
self.valid_marker = valid
new_marker = np.array(self.loopmesh.cellMarkers())
for ind in np.sort(not_set)[::-1]:
new_marker[new_marker > ind] -= 1
self.loopmesh.setCellMarkers(new_marker)
else:
self.valid_marker = np.ones(num_para, dtype=bool)
un = np.unique(self.loopmesh.cellMarkers())
# Quick check to due to errors in the past. This stays!
for mn in range(np.max(self.loopmesh.cellMarkers()) + 1):
if mn not in un:
raise Exception('Did not found cell with marker {}'
.format(mn))
[docs] def calculate(self, num_cpu=12, loop_mesh=None, dipole_mesh=None,
interpolate=False, savename=None, cell_center=False,
verbose=False, mode='auto', matrix=False, field_matrix=None,
max_node_count=None, **kwargs):
""" Computation of the loop field with respect to the config.
Parameters
----------
num_cpu: integer [ 12 ]
Maximum number of processes allowed for this task.
loop_mesh: string or mesh instance [ None ]
Optional. Possibility to give a user defined mesh for the
calculation.
dipole_mesh: string or mesh instance [ None ]
Optional. Possibility to give a user defined mesh for the
calculation (interpolate=True or matrix=True only).
interpolate: boolean [ True ]
The loop dipoles can either be calculated directly (False) or once
on a seperated mesh (dipolemesh) and then interpolated to the
loopmesh (True). If a dipoleFieldName is given, this field will be
used for the interpolation.
savename: string [ None ]
Optional. If savename is not None, the loop will be saved under
the name defined in savename.
cell_center: boolean [ True ]
A default the field of the loop will be calculated at the cell
center of the mesh cells. This flag allows for calculation at the
mesh nodes. Affects only the definition of the final loopmesh, the
dipolemesh will always be calculated at the nodes for interpolation
reasons.
verbose: boolean [ False ]
Turn on verbose mode.
mode: string [ 'auto' ]
Five posibilities: 'auto', 'config', 'te', 'tm', 'tetm'\n
'auto': Automatic detection wether the loop is grounded or not.
Grounded wires are calculated with te and tm mode (see HED).
Non grounded wires are calculated with te mode only (sufficient).\n
'config': the default config decides the mode the field is
calculated in.\n
'te', 'tm', 'tetm': Calculates the field in the choosen mode.
matrix: boolean [ False ]
Alternatively calculation approach. At first the field on a highly
dense dipole mesh will be triggered. After that the field will
be interpolated to the single dipole positions by the means of a
matrix vector multiplication with a matrix containing appropriate
weighting factors. This Approach takes longer than direct
calculation in the first run, but the calculated matrix can be used
for further calculations with different frequencies or resistivity
models (as long as the loopmesh and dipolemesh remain the same).
field_matrix: list or string [ None ]
Interpolation matrices or file path if calculation with
matrix=True. Will be calculated automatically if None.
max_node_count: integer [ None ]
As all points will be calculated at once, the computational effort
scales lineary with the reciever count, the transmitter count and
the used hankel factors. If the limits of the available memory is
reached *max_node_count* can be used to define the maximum chunk
of nodes to be computated at once. Other nodes will be computed
afterwards.
Keyword arguments
-----------------
Keyword arguments are redirected to loop.save and to define
the *drop_tol* (float [ 1e-2 ]) in the cylindrical coordinate
transformation to avoid instabilities around the source.
"""
tick = Timer()
# load/import dipole Mesh
if dipole_mesh is not None:
self.setDipoleMesh(dipole_mesh)
# load/import loop Mesh
if loop_mesh is not None:
self.setLoopMesh(loop_mesh)
# create loop mesh if still missing
if self.loopmesh is None:
log.info('No Loop Mesh given or found. Creating.')
self.createLoopMesh(savename=savename)
log.log(16, 'loopmesh: {!s}'.format(self.loopmesh))
# transform coordinates, and remember to change them back again
# only in case where loopmesh is manually set numpy array (custEM)
transposed = False
if not isinstance(self.loopmesh, pg.Mesh):
if len(self.loopmesh) != 3 and len(self.loopmesh[0]) == 3:
self.loopmesh = self.loopmesh.T
transposed = True
# one dipole only case
if self.type == 'dipole':
interpolate = False # just one dipole: always calculate directly
# decide wether tm, te or both
if mode == 'auto':
if self.grounded is False:
log.info('detecting non-grounded loop, set mode to "te"')
self.config.mode = 'te'
elif self.grounded is True:
log.info('detecting grounded loop, set mode to "tetm"')
self.config.mode = 'tetm'
elif mode == 'config':
pass
else:
self.config.mode = mode
# calculation on nodes or cell midpoints
self.field_on_nodes = not cell_center # historic
log.log(16, 'cell center: {}'.format(cell_center))
if matrix is True or interpolate is True:
if self.dipolemesh is None:
log.info('No Dipole Mesh given or found. Creating.')
self.createDipoleMesh(savename=savename)
log.log(16, 'dipolemesh: {}'.format(self.dipolemesh))
# dipole calcualation
log.info(f'Calculating dipole field of {self!r}')
log.info(f'{self.config!r}')
log.debug(f'{self.config!s}')
self.calculateDipoleField(
num_cpu=num_cpu,
max_node_count=max_node_count,
verbose=verbose,
drop_tol=kwargs.get('drop_tol', 1e-2))
if matrix is False:
# case 1/3: interpolate
# dipolemesh created out of loopmesh
if interpolate is True:
# interpolation
self.field = loopInterpolation(self.d_field,
self.dipole_mesh_name,
self.loopmesh,
(self.pos, self.phi, self.ds),
verbose=verbose,
cell_center=cell_center,
num_cpu=num_cpu)
# case 2/3: calculate directly
elif interpolate is False:
log.info(f'Calculating loop field of {self!r}')
log.info(f'{self.config!r}')
log.debug(f'{self.config!s}')
self.field = loopCalculation(self.loopmesh,
(self.pos, self.phi, self.ds),
self.config.rho,
self.config.d,
self.config.f,
self.config.current,
self.config.mode,
self.config.ftype,
verbose=verbose,
cell_center=cell_center,
num_cpu=num_cpu,
max_node_count=max_node_count,
**kwargs) # drop_tol
if transposed is True:
self.loopmesh = self.loopmesh.T
self.field = self.field.T
else:
# case 3/3: calculation via matrix
if self.field_matrix is None:
if field_matrix is None:
self.calculateFieldMatrix(num_cpu=num_cpu,
verbose=verbose)
elif isinstance(field_matrix, (tuple, list)):
self.field_matrix = field_matrix[0]
self.field_matrix_sin = field_matrix[1]
self.field_matrix_cos = field_matrix[2]
else:
self.loadFieldMatrix(field_matrix)
self.calculateFieldFromMatrix(verbose=verbose)
log.info('total field calculation time: {:2.2f} seconds'
.format(tick.time_total))
if savename is not None:
self.save(savename, kwargs.pop('config_savename', None))
return self.field
[docs] def calculateInterpolationMatrix(self, Pos):
""" Calculates the interpolation matrix.
If one wished the field can be interpolated to another mesh.
The interpolation matrix from the loopmesh to an arbitrary set of
coordinates is calculated with this function. This function is called
to initialize and append the weights to the interpolation matrix.
Note: The loop class does not hold a reference of the resulting matrix,
instead gives it back to the caller.
Parameters
----------
Pos: np.ndarray or pg.core.PosVector
Transmitter positions of shape (n, 3) with n positions. Values
are expected to be floats (the conversion to a pg.PosVector
will not check again).
Returns
-------
mat: pg.core.SparseMapMatrix
Sparse interpolation matrix with number of columns equal to the
number of nodes in the loopmesh and number of rows equal to
the number of input positions.
"""
tick_sIM = time.time()
log.log(16, 'Loop: Assembling InterpolationMatrix.')
if not isinstance(Pos, pg.core.PosVector):
Pos = pg.core.PosVector(Pos)
interpolationMatrix = self.loopmesh.interpolationMatrix(Pos)
log.log(13, '%2.2f sec' % (time.time() - tick_sIM))
return interpolationMatrix
# def calculateInterpolationMatrix_para(self, Pos, num_cpu=12):
# """ Parallel version of *calculateInterpolationMatrix*.
#
# Not working yet.
# """
# raise Exception('not working yet!')
# tick_sIM = time.time()
# if verbose:
# print('Assembling InterpolationMatrix...', end='', flush=True)
# if not isinstance(Pos, pg.core.PosVector):
# Pos = pg.core.PosVector(Pos)
# if verbose:
# print(' {} to {} points'.format(self.loopmesh.nodeCount(),
# len(Pos)), end='', flush=True)
# self.loopmesh.save('mesh_for_interpolation.bms')
# interpolationMatrix = InterpolationMatrix_para(
# 'mesh_for_interpolation.bms',
# Pos,
# num_cpu=num_cpu,
# in_node_count=self.loopmesh.nodeCount(),
# verbose=True)
# if verbose:
# print(' %2.2f sec' %
# (time.time() - tick_sIM), flush=True)
# return interpolationMatrix
[docs] def calculateFieldMatrix(self, num_cpu=8, verbose=False):
"""
If wished the calcualtion of the total loop field can be done by
interpolation and superposition of one highly accurate dipole field
to the different transmitter positions of the loop. This is done either
done directly or via a vector matrix multiplication.
This function is called to initialize and append the weights to the
interpolation matrix from the *dipolemesh* to the *loopmesh* for all
tx positions with respect to *pos*, *phi*, and *ds*.
This function will be called if *calculate* is called with
*matrix=True*.
Parameters
----------
num_cpu: integer [ 8 ]
Define the maximum number of cores allowed for this operation.
verbose: boolean [ False ]
Turn on verbose mode.
"""
if num_cpu == 1:
outPos = R3VtoNumpy(self.loopmesh.positions())
self.field_matrix = pg.core.SparseMapMatrix()
self.field_matrix.resize(self.loopmesh.nodeCount(),
self.dipolemesh.nodeCount())
self.field_matrix_cos = pg.core.SparseMapMatrix()
self.field_matrix_cos.resize(self.loopmesh.nodeCount(),
self.dipolemesh.nodeCount())
self.field_matrix_sin = pg.core.SparseMapMatrix()
self.field_matrix_sin.resize(self.loopmesh.nodeCount(),
self.dipolemesh.nodeCount())
for i, p in enumerate(self.pos):
actualPos = vec.translate(outPos, -p[0], -p[1], z=-p[2],
copy=True)
vec.rotate3(actualPos, -self.phi[i])
new = self.dipolemesh.interpolationMatrix(
pg.core.PosVector(actualPos)) * self.ds[i]
self.field_matrix += new
self.field_matrix_sin += new * np.sin(-self.phi[i])
self.field_matrix_cos += new * np.cos(-self.phi[i])
else:
self.field_matrix,\
self.field_matrix_sin,\
self.field_matrix_cos = \
calcFieldMatrix_para(self.dipole_mesh_name,
self.dipolemesh.nodeCount(),
self.loopmesh,
(self.pos, self.phi, self.ds),
verbose=verbose,
num_cpu=num_cpu)
[docs] def calculateFieldFromMatrix(self):
"""
Calculates the primary field on basis of the interpolation matrix
and the dipole field.
"""
if self.field_matrix is None:
self.calculateFieldMatrix()
self.field = vec.interpolateField_rotatedMatrix(
self.d_field,
base_mat=self.field_matrix,
sin_mat=self.field_matrix_sin,
cos_mat=self.field_matrix_cos)
return self.field
[docs] def calculateDipoleField(self, verbose=False, drop_tol=1e-2,
num_cpu=12, max_node_count=None):
""" Calculates field on dipole mesh.
Parameters
----------
verbose: boolean [ False ]
Turn on verbose mode.
drop_tol: float [ 1e-2 ]
Singularity fix. All horizontal distances between *drop_tol*
and the transmitter dipole are placed between the first reciever
outside the tolerance and the tolerance, maintaining the correct
order and angle.
num_cpu: integer [ 12 ]
Maximum number of processes allowed for this task.
max_node_count: integer [ None ]
As all points will be calculated at once, the computational effort
scales lineary with the reciever count, the transmitter count and
the used hankel factors. If the limits of the available memory is
reached *max_node_count* can be used to define the maximum chunk
of nodes to be computated at once. Other nodes will be computed
afterwards.
"""
self.d_field = loopCalculation(self.dipolemesh,
([(0, 0, 0)], [0], [1]), # x-HED
self.config.rho,
self.config.d,
self.config.f,
self.config.current,
self.config.mode,
self.config.ftype,
verbose=verbose,
cell_center=False,
num_cpu=num_cpu,
max_node_count=max_node_count,
drop_tol=drop_tol) # drop_tol
[docs] def calcAndExportFieldsForFenics(self, export_vtk=False, num_cpu=32,
**kwargs):
"""
Calculates and export primary fields for fenics secondary field
calculation.
Parameters
----------
kwargs: dict
Keyword parameters are redirected to *calculate*.
"""
import dolfin as df
if self.secMOD is None:
log.info('initializing custEM instance')
self.initCustEM(init_primary_field_class=True)
orig_ftype = '{}'.format(self.config.ftype) # copy string is tricky
magnetic = True
electric = False
if self.secondary_config.pf_EH_flag == 'E' or\
kwargs.get('debug', False):
electric = True
log.debug('initialize mesh (vector function space) from custEM')
self.secMOD.FE.FS.pf_p = 1
V_cg = df.VectorFunctionSpace(self.secMOD.FE.FS.mesh, 'CG', 1)
df_c = V_cg.tabulate_dof_coordinates().reshape(
-1, 3)[0::3, :]
pg_c = self.loopmesh.positions().array()
pg_df, df_pg = convertCoordinates(pg_c, df_c)
log.debug('primary calculation: coords: {}'.format(
np.allclose(df_c, pg_c)))
log.debug('primary calculation: check sorting: {}, {}'.format(
np.allclose(df_c[df_pg], pg_c),
np.allclose(df_c, pg_c[pg_df])))
if not np.allclose(df_c[df_pg], pg_c) or\
not np.allclose(df_c, pg_c[pg_df]):
raise Exception('Something went wrong with the coordinate '
'transformation between dolfin and pygimli mesh. '
'please check input files.')
self.setModel(self.secondary_config.sigma_ground,
d=None, resistivity=False)
if magnetic:
self.config.ftype = 'H'
log.info('calculate magnetic primary fields')
self.calculate(num_cpu=num_cpu, **kwargs)
h_field = np.copy(self.field)
self.exportVTK('hfield_debug.vtk')
else:
h_field = np.zeros_like(self.loopmesh.positions().array(),
dtype=np.complex)
self.secMOD.FE.FS.PF.pvd_flag = export_vtk
h_field_fenics = h_field[:, pg_df].T
assert np.shape(h_field_fenics)[1] == 3
# setting H fields manually in preparation for export
self.secMOD.FE.FS.PF.H_0 = [h_field_fenics]
# calc E primary
if electric:
self.config.ftype = 'E'
log.info('calculate electric primary fields')
self.calculate(num_cpu=num_cpu, **kwargs)
e_field = np.copy(self.field)
self.exportVTK('efield_debug.vtk')
else:
e_field = np.zeros_like(self.loopmesh.positions().array(),
dtype=np.complex)
# setting E fields manually in preparation for export
e_field_fenics = e_field[:, pg_df].T
assert np.shape(h_field_fenics)[1] == 3
self.secMOD.FE.FS.PF.E_0 = [e_field_fenics]
# only one export
self.secMOD.FE.FS.PF.n_tx = 1 # fixed to one transmitter right now
self.secMOD.FE.FS.PF.export_primary_fields()
# reset field type to whatever we were interested in to begin with
self.config.ftype = orig_ftype
if self.config.ftype == 'E':
self.field = e_field
elif self.config.ftype == 'H':
self.field = h_field
elif self.config.ftype == 'B':
self.field = h_field * 4e-7 * np.pi
else:
self.field = None
[docs] def calculateSecField(self, num_cpu=8, **kwargs):
""" Calculates the secondary field using custEM.
Calculates primary field as well if not found.\n
Needs a FEM suited mesh as well as a parameter distribution provided
by other functions of this class (See *createFEMMesh* and
*prepareSecondaryFieldCalculation*).
Parameters
----------
num_cpu: integer [ 8 ]
Maximum number of processes allowed for this task. The actual
calculation will be done in an mpirun environment with the
selected number of cores.
kwargs: dict
Keyword arguments are redirected to *local_apps*.
"""
if self.secondary_config is None:
raise Exception('No config for secondary field calculation has '
'been loaded.')
log.info('Start secondary field calculation routine.')
if not self._checkPrimaryFields():
log.info('preparing primary fields...')
self.calcAndExportFieldsForFenics(num_cpu=num_cpu)
self.save(savename=self.name, save_mesh=False)
success = local_apps_bash('secondaryFieldCalculation',
self.name,
str(num_cpu),
**kwargs)
if success != 0:
raise Exception('Error in secondary field calculation.')
else:
log.info('load resulting fields: "{}"'
.format('{}_total.npy'.format(self.name)))
self.field = np.load('{}_total.npy'.format(self.name))
if '_s' in self.secondary_config.approach:
self.secondary = np.load('{}_secondary.npy'.format(self.name))
else:
self.secondary = None
if self.config.ftype.lower() == 'b':
# custEM produces H fields.
self.field *= 4e-7 * np.pi
if self.secondary is not None:
self.secondary *= 4e-7 * np.pi
[docs] def load(self, savename=None, config=None, config2=None,
verbose=True, load_meshes=True, overwrite_dir=False):
""" Load Loop from files.
Parameters
----------
savename: string [ None ]
Basename of the lop class files. Other names are autogenerated
using this basename.
config: string [ None ]
Tell the load function to explicitely load config from given path.
Else the saved filepath in the main archive is used.
config2: string [ None ]
See *config*, but for secondary configuration.
verbose: boolean [ True ]
Turn on verbose mode.
load_meshes: boolean [ True ]
If originally saved, the meshes are loaded by default. However,
this takes more time then the rest of the load function and can be
ommitted if only the other parts are of interest.
"""
self.verbose = verbose
if savename is not None:
self.name = savename
if self.name is None:
raise Exception('Need savename to load loop instance.')
filename = os.path.split(savename)[1].split('.')[0]
name = os.path.join(os.path.dirname(self.name), filename)
log.info('load loop: {}'.format(os.path.abspath(name + '.npz')))
with np.load(name + '.npz', allow_pickle=True) as archive:
# load geometry
if 'ltype' in archive:
# new npz style
log.debug('load geometry from same npz.')
self.type = str(archive['ltype'])
self.grounded = bool(archive['grounded'])
self.pos = archive['pos']
self.phi = archive['phi']
self.ds = archive['ds']
self._setDistance()
else:
# old npz style
log.debug('load geometry from separate npz.')
self._loadLoopGeometry(name + archive['save_geometry'].item())
# attempt to load mesh
if archive['save_mesh'].item() is not None:
self.loop_mesh_name = archive['save_mesh'].item()
if load_meshes:
if os.path.exists(self.loop_mesh_name):
self.setLoopMesh(self.loop_mesh_name)
else:
# RuntimeError from c++: ignore, warn, and continue
log.warning('Did not found loopmesh at "{}"'
.format(self.loop_mesh_name))
# attempt to load dipole mesh
if archive['save_dipole_mesh'].item() is not None:
self.dipole_mesh_name = archive['save_dipole_mesh'].item()
if load_meshes:
if os.path.exists(self.dipole_mesh_name):
self.setDipoleMesh(self.dipole_mesh_name,
verbose=verbose)
else:
# RuntimeError from c++: ignore, warn, and continue
log.warning('Did not found dipolemesh at "{}"'
.format(self.loop_mesh_name))
self.field = getItem(archive, 'save_field')
self.d_field = getItem(archive, 'save_d_field')
self.idx = getItem(archive, 'save_idx')
self.area = getItem(archive, 'save_area', 0.)
self.turns = getItem(archive, 'save_turns', 1)
# config
if config is None:
config = archive['save_config'].item()
if config == '.cfg':
config = name + config
self.config.load(config)
try:
if config2 is None:
config2 = archive['save_config2'].item()
if config2 == '_sec.cfg':
config2 = name + config2
if config2 is not None:
# given or created, either way, continue with import
# sets self.cfg2_savename
# loads self.secondary_config
if overwrite_dir:
from pathlib import Path
config2 = Path(
self.name).parent.joinpath(
Path(config2).name).as_posix()
self.loadSecondaryConfig(config2)
except (KeyError, TypeError):
# old versions of loop classes doesnt have secondary configs
pass
if os.path.exists('{}_secondary.npy'.format(self.name)):
log.log(16, 'load secondary field.')
self.secondary = np.load('{}_secondary.npy'.format(self.name))
return True # success
[docs] def loadFieldMatrix(self, name, verbose=True):
"""
Loads the three matrices needed for recalculation of the primary
field from numpy archive. See saevFieldmatrix for detailed description.
Parameters
----------
name: string
Path to file to be loaded.
verbose: boolean [ True ]
Turn on verbose mode.
"""
# from comet.pyhed.misc import loadSparseMatrixFromNumpyArchive
log.info('Loading fieldMatrices from "{}"'
.format(os.path.abspath(name)))
# matrices = loadSparseMatrixFromNumpyArchive(name)
# self.field_matrix = matrices[0]
# self.field_matrix_sin = matrices[1]
# self.field_matrix_cos = matrices[2]
self.field_matrix = pg.core.SparseMapMatrix()
self.field_matrix_sin = pg.core.SparseMapMatrix()
self.field_matrix_cos = pg.core.SparseMapMatrix()
self.field_matrix.load('{}_field_matrix.bmat'.format(name))
self.field_matrix_sin.load('{}_field_matrix_sin.bmat'.format(name))
self.field_matrix_cos.load('{}_field_matrix_cos.bmat'.format(name))
[docs] def loadSecondaryConfig(self, savename=None):
""" Imports previously saved secondary config.
Parameters
----------
savename: string [ None ]
Used savename over *loop.sec_savename*. Throws Exception if both
values are None. Replaces *loop.sec_savename*.
"""
if savename is not None:
self.cfg2_savename = savename
if self.cfg2_savename is None:
raise Exception('Need savename to load secondary configuration.')
self.secondary_config = SecondaryConfig(name=self.cfg2_savename)
[docs] def createLoopMesh(self, savename=None, exportVTK=False, airspace=False,
verbose=False, xmax=None, xmin=None,
ymax=None, ymin=None, zmin=None):
"""
Builds the mesh where the loop will be calculated in.
Parameter
---------
savename: string [ None ]
Saves the created mesh under savename, as long as savenmae is not
none. If no basename is given, the dafaultname will be
'_default_LoopMesh' + looptype + number of dipoles + '.bms'.
exportVTK: boolean [ False ]
Switch to export the resulting mesh to a vtk with the given
savename.
airspace: boolean [ False ]
Enables airspace.
verbose: boolean [ False ]
Turn on berbose mode.
"""
log.info('createLoopMesh()')
if savename is None:
loopbasename = self.getDefaultLoopMeshBaseName()
else:
loopbasename = savename
self.loop_mesh_name = loopbasename + '.bms'
# compute outer limits for loopmesh
minx = np.min(self.pos.T[0]) - 3 * self._maxdistance
maxx = np.max(self.pos.T[0]) + 3 * self._maxdistance
miny = np.min(self.pos.T[1]) - 3 * self._maxdistance
maxy = np.max(self.pos.T[1]) + 3 * self._maxdistance
minz = np.min(self.pos.T[2]) - 3 * self._maxdistance
if xmin is not None:
minx = min(xmin, minx)
if ymin is not None:
miny = min(ymin, miny)
if zmin is not None:
minz = min(zmin, minz)
if xmax is not None:
maxx = max(xmax, maxx)
if ymax is not None:
maxy = max(ymax, maxy)
if airspace:
maxz = np.max(self.pos.T[2]) + 1 * self._maxdistance
else:
maxz = np.max(self.pos.T[2])
log.debug('X, Y, Z: ({:.2f}, {:.2f}), ({:.2f}, {:.2f}), ({:.2f}, '
'{:.2f})'.format(minx, maxx, miny, maxy, minz, maxz))
if len(self.pos) == 1: # just one dipole
# 10 * 10 unit cube around the dipole
minx = -10. + self.pos[0][0]
maxx = 10. + self.pos[0][0]
miny = -10. + self.pos[0][1]
maxy = 10. + self.pos[0][1]
minz = -10. + self.pos[0][2]
if airspace:
maxz = 10. + self.pos[0][2]
else:
maxz = self.pos[0][2]
self.ds = [1.0]
# create loopmesh (nodes, faces), then apply tetgen
# corner points for the halfspace fringe
loopmesh = pg.Mesh(3)
ob8 = loopmesh.createNode(minx, miny, 0)
ob9 = loopmesh.createNode(maxx, miny, 0)
ob10 = loopmesh.createNode(maxx, maxy, 0)
ob11 = loopmesh.createNode(minx, maxy, 0)
# halfspace face
loopmesh.createQuadrangleFace(ob8, ob9, ob10, ob11)
# lower outer boundary nodes, earthspace
ob0 = loopmesh.createNode(minx, miny, minz)
ob1 = loopmesh.createNode(maxx, miny, minz)
ob2 = loopmesh.createNode(maxx, maxy, minz)
ob3 = loopmesh.createNode(minx, maxy, minz)
# earth side
loopmesh.createQuadrangleFace(ob8, ob9, ob1, ob0)
loopmesh.createQuadrangleFace(ob9, ob10, ob2, ob1)
loopmesh.createQuadrangleFace(ob10, ob11, ob3, ob2)
loopmesh.createQuadrangleFace(ob11, ob8, ob0, ob3)
# earth bottom
loopmesh.createQuadrangleFace(ob0, ob1, ob2, ob3)
if maxz > 0:
# upper outer boundary nodes, airspace
ob4 = loopmesh.createNode(minx, miny, maxz)
ob5 = loopmesh.createNode(maxx, miny, maxz)
ob6 = loopmesh.createNode(maxx, maxy, maxz)
ob7 = loopmesh.createNode(minx, maxy, maxz)
# air side
loopmesh.createQuadrangleFace(ob8, ob9, ob5, ob4)
loopmesh.createQuadrangleFace(ob9, ob10, ob6, ob5)
loopmesh.createQuadrangleFace(ob10, ob11, ob7, ob6)
loopmesh.createQuadrangleFace(ob11, ob8, ob4, ob7)
# air top
loopmesh.createQuadrangleFace(ob4, ob5, ob6, ob7)
# append refinement points
for i, posi in enumerate(self.pos):
# log.debug([posi[0], posi[1], posi[2] -
# self._mindistance / self.refinement_para * \
# len(self.pos)])
loopmesh.createNode(
posi[0], posi[1], posi[2] -
self._mindistance / self.refinement_para * len(self.pos))
tet_maxArea = self._maxdistance**3 * self.max_area_para
log.debug('using global loopmesh volume constraint: {:.3f} m³, '
'({:.3f} m)'.format(tet_maxArea, np.cbrt(tet_maxArea)))
pg.meshtools.exportPLC(loopmesh, loopbasename + '.poly')
loopmesh.exportVTK(loopbasename + '_poly.vtk')
# tetgen call
tetgen151(loopbasename, maxArea=tet_maxArea,
quality=self.tetgen_quality)
try:
self.loopmesh = pg.Mesh(loopbasename + '.1.vtk')
except RuntimeError: # c++ for file not found in this case
self.loopmesh = pg.meshtools.readTetgen(loopbasename + '.1')
self.loopmesh.save(self.loop_mesh_name) # for kernel call (piping)
log.info('loopmesh.save: {}'.format(self.loop_mesh_name))
cleanUpTetgenFiles(loopbasename)
return self.loopmesh
[docs] def createDipoleMesh(self, quadratic=True,
savename='_default_dipole_mesh.bms', save=False,
verbose=False):
""" Creates a suitable dipole mesh for calculation via a single dipole.
Parameters
----------
quadratic: boolean [ True ]
If chosen, uses a quadratic (2nd order) mesh for dipole
calculation.
savename: string [ '_default_dipole_mesh.bms' ]
Define output name.
save: boolean [ True ]
Additional save of dipole mesh under *savename*.
verbose: boolean [ False ]
Turn on verbose mode.
"""
self.dipole_mesh_name = savename
if self.dipole_mesh_name is None:
self.dipole_mesh_name = '_default_dipole_mesh.bms'
if self.loopmesh is None:
self.createLoopMesh()
if isinstance(self.loopmesh, pg.Mesh):
lp = R3VtoNumpy(self.loopmesh.positions()).T
else:
lp = self.loopmesh
if self.dipole_mesh_name[-4:] != '.bms':
self.dipole_mesh_name += '.bms'
# outer boundaries
factor = np.sqrt(2.2) # sqareroot of 2 would be enough
difx = np.max(lp[0]) - np.min(lp[0])
dify = np.max(lp[1]) - np.min(lp[1])
loopDim = max((difx, dify)) * factor / 2
maxDim = loopDim + self._maxdistance
minz = np.min(lp[2])
maxz = np.max(lp[2])
reff = self._mindistance / self.refinement_para * len(self.pos)
if minz > (-2 * reff): # to make place for proper refinement points
minz = -2 * reff
if verbose:
print('automatically set minz to %f' % (-2 * reff))
# print(maxDim, minz, maxz)
# create loopmesh (nodes, faces), then apply tetgen
dipolemesh = pg.Mesh(3)
# outer boundary nodes, bottom
ob0 = dipolemesh.createNode(-maxDim, -maxDim, minz)
ob1 = dipolemesh.createNode(maxDim, -maxDim, minz)
ob2 = dipolemesh.createNode(maxDim, maxDim, minz)
ob3 = dipolemesh.createNode(-maxDim, maxDim, minz)
# outer boundary nodes, halfspace face
ob8 = dipolemesh.createNode(-maxDim, -maxDim, 0)
ob9 = dipolemesh.createNode(maxDim, -maxDim, 0)
ob10 = dipolemesh.createNode(maxDim, maxDim, 0)
ob11 = dipolemesh.createNode(-maxDim, maxDim, 0)
# outer boundary nodes, top
if maxz > 0:
ob4 = dipolemesh.createNode(-maxDim, -maxDim, maxz)
ob5 = dipolemesh.createNode(maxDim, -maxDim, maxz)
ob6 = dipolemesh.createNode(maxDim, maxDim, maxz)
ob7 = dipolemesh.createNode(-maxDim, maxDim, maxz)
# air side
dipolemesh.createQuadrangleFace(ob8, ob9, ob5, ob4)
dipolemesh.createQuadrangleFace(ob9, ob10, ob6, ob5)
dipolemesh.createQuadrangleFace(ob10, ob11, ob7, ob6)
dipolemesh.createQuadrangleFace(ob11, ob8, ob4, ob7)
# air top
dipolemesh.createQuadrangleFace(ob4, ob5, ob6, ob7)
# earth side
dipolemesh.createQuadrangleFace(ob8, ob9, ob1, ob0)
dipolemesh.createQuadrangleFace(ob9, ob10, ob2, ob1)
dipolemesh.createQuadrangleFace(ob10, ob11, ob3, ob2)
dipolemesh.createQuadrangleFace(ob11, ob8, ob0, ob3)
# earth bottom
dipolemesh.createQuadrangleFace(ob0, ob1, ob2, ob3)
# halfspace face
dipolemesh.createQuadrangleFace(ob8, ob9, ob10, ob11)
maxArea = self._maxdistance**3 * self.max_area_para
if quadratic:
maxArea *= 8.0
# self.dipolemesh = mesh
dipolemesh.createNode(0., 0., self.pos[0][2] - reff)
pg.meshtools.exportPLC(dipolemesh,
self.dipole_mesh_name.rstrip('.bms') + '.poly')
# make tetgen mesh
tetgen151(self.dipole_mesh_name.rstrip('.bms'), maxArea=maxArea,
quality=self.tetgen_quality, verbose=verbose)
self.dipolemesh = pg.Mesh(self.dipole_mesh_name.rstrip('.bms') +
'.1.vtk')
if quadratic:
self.dipolemesh = self.dipolemesh.createP2()
os.remove(self.dipole_mesh_name.rstrip('.bms') + '.1.vtk')
os.remove(self.dipole_mesh_name.rstrip('.bms') + '.poly')
self.dipolemesh.save(self.dipole_mesh_name) # for kernel call (piping)
cleanUpTetgenFiles(self.dipole_mesh_name)
return self.dipolemesh
[docs] def createFEMMesh(self, para_mesh_2d=None,
savename=None, exportVTK=False, exportH5=True,
box_x=[None, None],
box_y=[None, None],
box_z=None,
box_cell_size=None,
source_poly=None,
source_setup='edges',
source_loops=None,
inner_area_cell_size=0.3,
outer_area_cell_size=10,
subsurface_cell_size=None,
poly_2d=None,
number_of_loops=None,
**kwargs):
"""
Builds the FEM mesh for the secondary field computation.
Needs at least on of the two possible parameter meshes in order to
continue.
Parameter
---------
para_mesh_2d: string or pg.Mesh [ None ]
Used to get the outer dimensions of the FEMMesh.
savename: string [ None ]
Define output save name of FEM mesh. Default name will be generated
if None. If no savename is given, the dafaultname will be
'_default_LoopMesh' + looptype + number of dipoles.
exportVTK: boolean [ False ]
Turn on optional vtk export.
source_setup: string [ 'edges' ]
Defines the way the sources are incorporated into the mesh. "nodes"
simply insert the dipole positions (fallback), "edges" defines
strait edges between the nodes (usually the best approach). "etra"
can be used for a special setup where multiple loops are build in
an elongated transmitter with inline receiver array. Raises an
exception if source_setup differs from the three options.
source_loops: list [ None ]
If a list of loop classes is given, their tx representation after
custEM is implemented in the mesh for custEM magnetic field
calculations using automatic source detection.
inner_area_cell_size: float [ 0.3 ]
Maximum allowed area (m²) for all cell in the source plane within
the source polygons (if closed loop). Very important for kernel
calculation! See tutorial for custEM for further explanations.
outer_area_cell_size: float [ 10 ]
Maximum allowed area (m²) for all cells in the source plane outside
the source polygons (or anywhere for not closed loop). See tutorial
for custEM for further explanations.
subsurface_cell_size: float [ None ]
Maximum allowed volume (m³) for all cells within inner mesh box
(not the tetrahedron boundary to 10 km). Optional.
Kwargs
------
limits: list of len 2 [ None ]
Minimum and maximum y value, the anomalies should be set in the
fem mesh. Uses the x limits of the 2D parameter mesh as default if
*None*.
Dependencies
------------
custEM:
Install via conda on Linux only. See install instructions of comet.
"""
from custEM.meshgen.meshgen_tools import BlankWorld
from custEM.meshgen.meshgen_utils import export_tetgen_poly_file
# checking para mesh 2D
if para_mesh_2d is not None:
self.setParaMesh2D(para_mesh_2d, limits=kwargs.pop('limits', None))
self._checkParaMesh()
if self.secondary_config.sigma_paramesh == []:
raise Exception(
'createFEMMesh needs an anomaly vector to work. The '
'anomalies are used to detect and distribute the markers in '
'the FE mesh correctly. Please use setParaMesh2D or '
'setAnomalies in order to set te anomaly vector before '
'calling createFEMMesh.')
# name + dir
test_path = Path(savename)
if test_path.parent != Path('.'):
raise Exception(
'Please enter only a filename as "savename" not a Path. If you'
' want to change the mesh directory please use the '
'secondary_config (e.g. "createDefaultSecondaryConfig(m_dir='
'...) or change the "secondary_config.m_dir" attribute '
'directly.").')
self.setLoopMeshName(savename) # adds m_dir and _h5
mesh_path = Path(self.loop_mesh_name)
log.debug([mesh_path, mesh_path.parent])
mesh_path.parent.mkdir(exist_ok=True, parents=True)
# make new _h5 dir if needed
# basename without suffix or path(string)
mesh_name = mesh_path.stem
# without suffix
mesh_base = mesh_path.with_name(mesh_path.stem).as_posix()
log.info('creating FEM mesh...')
# compute outer limits for FEM mesh
minx = box_x[0]
if minx is None:
minx = self.para_limits_x[0]
maxx = box_x[1]
if maxx is None:
maxx = self.para_limits_x[1]
miny = box_y[0]
if miny is None:
miny = self.para_limits_y[0]
maxy = box_y[1]
if maxy is None:
maxy = self.para_limits_y[1]
minz = box_z
if minz is None:
minz = self.para_limits_z[0]
maxz = 0.5 * abs(minz)
# inner poly and source
# part 1: inner poly with refinement.
log.info('inner mesh box x:, {}, {}'.format(minx, maxx))
log.info('inner mesh box y:, {}, {}'.format(miny, maxy))
log.info('inner mesh box z:, {}, {}'.format(minz, maxz))
# quick check for overlapping polys
intersect = False
if poly_2d is not None:
if poly_2d.xmax() >= maxx:
intersect = True
if poly_2d.xmin() <= minx:
intersect = True
if poly_2d.ymin() <= minz:
intersect = True
if poly_2d.ymax() >= 0:
intersect = True
if intersect:
msg = 'Refinement Box and poly_2d are intersecting. '
msg += 'Box(x, z) = [({:.2f}, {:.2f}), ({:.2f}, {:.2f})] '\
.format(minx, maxx, minz, 0)
msg += 'should be able to contain poly_2d(x, z(or y)) = '
msg += '[({:.2f}, {:.2f}), ({:.2f}, {:.2f})]'\
.format(poly_2d.xmin(), poly_2d.xmax(), poly_2d.ymin(),
poly_2d.ymax())
raise Exception(msg)
blank_world_kwargs = {}
if subsurface_cell_size is not None:
blank_world_kwargs['subsurface_cell_size'] = subsurface_cell_size
poly = BlankWorld(name=mesh_name,
x_dim=[minx, maxx],
y_dim=[miny, maxy],
z_dim=[minz, maxz],
m_dir=self.secondary_config.m_dir,
inner_area_cell_size=inner_area_cell_size,
outer_area_cell_size=outer_area_cell_size,
**blank_world_kwargs)
if source_poly is not None:
if isinstance(source_poly, str):
source_poly = pg.load(source_poly)
add_lines = []
marker_pos = []
max_length = kwargs.pop('max_length', 0.5)
for bound in source_poly.boundaries():
line_start = bound.allNodes()[0].pos().array()
line_end = bound.allNodes()[1].pos().array()
dist = np.sqrt(np.sum((line_end - line_start)**2))
number_segs = int((dist // max_length) + 1)
line = np.column_stack(
[np.linspace(line_start[0], line_end[0], number_segs),
np.linspace(line_start[1], line_end[1], number_segs),
np.linspace(line_start[2], line_end[2], number_segs)])
add_lines.append(line)
marker_pos.extend([line_start, line_end])
# add_lines.append([a.pos().array() for a in bound.allNodes()])
if source_setup.lower() == 'loops':
if source_loops is None:
raise Exception('Need source_loops when source_setup '
'is set to "loops".')
source_tx = []
for loop in source_loops:
source_tx.append(loop.getCustEMLoopTx(max_length))
print(source_tx)
log.info('Add insert_loop_tx from given loop classes.')
# poly.build_surface(insert_lines=add_lines, closed_path=False,
# # line_marker=[88] * len(add_lines),
# insert_loop_tx=source_tx)
poly.build_surface(closed_path=False,
# line_marker=[88] * len(add_lines),
insert_loop_tx=source_tx)
# now we want to append missing markers in the middle of the
# polygons- This is somewhat experimental and works for rectangle
# loops (doesn't matter how many) == good for etra or mixed etras
# all_pos = np.array(add_lines).reshape(-1, 3) # n, 2, 3 -> 2*n, 3
all_pos = np.array(marker_pos).reshape(-1, 3)
x_poss = np.sort(np.unique(all_pos[:, 0]))
y_poss = np.sort(np.unique(all_pos[:, 1]))
x_mids = (x_poss[:-1] + x_poss[1:]) / 2.
y_mids = (y_poss[:-1] + y_poss[1:]) / 2.
for xmid in x_mids:
for ymid in y_mids:
log.debug('Add additional marker at ({:.3f}, {:.3f})'
.format(xmid, ymid))
poly.add_area_marker(poly.surface, [xmid, ymid, 0.0],
face_size=inner_area_cell_size)
# marker for outer cell size
poly.add_area_marker(poly.surface,
[minx + 0.1, miny + 0.1, 0.0],
face_size=outer_area_cell_size)
source_setup = 'skip'
if source_setup.lower() == 'etra':
if number_of_loops is None:
raise Exception('Need "number_of_loops" when using '
'source_setup="etra".')
ay = np.max(self.pos[:, 1]) - np.min(self.pos[:, 1])
# new way
buildEtraSourceDiscretization(poly, ay,
x_left=np.min(self.pos[:, 0]),
number_of_loops=number_of_loops)
elif source_setup.lower() == 'edges':
# closed polygon ?
poly.build_surface(insert_loops=[self.pos], # sources
closed_path=not self.grounded)
elif source_setup.lower() == 'nodes':
poly.build_surface(insert_points=self.pos) # sources
elif source_setup.lower() == 'skip':
pass
elif source_setup.lower() == 'custem':
poly.build_surface(**kwargs) # sources as defined using custem
else:
raise Exception('Source setup "{}", not in ["etra", "nodes", '
'"edges", "loops"].'.format(source_setup))
dipole_box = [np.min(self.pos[:, 0]) - 5,
np.max(self.pos[:, 0]) + 5,
np.min(self.pos[:, 1]) - 5,
np.max(self.pos[:, 1]) + 5,
-4]
log.debug('add_surface_anomaly: {}'
.format(np.array(dipole_box)))
# add additional refinement box around dipoles only for the first 4
# meter (from top dipoles +5 meter in x and y to a depth of 4 meter).
poly.add_surface_anomaly(
insert_paths=[
np.array(((dipole_box[0], dipole_box[2], 0.0),
(dipole_box[0], dipole_box[3], 0.0),
(dipole_box[1], dipole_box[3], 0.0),
(dipole_box[1], dipole_box[2], 0.0)))],
depths=[dipole_box[4]],
cell_sizes=[inner_area_cell_size]) # 3d marker
# add marker just inside the corner of the new box
poly.add_area_marker(poly.surface, [dipole_box[0] + 0.1,
dipole_box[2] + 0.1, 0.0],
face_size=inner_area_cell_size) # 2d marker
# add additional refinement box if requested
# if refinement_box is not None:
# poly.add_surface_anomaly(
# insert_loops=[
# np.array(((refinement_box[0], refinement_box[2], 0.0),
# (refinement_box[0], refinement_box[3], 0.0),
# (refinement_box[1], refinement_box[3], 0.0),
# (refinement_box[1], refinement_box[2], 0.0)))],
# depths=[refinement_box[4]],
# cell_sizes=[box_cell_size])
#
# # add marker just inside the corner of the new box
# poly.add_area_marker(poly.surface, [refinement_box[0] + 0.1,
# refinement_box[2] + 0.1, 0.0])
log.debug('build_halfspace_mesh()')
poly.build_halfspace_mesh()
# there_is_always_a_bigger_world works with factors:
# I calculate the minimum factor needed to get at least a 10km boundary
xfactor = 10000. / np.minimum(np.abs(poly.Omega.xmin()),
np.abs(poly.Omega.xmax()))
yfactor = 10000. / np.minimum(np.abs(poly.Omega.ymin()),
np.abs(poly.Omega.ymax()))
# minz too small, would lead to insane bounds
zfactor = 10000. / np.absolute(poly.Omega.zmin())
poly.there_is_always_a_bigger_world(xfactor, yfactor, zfactor)
poly.write_mesh_parameters()
# SEMI TESTED preserve edges!
if poly_2d:
log.info('Add poly 2d to FEM mesh.')
node_ids = []
doublecheck = {}
for bound in poly_2d.boundaries():
node_ids.append(bound.ids())
for id0, id1 in node_ids:
n0 = poly_2d.node(id0)
n1 = poly_2d.node(id1)
if id0 not in doublecheck.keys():
n0x = n0.x()
n0y0 = miny + 0.5 # to stay inside refinement box
n0y1 = maxy - 0.5 # to stay inside refinement box
n0z = n0.y()
log.debug(
'create poly2d nodes: ({:.2f}, {:.2f}, {:.2f}), '
'({:.2f}, {:.2f}, {:.2f})'
.format(n0x, n0y0, n0z, n0x, n0y1, n0z))
doublecheck[id0] = [
poly.Omega.createNode([n0x, n0y0, n0z]),
poly.Omega.createNode([n0x, n0y1, n0z])]
if id1 not in doublecheck.keys():
n1x = n1.x()
n1y0 = miny + 0.5 # to stay inside refinement box
n1y1 = maxy - 0.5 # to stay inside refinement box
n1z = n1.y()
log.debug(
'create poly2d nodes: ({:.2f}, {:.2f}, {:.2f}), '
'({:.2f}, {:.2f}, {:.2f})'
.format(n1x, n1y0, n1z, n1x, n1y1, n1z))
doublecheck[id1] = [
poly.Omega.createNode([n1x, n1y0, n1z]),
poly.Omega.createNode([n1x, n1y1, n1z])]
poly.Omega.createQuadrangleFace(doublecheck[id0][0],
doublecheck[id0][1],
doublecheck[id1][1],
doublecheck[id1][0])
if self.preserved_edge_centers is not None:
log.info('Add preserved edges from paramesh to FEM mesh.')
edge_centers = np.array(self.para_mesh_2d.boundaryCenters())
preserved_edges = []
for center in self.preserved_edge_centers:
preserved_edges.append(
np.where(np.sum(np.isclose(edge_centers, center),
axis=1) == 3)[0])
node_ids = []
for edge_id in preserved_edges:
node_ids.append(
self.para_mesh_2d.boundaries()[edge_id[0]].ids())
doublecheck = {}
for id0, id1 in node_ids:
# build quadrangle faces in 3D with 2D edge extended as plane
# in 3D using y-vector endpoints
n0 = self.para_mesh_2d.node(id0)
n1 = self.para_mesh_2d.node(id1)
# print([n0.x(), miny, n0.y()], [n1.x(), miny, n1.y()])
if id0 not in doublecheck.keys():
doublecheck[id0] = [
poly.Omega.createNode([n0.x(), miny + 0.1, n0.y()]),
poly.Omega.createNode([n0.x(), maxy - 0.1, n0.y()])]
if id1 not in doublecheck.keys():
doublecheck[id1] = [
poly.Omega.createNode([n1.x(), miny + 0.1, n1.y()]),
poly.Omega.createNode([n1.x(), maxy - 0.1, n1.y()])]
poly.Omega.createQuadrangleFace(doublecheck[id0][0],
doublecheck[id0][1],
doublecheck[id1][1],
doublecheck[id1][0])
# exportTetgenEdge(preserve_edges,
# mesh_path.with_suffix('.edge').as_posix())
if exportVTK or log.getEffectiveLevel() <= 10:
# .Omega == pyGIMLi poly object
poly.Omega.exportBoundaryVTU(
mesh_path.with_name('2Dsurface.vtu').as_posix())
poly.Omega.exportVTK(
mesh_path.with_name('exportpoly.vtk').as_posix())
# end custEM here
export_tetgen_poly_file(
poly.Omega, mesh_path.with_suffix('.poly').as_posix())
# poly.call_tetgen(tet_param='-kfpq1.4aA', export_vtk=True)
# tetgen mesh
success = tetgen151(mesh_base, maxArea='', quality=self.tetgen_quality)
if success != 0:
log.error('tetgen seems to have run into trouble.')
log.error('used call: tetgen151("{}")'.format(mesh_base))
if poly_2d:
log.error(
'You tried to insert a poly_2d. You can check the input '
'file for tetgen under "{}" for poly intersections, '
'it may help to find the origin of the problem.'
.format(mesh_path.with_name('exportpoly.vtk').as_posix()))
poly.Omega.exportVTK(
mesh_path.with_name('exportpoly.vtk').as_posix())
# import mesh
log.info('import FEM mesh...')
loopmesh = pg.meshtools.readTetgen(mesh_base + '.1')
# set mesh
self.setFEMMesh(loopmesh, valid_marker=None,
savename=self.loop_mesh_name)
# clean up
cleanUpTetgenFiles(mesh_base)
# prepare saving
Path(self.loop_mesh_name).parent.mkdir(exist_ok=True, parents=True)
# export
if exportVTK:
self.loopmesh.exportVTK(mesh_path.with_suffix('.vtk').as_posix())
log.info('Export .bms ({})'.format(self.loop_mesh_name))
self.loopmesh.save(self.loop_mesh_name) # for kernel call (piping)
if exportH5:
h5path = mesh_path.with_suffix('.h5').as_posix()
log.info('Export .h5 ({})'.format(h5path))
self.exportFenicsHDF5Mesh(h5path)
# save secondary config for the purpose of slice generation and
# calculation via custem
self.secondary_config.save(mesh_path.with_suffix('.cfg').as_posix())
return self.loopmesh
[docs] def createSecondaryConfig(self, mod_name, mesh_name, m_dir='.', r_dir='.',
pf_name=None, p2=False,
approach='E_s', pf_EH_flag='E'):
""" Initializes an instance of a secondary config for use of custEM.
Parameters
----------
mod_name: string
Name of the mod instance (for saving and import in mpi environment)
mesh_name: string
Basename of mesh imported by the fenics functions (.h5). Mind
the subfolder '/_h5' that will be added to the string.
m_dir: string
Path to mesh directory of custEM.
r_dir: string
Path to result directory of custEM.
pf_name: string
File name under which the primary field will be saved in the
appropriate directory of custEM.
"""
pf_name = pf_name or mod_name # different folder, so no harm done
self.secondary_config = SecondaryConfig(
mod_name=mod_name, mesh_name=mesh_name, m_dir=m_dir,
r_dir=r_dir, pf_name=pf_name, p2=p2, approach=approach,
pf_EH_flag=pf_EH_flag,
sigma_ground=1./np.atleast_1d(self.config.rho))
self.loop_mesh_name = Path('{}/_h5/{}'.format(
self.secondary_config.m_dir,
self.secondary_config.mesh_name)).as_posix()
[docs] def createDefaultSecondaryConfig(self, base=None, prefix='', suffix='',
m_dir='.', r_dir='.'):
""" Short cut to generate a secondary config with some default params.
Parameters
----------
prefix: string
String to be added to the *getDefaultLoopMeshBaseName* string to
define the automatic generated names for the default secondary
config.
suffix: string
String to be added to the *getDefaultLoopMeshBaseName* string to
define the automatic generated names for the default secondary
config.
"""
if base is None:
if self.name is not None:
base = Path(self.name).stem
else:
if self.loop_mesh_name is None:
base = self.getDefaultLoopMeshBaseName()[9:]
else:
base = Path(self.loop_mesh_name).stem
base = '{}{}{}'.format(prefix, base, suffix)
mod_name = '{}'.format(base)
mesh_name = 'mesh_{}'.format(base)
self.createSecondaryConfig(mod_name, mesh_name, m_dir=m_dir,
r_dir=r_dir)
[docs] def getCustEMLoopTx(self, max_length):
if self.type.lower() in ['rectangle', 'square']:
from custEM.meshgen import meshgen_utils as mu
xmin = np.min(self.pos[:, 0])
xmax = np.max(self.pos[:, 0])
ymin = np.min(self.pos[:, 1])
ymax = np.max(self.pos[:, 1])
nx = int(((xmax - xmin) // max_length) + 1)
ny = int(((ymax - ymin) // max_length) + 1)
loop_tx = mu.loop_r([xmin, ymin], [xmax, ymax],
nx=max(nx, 1), ny=max(ny, 1))
# loop_tx = np.array([[xmin, ymin, 0],
# [xmax, ymin, 0],
# [xmax, ymax, 0],
# [xmin, ymax, 0]])
else:
raise Exception('No recipy for automatic creation of custem tx '
'shape for loop-type "{}"'.format(self.type))
return loop_tx
[docs] def setLoopMeshName(self, savename=None):
""" Sets loop mesh name or figures it out from sec config.
"""
invalid_path = False
if savename is None:
savename = self.getDefaultLoopMeshBaseName()
else:
if self.secondary_config is not None:
if '_h5' not in Path(savename).parts:
invalid_path = True
log.info('using basename of savename and use mesh '
'directory as defined in secondary config!')
savename = Path(savename).name
if invalid_path:
mesh_path = Path(
self.secondary_config.m_dir
).joinpath('_h5').joinpath(savename).with_suffix('.bms')
else:
mesh_path = Path(savename)
if self.secondary_config is not None:
self.secondary_config.mesh_name = mesh_path.stem
self.loop_mesh_name = mesh_path.as_posix()
[docs] def getDefaultLoopMeshBaseName(self):
""" Returns string with default base name of the loop mesh. """
return '_default_LoopMesh_{}_{}'.format(self.type, len(self.ds))
[docs] def getParaMesh2D(self): # , with_bounds=False):
# if with_bounds is False:
return self._para_mesh_2d
# else:
# return self._para_mesh_2d_with_bounds
[docs] def save(self, savename=None, config_savename=None, config2_savename=None,
save_mesh=True, save_field=True):
""" Saves the loop class in files.
Saves npz archive with loop itself.\n
Saves config.\n
Saves secondary config if initialized.\n
Saves mesh if save_mesh=True.\n
Saves field if save_field=True.
Parameters
----------
savename: string [ None ]
File basename for saving loop class and its components.
config_savename: string [ None ]
Explicit savename for config. Automatically generated if None.
config2_savename: string [ None ]
Explicit savename for secondary config. Automatically generated
if None.
save_mesh: boolean [ True ]
Saves mesh.
save_field: boolean [ True ]
Saves fields.
"""
# geometry
if savename is not None:
self.name = savename
if self.name.endswith('.npz'):
self.name = self.name[:-4]
if self.name is None:
raise Exception('Need savename to save loop instance.')
log.info(f'Save Loop: {self.name}')
Path(self.name).parent.mkdir(exist_ok=True, parents=True)
# loop mesh
if self.loopmesh is not None:
if isinstance(self.loopmesh, np.ndarray):
self.setLoopMesh(self.loopmesh)
if self.loop_mesh_name is None:
self.loop_mesh_name = self.name + '_mesh.bms'
if save_mesh is True:
log.info(f'Save mesh: {self.loop_mesh_name}')
self.loopmesh.save(self.loop_mesh_name)
# dipole mesh
if self.dipolemesh is not None:
if self.dipole_mesh_name is None:
self.dipole_mesh_name = self.name + '_dipole_mesh.bms'
if save_mesh is True:
log.info(f'Save Dipole mesh: {self.dipole_mesh_name}')
self.dipolemesh.save(self.dipole_mesh_name)
# config
if config_savename is None:
save_config = '.cfg'
self.config.save(self.name + save_config)
else:
save_config = config_savename
self.config.save(save_config)
if self.secondary_config is not None:
# secondary config
if config2_savename is None:
self.cfg2_savename = self.name + '_sec.cfg'
else:
self.cfg2_savename = config2_savename
# refresh frequency, in case of total field calculation
self.secondary_config.frequency = self.config.f
self.saveSecondaryConfig()
# test FE Mesh names are the same
name1 = Path(self.loop_mesh_name).with_suffix('').absolute()
name2 = Path('{}/_h5/{}'.format(
self.secondary_config.m_dir,
self.secondary_config.mesh_name)
).with_suffix('').absolute()
if name1 != name2:
# this might be intentional, so just a warning, moving on...
log.warning('Loop mesh name ({}) and mesh name from '
'secondary config ({}) are different!'
.format(name1.as_posix(), name2.as_posix()))
else:
self.cfg2_savename = None
# names + fields
if save_field is False:
savefield = None
else:
savefield = self.field
absmesh = None
if self.loop_mesh_name is not None:
absmesh = os.path.abspath(self.loop_mesh_name)
absdipole = None
if self.dipole_mesh_name is not None:
absdipole = os.path.abspath(self.dipole_mesh_name)
np.savez(self.name + '.npz',
save_config=save_config,
save_config2=self.cfg2_savename,
save_field=savefield,
save_mesh=absmesh,
save_dipole_mesh=absdipole,
save_d_field=self.d_field,
save_idx=self.idx,
save_area=self.area,
save_turns=self.turns,
ltype=self.type,
grounded=self.grounded,
pos=self.pos,
phi=self.phi,
ds=self.ds)
if self.secondary is not None:
np.save('{}_secondary.npy'.format(self.name), self.secondary)
def _dump2Temp(self):
import tempfile as temp
# create temporary filename
loopFile = temp.NamedTemporaryFile(suffix='.npz', delete=False)
savename = loopFile.name
loopFile.close()
cfgFile = temp.NamedTemporaryFile(suffix='.cfg', delete=False)
configname = cfgFile.name
cfgFile.close()
self.save(savename=savename, config_savename=configname,
save_mesh=False, save_field=False)
return (savename, configname)
[docs] def saveLoopMesh(self, savename=None):
""" Saves loopmesh using the given savename or an autogenerated name.
Updates *self.loop_mesh_name* in case of changes.
Parameters
----------
savename:
Export path name. Used over default name if given.
"""
if self.loopmesh is None:
raise Exception('No loopmesh to save. Abort.')
if savename is None:
if self.loop_mesh_name is None:
self.loop_mesh_name =\
self.getDefaultLoopMeshBaseName() + '.bms'
else:
self.loop_mesh_name = savename
Path(self.loop_mesh_name).parent.mkdir(exist_ok=True, parents=True)
print('saving bms: "{}"'.format(self.loop_mesh_name))
self.loopmesh.save(self.loop_mesh_name)
[docs] def saveFieldMatrix(self, name, verbose=True):
"""
Saves the three matrices needed for recalculation of the primary
field.
A compressed numpy archive is loaded and the matrices
are build afterwards, therefore import time is ~20% higher compared to
the pure pygimli way ( *.field_matrix.save('...')* ).
However, because the single arrays (indices and values) are saved
in one compressed file archive they need only one third space on the
hard disk compared to saving three separate matrices using pygimli
syntax.
Parameters
----------
name: string
Path for file to be saved.
verbose: boolean [ True ]
Turn on verbose mode.
"""
# from comet.pyhed.misc import loadSparseMatrixFromNumpyArchive
if verbose:
print('Saving fieldMatrices as "{}"'.format(os.path.abspath(name)))
Path(name).parent.mkdir(exist_ok=True, parents=True)
# exportSparseMatrixAsNumpyArchive(name,
# self.field_matrix,
# self.field_matrix_sin,
# self.field_matrix_cos)
self.field_matrix.save('{}_field_matrix.bmat'.format(name))
self.field_matrix_sin.save('{}_field_matrix_sin.bmat'.format(name))
self.field_matrix_cos.save('{}_field_matrix_cos.bmat'.format(name))
[docs] def saveSecondaryConfig(self, savename=None):
""" Saves secondary config in ASCII file.
Parameters
----------
savename: string [ None ]
Used savename over *loop.sec_savename*. Throws Exception if both
values are None. Replaces *loop.sec_savename*.
"""
if savename is not None:
self.cfg2_savename = savename
if self.cfg2_savename is None:
raise Exception('Need savename to save secondary configuration.')
self.secondary_config.save(self.cfg2_savename)
[docs] def exportVTK(self, save_vtk, secondary=False, **kwargs):
""" Exports the field in a vtk file.
Uses the *loopmesh* to save *field* with default configurations in a
vtk file.
Parameters
----------
save_vtk: string
Filename of the resulting vtk file.
kwargs: dict
Keyword arguments are redirected to the function
*pyhed.IO.savefieldvtk*.
"""
if secondary and self.secondary is None:
raise Exception('No secondary field found.')
if save_vtk is None:
save_vtk = self.loop_mesh_name[:-4] + '.vtk'
if not secondary:
if self.field is None:
if self.loopmesh is None:
raise Exception('No field or mesh found.')
self.loopmesh.exportVTK(save_vtk)
else:
from comet.pyhed.IO import savefieldvtk
savefieldvtk(save_vtk, self.loopmesh, self.field,
itype='Mesh', **kwargs)
else:
from comet.pyhed.IO import savefieldvtk
savefieldvtk(save_vtk, self.loopmesh, self.secondary,
itype='Mesh', **kwargs)
[docs] def exportFenicsHDF5Mesh(self, save_h5, dipole_mesh=False, **kwargs):
"""
Exports the mesh in a h5 file. Can save the loopmesh or the
dipole mesh seperately.
Need pygimli to work.
Parameters
----------
save_h5: string
Filename of the resulting h5 mesh (hdf5 data container in fenics
syntax).
dipole_mesh: boolean [ False ]
Save dipole mesh instead of loop mesh
(Call this function twice if you want to save both meshes).
kwargs: dict
Keyword arguments are redirected to
*pygimli.meshtools.exportFenicsHDF5Mesh*
"""
if dipole_mesh is False:
pg.meshtools.exportFenicsHDF5Mesh(self.loopmesh, save_h5,
**kwargs)
else:
pg.meshtools.exportFenicsHDF5Mesh(self.dipolemesh, save_h5,
**kwargs)
[docs] def initCustEM(self, secondary_config=None, init_primary_field_class=True,
procs_per_proc=2):
""" Initalizes instance of custEM mod class for FEM calculation.
Parameters
----------
secondary_config: string or pyhed.SecondaryConfig [ None ]
Initialized secondary config class to be used for the mod instance
or path to corresponding file containing the secondary config.
Uses secondary_config over *loop.secondary_config*. Throws
Exception if both values are None.
init_primary_field_class: boolean [ True ]
Additionally initializing the primary field class of the mod class
instance (used for primary field export).
"""
from custEM.core import MOD
from custEM.fem.primary_fields import PrimaryField
if secondary_config is not None:
self.setSecondaryConfig(secondary_config)
if self.secondary_config is None:
raise Exception('No config for secondary field calculation has '
'been loaded.')
self.secMOD = MOD(self.secondary_config.mod_name, # mod_name
self.secondary_config.mesh_name, # mesh_name
self.secondary_config.approach, # custem approach
# kwargs are following
p=self.secondary_config.polynominal_order,
overwrite_results=
self.secondary_config.enable_overwrite,
m_dir=self.secondary_config.m_dir,
# directory where to find the meshs + '/_h5'
r_dir=self.secondary_config.r_dir,
mumps_debug=False,
debug_level=20,
profiler=False)
sigma_ground = np.copy(self.secondary_config.sigma_ground).tolist() +\
self.secondary_config.sigma_anom
# sigma_0=np.copy(self.secondary_config.sigma_ground).tolist()
sigma_0 = np.copy(self.secondary_config.sigma_ground).tolist() +\
np.array(
self.secondary_config.sigma_ground)\
[np.array(self.secondary_config.layer_markers) - 1].tolist()
self.secMOD.MP.update_model_parameters(
f=self.config.f,
sigma_ground=sigma_ground,
sigma_0=sigma_0
)
if init_primary_field_class:
log.info('initialize primary field class')
self.secMOD.FS.PF = PrimaryField(
self.secMOD.FS,
self.secMOD.FE,
self.secMOD.MP,
pf_type='custom',
pf_name=self.secondary_config.pf_name,
s_type='loop_r')
[docs] def updateFEMAnomaly(self, anomaly=None, set_marker=True,
set_attributes=False, vtk_name=None,
ground_marker=None, export_H5=False,
sort=True):
""" Transfers resistivity anomalies from 2D para mesh in FEM mesh.
Parameters
----------
anomaly_vector: array_like [ None ]
Array containing the resistivity anomalies of the 2D parameter
mesh. If None, the secondary config is asked for a anomaly vector.
(For setting the marker for exmaple).
set_marker: boolean [ True ]
Transfers the marker from the parameter mesh to the FEM mesh.
This only has to be done once and can then switched off for
performance.
set_attribute: boolean [ False ]
Sets the attribute in the FEM mesh for debugging purposes. The
anomaly vector for calculation is stored in secondary_config.
vtk_name: string [ None ]
Optional vtk export with name = *vtk_name* if *vtk_name* is not
None.
ground_marker: array_like [None]
Corresponding marker for each entry in the anomaly vector. Each
marker corresponds to a layer number of the 1d primary field
beginning at 1 for the first layer, counting upward (0 belongs
to the air layer). None results in np.ones_like(anomaly_vector,
dtype=int).
"""
# input check, anomaly handling in paramesh
self._checkParaMesh()
assert self.loopmesh is not None, 'No FE mesh found.'
# anomaly
if anomaly is not None:
# case 1/4: anomaly given: set anom (paramesh)
self.setAnomalies(anomaly=anomaly, sort=sort)
# sets self.secondary_config.sigma_paramesh
set_sigma_anom = True
# check FE mesh marker
# IMPORTANT: setAnomalies needs to be called before _setFEMMarker
# otherwise the marker in the paramesh are wrong and this function
# will fail. this is one reason why _setFEMMarker is a underscore
# function as this is not automatically detectable and caused more
# than one headache.
# handling marker and attributes of FEM Mesh
if set_marker or self.valid_marker is None:
self._setFEMMarker()
log.debug('updateFEMAnomaly(): ({}/{}) used paramesh marker in FE mesh'
.format(np.sum(self.valid_marker), len(self.valid_marker)))
if self.secondary_config.sigma_paramesh != []:
# case 2/4: not given, look in secondary config
anomaly = np.array(self.secondary_config.sigma_paramesh)
log.debug('updateFEMAnomaly: no anomalies given, found '
'sigma_paramesh in secondary_config: {} ({})'
.format(anomaly, len(anomaly)))
sigma_anom = np.array(anomaly)[self.valid_marker]
log.info('updateFEMAnomaly: using valid_marker: sigma_anom: '
'{} ({})'
.format(sigma_anom, len(sigma_anom)))
set_sigma_anom = True
else:
# cases 3 and 4 still not given and not found in secondary config
# searching on for imported sigma_anom (e.g. from another run)
# In this case the sigma:anom is not evaluated again.
if self.secondary_config.sigma_anom != []:
# case 3/4: sigma anom found in secondary config
# e.g. imported from earlier
# use only marker found in FE mesh (valid_marker)
sigma_anom = self.secondary_config.sigma_anom
log.info('updateFEMAnomaly: Found sigma_anom in '
'secondary config: {} ({})'
.format(sigma_anom, len(sigma_anom)))
set_sigma_anom = False
log.info('updateFEMAnomaly: sigma_anom will not be changed.')
else:
# case 4/4: no anomly, no 2D fields, problem -> abort
raise Exception('updateFEMAnomaly: no anomaly vector '
'found or given, abort.')
# last check to prevent old bug, leave it in
assert len(sigma_anom) != 0
# ground resistivities
ground = np.atleast_1d(self.secondary_config.sigma_ground)
if ground_marker is not None:
ground_marker = np.array(ground_marker)[self.valid_marker]
if set_attributes or log.getEffectiveLevel() <= 10:
vals = np.array([1e-8], dtype=np.float)
vals = np.concatenate([vals, ground])
vals = np.concatenate([vals, sigma_anom])
attr = vals[self.loopmesh.cellMarkers()]
log.debug('loop_bib.setFEMAnomaly(..., set_attributes=True): '
'export res instead of sigma!')
self.loopmesh.setCellAttributes(1./attr)
if log.getEffectiveLevel() <= 10:
# active debug mode
if vtk_name is None:
# take femmesh name and put vtk there
vtk_name = self.secondary_config.m_dir + '/_h5/'
vtk_name += self.secondary_config.mesh_name
vtk_name += '_debug_FEMesh.vtk'
if vtk_name is not None:
Path(vtk_name).parent.mkdir(exist_ok=True, parents=True)
self.loopmesh.exportVTK(vtk_name, pg.Vector(1./attr))
# sets: self.secondary_config.sigma_anom = new_anoms
# sets: self.secondary_config.layer_markers = ground_marker
if set_sigma_anom:
# cases 1 or 2: set anom (FE mesh)
# in case 3: has been imported and will not be updated
self.secondary_config.setAnomalies(sigma_anom,
layer_markers=ground_marker)
[docs] def updateFEMAnomaly_old(self, anomaly=None, set_marker=True,
set_attributes=False, vtk_name=None,
ground_marker=None, export_H5=False):
""" Transfers resistivity anomalies from 2D para mesh in FEM mesh.
Parameters
----------
anomaly_vector: array_like [ None ]
Array containing the resistivity anomalies of the 2D parameter
mesh. If None, the secondary config is asked for a anomaly vector.
(For setting the marker for exmaple).
set_marker: boolean [ True ]
Transfers the marker from the parameter mesh to the FEM mesh.
This only has to be done once and can then switched off for
performance.
set_attribute: boolean [ False ]
Sets the attribute in the FEM mesh for debugging purposes. The
anomaly vector for calculation is stored in secondary_config.
vtk_name: string [ None ]
Optional vtk export with name = *vtk_name* if *vtk_name* is not
None.
ground_marker: array_like [None]
Corresponding marker for each entry in the anomaly vector. Each
marker corresponds to a layer number of the 1d primary field
beginning at 1 for the first layer, counting upward (0 belongs
to the air layer). None results in np.ones_like(anomaly_vector,
dtype=int).
"""
# input check, anomaly handling in paramesh
self._checkParaMesh()
assert self.loopmesh is not None, 'No FE mesh found.'
# check FE mesh marker
# handling marker and attributes of FEM Mesh
if set_marker or self.valid_marker is None:
self.setFEMMarker()
log.debug('updateFEMAnomaly(): ({}/{}) used paramesh marker in FE mesh'
.format(np.sum(self.valid_marker), len(self.valid_marker)))
# anomaly
if anomaly is None:
# case 1/2: not given, look in secondary config
anomaly = np.array(self.secondary_config.sigma_anom)
log.debug('updateFEMAnomaly found sigma anom: {} ({})'
.format(anomaly, len(anomaly)))
# already checked for valid marker, use directly
new_anoms = anomaly
else:
# case 1/2: anomaly given, check for consistency
anomaly = self._checkAnomaly(anomaly)
# use only marker found in FE mesh (valid_marker)
new_anoms = np.array(anomaly)[self.valid_marker]
log.debug('updateFEMAnomaly(): sigma_anom = {}'.format(len(new_anoms)))
# last check to prevent old bug, leave it in
assert len(anomaly) != 0
# ground resistivities
ground = np.atleast_1d(self.secondary_config.sigma_ground)
if ground_marker is not None:
ground_marker = np.array(ground_marker)[self.valid_marker]
if set_attributes:
vals = np.array([1e-8], dtype=np.float)
vals = np.concatenate([vals, ground])
vals = np.concatenate([vals, new_anoms])
attr = vals[self.loopmesh.cellMarkers()]
log.debug('loop_bib.setFEMAnomaly(..., set_attributes=True): '
'export res instead of sigma!')
self.loopmesh.setCellAttributes(1./attr)
if vtk_name is not None:
Path(vtk_name).parent.mkdir(exist_ok=True, parents=True)
self.loopmesh.exportVTK(vtk_name, pg.Vector(1./attr))
log.info('updateFEMAnomaly(): {} anomalies.'.format(len(new_anoms)))
# sets: self.secondary_config.sigma_anom = new_anoms
# sets: self.secondary_config.layer_markers = ground_marker
self.secondary_config.setAnomalies(new_anoms,
layer_markers=ground_marker)
log.debug('sec config sigma ground: {} ({})'
.format(self.secondary_config.sigma_ground,
len(self.secondary_config.sigma_ground)))
log.debug('sec config sigma anom: {} ({})'
.format(np.array(self.secondary_config.sigma_anom),
len(self.secondary_config.sigma_anom)))
[docs] def prepareSecondaryFieldCalculation(self,
savename=None,
secondary_config=None,
fem_mesh=None,
para_mesh_2d=None,
set_marker=False,
anomaly_vector=None,
valid_marker=None,
verbose=False,
num_cpu=32,
force_primary=False,
export_vtk=False,
mod_name=None,
**kwargs):
"""
Based on the given secondary config a MOD instance using the third
party module custEM will be initialized. This includes the optional
generation of a FEM suited mesh containing resistivity information
from a 2D parameter mesh.
Parameters
----------
savename: string [ None ]
Name under which loopclass and secondary config (+= '_sec.cfg')
are to be saved. Needed for secondary approach.
secondary_config: pyhed.SecondaryConfig or string [ None ]
Filename of configuration file or initialized class instance of a
secondary configuration. Optional if already given manually.
fem_mesh: pg.Mesh or string [None]
FEM suited mesh or filename, respectively. Optional. If not given
a suited mesh will be generated if a valid para_mesh_2d is
provided.
para_mesh_2d: pg.Mesh or string [ None ]
2D parameter mesh providing cell indices for the appending of
resitivity information. Needed for automatic FEM mesh generation.
Can be set manually beforehand.
set_marker: boolean [ True ]
Flag to decide if the fem mesh has got the needed marker for
the resitivity distribution. Can be omitted if already done and
saved (e.g. if same mesh is used again).
anomaly_vector: np.ndarray [ None ]
Conductivity values [S/m] of the parameter mesh to be used in the
seocondary field approach. Uses given value over array found in
secondary config. Raises Exception if neither found nor given.
ground_marker: np.ndarray [ None ]
Corresponding marker for each entry in the anomaly vector. Each
marker corresponds to a layer number of the 1d primary field
beginning at 1 for the first layer, counting upward (0 belongs
to the air layer). None results in np.ones_like(anomaly_vector,
dtype=int).
verbose: boolean [ False ]
Turn on verbose mode.
num_cpu: integer [ 32 ]
Maximum number of processes allowed for this task.
force_primary: boolean [ False ]
Force a recalculation of the primary field.
mod_name: string or None [ None ]
Overrides mod name. Useful if looping over many loops, as default
name could be similar.
magnetic: boolean [ True ]
Prepares magnetic primary fields. If False only dummies are created
to avoid error messages from custEM during import. Set to False if
secondary electric approach is used for secondary field
calculation.
electric: boolean [ True ]
Prepares electric primary fields. If False only dummies are created
to avoid error messages from custEM during import. Set to False if
secondary magnetic approach is used for secondary field
calculation.
Returns:
--------
tuple: (savename, sec_savename)
Absolute file paths for the secondary approach.
Usage:
------
In order to prepare a secondary field calculation you need:
- a secondary config (default is provided)
- a conductivity vector (*)
- a 2d parameter mesh matching the anomalies (*)
- a marker_vector (*)
*if not in secondary config or proviedd beforehand
and optionally either
- fem_mesh (without marker -> set_marker=True (default))
or
- fem_mesh (with marker -> set_marker=False)
or
- no fem_mesh (auto creation)
"""
new_save = False # additional saving required
# setting optional secondary configuration
if secondary_config is not None:
self.setSecondaryConfig(secondary_config)
pf_EH_flag = kwargs.pop('pf_EH_flag', None)
if pf_EH_flag is not None:
self.secondary_config.pf_EH_flag = pf_EH_flag
approach = kwargs.pop('approach', None)
if approach is not None:
self.secondary_config.approach = approach
# simple check to save time
if '_t' in self.secondary_config.approach:
raise Exception('Total field approaches are not yet supported.')
# setting optional parameter mesh
if para_mesh_2d is not None:
self.setParaMesh2D(para_mesh_2d, anomaly=anomaly_vector,
sort=kwargs.pop('sort', True))
new_save = True
self._checkParaMesh()
# setting optional FEM mesh
if fem_mesh is not None:
self.setFEMMesh(fem_mesh,
savename=self.secondary_config.mesh_name,
valid_marker=valid_marker)
set_marker = False
if valid_marker is None and self.valid_marker is None:
new_save = True
# if no mesh available or provided: create
if self.loopmesh is None:
self.createFEMMesh()
set_marker = False
new_save = True
assert self.loopmesh is not None
log.info('setting marker: {}'.format(set_marker))
if set_marker is True:
new_save = True
log.info('need to save mesh again: {}'.format(new_save))
self.updateFEMAnomaly(anomaly=anomaly_vector,
set_marker=set_marker)
if self.secondary_config.sigma_anom is None:
raise Exception('No anomaly vector found. Abort.')
if not os.path.exists('{}/_h5/{}.h5'.format(
self.secondary_config.m_dir,
self.secondary_config.mesh_name)):
new_save = True
if new_save:
# ...then the copy on harddrive needs updating...
np.save('{}/_h5/fem_marker.npy'
.format(self.secondary_config.m_dir), self.valid_marker)
# export pygimli version of FEM mesh with markers (.bms)
self.saveLoopMesh('{}/_h5/{}.bms'.format(
self.secondary_config.m_dir,
self.secondary_config.mesh_name))
# export vtk version of FEM mesh with markers (.vtk)
vtk_name = '{}/_h5/{}.vtk'.format(
self.secondary_config.m_dir,
self.secondary_config.mesh_name)
log.info('saving vtk: "{}"'.format(vtk_name))
self.loopmesh.exportVTK(vtk_name)
# export fenics version of FEM mesh with markers (.h5)
h5_loop_name = self.loop_mesh_name[:-4] + '.h5'
log.info('saving h5: "{}"'.format(os.path.abspath(h5_loop_name)))
log.info('unique cell marker: {}'
.format(len(np.unique(self.loopmesh.cellMarkers()))))
pg.meshtools.exportFenicsHDF5Mesh(self.loopmesh, h5_loop_name)
# recalculation of primary fields required?
found = self._checkPrimaryFields()
calc = not found or force_primary
log.info('primary field: found: {}, force: {}, calc: {}'
.format(found, force_primary, calc))
if calc:
self.calcAndExportFieldsForFenics(
num_cpu=num_cpu, export_vtk=export_vtk,
debug=kwargs.pop('debug', False), **kwargs)
# updating name
if mod_name is not None:
self.secondary_config.mod_name = mod_name
# save for secondary field calculation call in mpi environment
if savename is None:
savename = Path(self.secondary_config.r_dir).joinpath(
self.secondary_config.mod_name + '_primary_loop').as_posix()
self.save(os.path.abspath(savename),
save_mesh=False) # is already saved
[docs] def show(self, **kwargs):
"""
Plots the Loopdiscretisation and the dipole directions and Length.
For inspection of the loop-class and debugging purpose. Or for your
curiosity.
Parameters
----------
kwargs: dict
Keyword arguments are redirected to *pyhed.plot.plot_bib.showLoop*.
"""
from comet.pyhed.plot.plot_bib import showLoop
return showLoop(self.pos, self.phi, self.ds, **kwargs)
[docs] def effectiveArea(self):
""" Returns *self.area* * *self.turns* (0 for not closed loops). """
return self.area * self.turns
def _checkParaMesh(self, raise_it=True):
if self.para_mesh_2d is None:
if raise_it:
raise Exception('Need parameter mesh in order to set anomaly '
'vector.')
else:
return False
else:
return True
def _checkPrimaryFields(self):
""" Check if primary fields already exists and returns True/False. """
if self.secondary_config is None:
return False
# for total field approach, no primary fields are needed
if '_t' in self.secondary_config.approach:
return True
pf_path = '{}/primary_fields/{}/{}'.format(
self.secondary_config.r_dir,
self.secondary_config.pf_type,
self.secondary_config.pf_name)
# extensive check for previously calculated fields
for field in ['E', 'H']:
for complexity in ['real', 'imag']:
if not os.path.exists('{}_{}_0_{}_cg.h5'.format(
pf_path, field, complexity)):
return False
return True
def _handleNonUniqueDipoles(self):
"""
Internally called function.
Sort out double dipole positions and handle their phi and ds.
"""
# new dtype looks like three floats in a row
# so unique searches for a unique combination of floats, instead of
# single unique floats
posview = self.pos.ravel().view(
np.dtype((np.void,
self.pos.dtype.itemsize*self.pos.shape[1])))
_, unique_idx, inverse_idx, counts_idx = np.unique(posview,
return_index=True,
return_inverse=True,
return_counts=True)
# found unique: now sort, filter and merge double pos
sorted_idx = unique_idx.argsort()
unique_sorted_idx = unique_idx[sorted_idx]
new_pos = np.array(self.pos)[unique_sorted_idx]
new_ds = np.array(self.ds)[unique_sorted_idx]
new_phi = np.array(self.phi)[unique_sorted_idx]
for i, count in enumerate(counts_idx[sorted_idx]):
if count > 1:
idx_i = np.where(inverse_idx == inverse_idx[i])
new_ds[i] = np.sum(self.ds[idx_i])
new_phi[i] = np.einsum('i,i->',
self.ds[idx_i],
self.phi[idx_i])/new_ds[i]
orig_pos = len(self.pos)
self.pos = new_pos
self.phi = new_phi
self.ds = new_ds
log.debug('merge dipoles: {} -> {} dipoles'.format(
orig_pos, len(self.pos)))
def _setDistance(self):
""" Biggest and smallest distance between two dipoles of the loop. """
if self.type not in ('dipole', 'Dummy') and len(self.pos) > 1:
dislist = []
for i in range(len(self.pos) - 1):
# (n * (n - 1)) / 2 steps to calculate
dislist.extend(-self.pos[i + 1:] + self.pos[i])
dislist = np.array(dislist).T
sqaredis = np.sqrt(np.sum(dislist**2, axis=0))
self._maxdistance = np.max(sqaredis)
self._mindistance = np.min(self.ds)
else:
self._maxdistance = 5
self._mindistance = 0.2
def _loadLoopGeometry(self, filename):
""" Loads the basic geometry needed to initalise a loop class.
Called by *loop.load* to load geometry from binary npz file archive.
Parameters
----------
filename: string
Path to file destination.
"""
DeprecationWarning(
'This is an old loop class. Please save again and you can get rid '
'of the *_geometry* file. Its now included in the main *.npz*.')
archive = np.load(filename)
self.type = str(archive['ltype'])
self.grounded = bool(archive['grounded'])
self.pos = archive['pos']
self.phi = archive['phi']
self.ds = archive['ds']
self._setDistance()
def _checkAnomaly(self, anomaly):
""" Internal function to check anomaly vector.
Accounts for triangle Boundaries around paramesh if set.
"""
# shortcut to avoid if-else in other functions
if anomaly is None:
return anomaly
log.debug('_checkAnomaly input anomaly: {} ({})'
.format(anomaly, len(anomaly)))
# import check
self._checkParaMesh()
if len(anomaly) != self.orig_cell_count_2d:
raise Exception('Dimension mismatch between original paramesh '
'({} cells) and anomaly vector ({} values).'
.format(self.orig_cell_count_2d, len(anomaly)))
# prepare anomaly vector with respect to parameter mesh prolongation
if self.has_boundary is False:
anom = anomaly
else:
anom = np.zeros(self.para_mesh_2d.cellCount())
anom[self.orig_cells] = anomaly
anom = pg.Vector(anom)
# inplace prolongation needs reference
# therefore explicit pg.Vector call is needed before giving anom
# to prolongateEmptyCellsValues
self.para_mesh_2d.prolongateEmptyCellsValues(anom)
log.debug('_checkAnomaly: detect boundary around paramesh: '
'prolongate empty cells values.')
with temporal_printoptions():
log.debug('_checkAnomaly prolonged anomaly: {} ({})'
.format(np.array(anom), len(anom)))
self.para_mesh_2d.setCellAttributes(anom)
# pygimli to numpy
anom = np.array(anom)
if self.paramesh_sort:
# sorted cells: anomaly vector has unique values
# self.anom_idx -> from 2d mesh with bounds to unique vals
anom = anom[self.anom_idx]
log.debug('_checkAnomaly: {} ({})'
.format(anom, len(anom)))
# return one value per unique marker in FEM Mesh
return anom
[docs]class Geometry:
def __init__(self):
self.poly = None
def _checkClockwise(points):
"""
Polygon Area formula. Works for most closed polygons.
(Not for figure-of-eight)
Parameters
----------
points: np.ndarray
List or array of coordinates of a closed polygon to determine if the
order is in clockwise or counterclockwise order.
"""
points = np.asarray(points)
x_1 = np.append(points[1:, 0], points[0, 0])
y_1 = np.append(points[1:, 1], points[0, 1])
area = np.sum((x_1 - points[:, 0]) * (points[:, 1] + y_1))
if np.isclose(area, 0):
print('Attribute "clockwise" could not be determined in this '
'overlapping loop. Please check the directions of the dipoles.')
return bool(np.sign(area) == 1)
def _3dlinspace(Point1, Point2, num):
""" Internal function. Like linspace but for threedimensional points. """
return np.array((np.linspace(Point1[0], Point2[0], num),
np.linspace(Point1[1], Point2[1], num),
np.linspace(Point1[2], Point2[2], num)),
np.float64).T
[docs]def computeLoopPositions(Coordinates, ltype='arbitrary',
middle=None, # for circle only
grounded=True):
"""
This function calculates the position of the dipoles in order to represent
an arbitrary shaped loop with the given coordinates, the angle between its
orientation and the x-direction and the dipole Length it represents.
Parameters
----------
Coordinates: np.ndarray
Input list/array of points of shape: (n, 3) for n dipoles.
ltype: string
Defines the general type of the loop and therefore some internal
attributes. Choices are:\n
- 'rectangle' (also for square loops)
- 'circle'
- 'arbitrary' (for all other loops)
middle: np.ndarray [ None ]
Midpoint of the circular loop to calculate radius correct (and
therefore the correct source coordinates).
grounded: boolean [ True ]
For non grounded wires there are dipole placed bewtween the last
coordinate point and the first. This is ommitted for grounded wires.
"""
if isinstance(Coordinates, (list, tuple)):
Coordinates = np.array(Coordinates, np.float64)
realltype = ltype
if ltype not in ('circle', 'arbitrary', 'line'):
# basically the same as any arbitrary loop
ltype = 'arbitrary'
if ltype == 'circle':
coords_x = Coordinates.T[0] - middle[0]
coords_y = Coordinates.T[1] - middle[1]
Angles = np.arctan2(coords_y, coords_x)
phis = np.zeros(len(Angles))
Positions = np.zeros((len(Angles), 3), dtype=np.float64)
radius = np.sqrt(coords_x[0]**2 + coords_y[0]**2)
ds = np.zeros(len(Angles), dtype=np.float64)
n = len(Angles)
for i in range(n):
phi_ii = Angles[(i + 1) % n]
phi_i = Angles[i]
if phi_i > phi_ii:
phi_i += 2 * np.pi
phis[i] = ((phi_i + phi_ii)/2.) % (np.pi*2)
Positions[i] = np.array((radius * np.cos(phis[i]),
radius * np.sin(phis[i]),
middle[2]))
ds[i] = radius * ((phi_ii - phi_i) % (2 * np.pi))
phis = (phis + np.pi/2) % (2 * np.pi)
Positions[:, 0] += middle[0]
Positions[:, 1] += middle[1]
return Positions, phis, ds, grounded, ltype
if ltype in ['arbitrary', 'line']:
Positions = []
Directions = []
DipoleLengths = []
n = len(Coordinates)
for i in range(n - grounded):
cii = Coordinates[(i + 1) % n]
ci = Coordinates[i]
Positions.append((cii + ci) / 2)
Directions.append(np.arctan2((cii[1] - ci[1]), (cii[0] - ci[0])))
DipoleLengths.append(np.sqrt((cii[0] - ci[0])**2 +
(cii[1] - ci[1])**2))
Positions = np.array(Positions, dtype=np.float64)
Directions = np.array(Directions, dtype=np.float64)
DipoleLengths = np.array(DipoleLengths, dtype=np.float64)
return Positions, Directions, DipoleLengths, grounded, realltype
[docs]def buildSquare(k=1, num_segs=12, P=(0, 0, 0),
max_length=None, savename=None, dipole_clockwise=True,
turns=1, **kwargs):
""" Square loop around P with edge length k.
Parameters
----------
k: float [ 1 ]
Length of one edge.
num_segs: integer [ 12 ]
Total number of segments to be used to discretize the Loop. Used
internally to define the max_length of a dipole. Inferior usage
compared max_length.
max_length: float [ None ]
Defines the minimum length of a dipole used for the discretization of
the loop. Superior usage compared to num_segs.
savename: string [ None ]
Basename for the loop. Trigger to save the loop.
dipole_clockwise: [ True ]
Define the dipole to be ordered in a clockwise direction.
turns: integer [ 1 ]
Number of turns of a closed loop.
kwargs: dict
Keyword arguments are redirected to the loop class
Returns
-------
pyhed.loop:
Pyhed loop class instance.
Example
-------
>>> from comet import pyhed as ph
>>> l = ph.loop.buildSquare(k=3.25, max_length=0.24)
>>> print(l)
>>> print(l.config)
>>> l.show()
"""
loop = buildRectangle(((-k/2. + P[0], -k/2. + P[1], 0),
(k/2. + P[0], k/2. + P[1], 0)), num_segs=num_segs,
max_length=max_length,
savename=savename,
dipole_clockwise=dipole_clockwise,
**kwargs)
loop.turns = turns
loop.area = k**2
return loop
[docs]def buildRectangle(points, num_segs=1, max_length=None, savename=None,
dipole_clockwise=None, turns=1, **kwargs):
""" Creates a rectangular shaped loop and manages dipole discretization.
Creates a rectangular loop out of four given corner points, with a given
discretisation between the points. Per default the function returns the
position of the dipoles, the angle between its orientation and the
x-direction and the dipole Length it represents.
Parameters
----------
points: list
List or Array containing 2D or 3D coordinates (however z-values
ignored for now). In case of a rectangle two or four coords are needed.
In case of two coordinates, the rectangle will have edges parallel to
the coordinate axes.
num_segs: integer [ 1 ]
Total number of segments to be used to discretize the Loop. Used
internally to define the max_length of a dipole. Inferior usage
compared to max_length.
max_length: float [ None ]
Defines the minimum length of a dipole used for the discretization of
the loop. Superior usage compared to num_segs.
savename: string [ None ]
Basename for the loop. Trigger to save the loop.
dipole_clockwise: [ True ]
Define the dipole to be ordered in a clockwise direction.
turns: integer [ 1 ]
Number of turns of a closed loop.
kwargs: dict
Keyword arguments are redirected to the loop class.
Returns
-------
pyhed.loop:
Pyhed loop class instance.
Example
-------
>>> from comet import pyhed as ph
>>> l = ph.loop.buildRectangle([[-5, -5], [5, 5]], max_length=0.64)
>>> print(l)
>>> print(l.config)
>>> l.show()
"""
if len(points) == 2:
# assume corner points
p1 = np.copy(points[0])
p2 = np.copy(points[1])
assert not np.isclose(p1[0] - p2[0], 0.0) and \
not np.isclose(p1[1] - p2[1], 0.0), '"buildRectangle" needs two \
opposing corner points to work. Abort'
points = np.ones((4, 3))
points[0] = np.array((p1[0], p1[1], 0.0))
points[1] = np.array((p2[0], p1[1], 0.0))
points[2] = np.array((p2[0], p2[1], 0.0))
points[3] = np.array((p1[0], p2[1], 0.0))
loop = buildLoop(points, num_segs=num_segs, max_length=max_length,
savename=savename,
grounded=False, ltype='rectangle',
dipole_clockwise=dipole_clockwise,
**kwargs)
loop.turns = turns
loop.area = vec.areaFromPolyPoints(points)
return loop
[docs]def buildLoop(Points, num_segs=1, max_length=None, savename=None,
grounded=False, ltype=None, dipole_clockwise=None, turns=1,
**kwargs):
""" Creates an arbitrary shaped loop out of given coordinates.
The returnes object is an initialized loop class.
Most general function to build a loop and called by most of the other
specialized functions after input preparation.
Parameters
----------
Points: list
List or Array containing 2D or 3D coordinates of shape (n, 2 or 3) for
n corner point of an arbitrary shaped polygon (z_vlaues will be set to
0 or now).
num_segs: integer [ 1 ]
Total number of segments to be used to discretize the Loop. Used
internally to define the max_length of a dipole. Inferior usage
compared to max_length.
max_length: float [ None ]
Defines the minimum length of a dipole used for the discretization of
the loop. Superior usage compared to num_segs.
savename: string [ None ]
Basename for the loop. Trigger to save the loop.
grounded: boolean [ False ]
Defines wether the loop is closed or not. Also defines which default
field mode will be calculated, as 'tm' field of a closed loop is zero.
dipole_clockwise: [ True ]
Define the dipole to be ordered in a clockwise direction.
turns: integer [ 1 ]
Number of turns of a closed loop.
kwargs: dict
Keyword arguments are redirected to the loop class.
Returns
-------
pyhed.loop:
Pyhed loop class instance.
Example
-------
>>> import pyhed as ph # this works if pyhed is in your path.
>>> points = [[-5, -5], [-10, 5], [2.3, 3.14], [7, -7]]
>>> l = ph.loop.buildLoop(points, max_length=0.64)
>>> print(l)
>>> print(l.config)
>>> l.show()
"""
Points = np.array(Points)
n = len(Points)
if len(Points[0]) == 2:
Points = np.column_stack([Points, np.zeros(4)])
else:
Points[:, 2] = 0.0
if max_length is None:
peri = vec.perimeterFromPolyPoints(Points, closed=not grounded)
max_length = peri / (max(num_segs - 2, 1))
if n == 1:
print('build Dipole loop with direction=0 and length=1. To alter these\
values use buildDipole directly instead of buildLoop.')
return buildDipole(Points, length=1, angle=0)
elif n == 2:
return buildLine(Points[0], Points[1], num_segs=num_segs,
max_length=max_length, savename=savename,
grounded=True)
if dipole_clockwise is not None:
if not (_checkClockwise(Points) == dipole_clockwise):
Points = np.concatenate((Points[np.newaxis, 0], Points[1:][::-1]))
Coordinates = []
for i in range(0, n - grounded):
cii = Points[(i + 1) % n]
ci = Points[i]
if max_length is not None:
diff = ci - cii
num_segs = int((np.sqrt(diff[0]**2 + diff[1]**2 + diff[2]**2)) /
max_length) + 1
if i == 0:
Coordinates.extend(_3dlinspace(Points[0], Points[1], num_segs + 1))
elif i == n - 1:
Coordinates.extend(_3dlinspace(ci, cii, num_segs + 1)[1:-1])
else:
Coordinates.extend(_3dlinspace(ci, cii, num_segs + 1)[1:])
if ltype is None:
if grounded is True:
ltype = 'line'
else:
ltype = 'arbitrary'
loop = Loop(
computeLoopPositions(Coordinates, ltype=ltype, grounded=grounded),
**kwargs)
if not grounded:
loop.turns = turns
loop.area = vec.areaFromPolyPoints(Coordinates)
if savename is not None:
loop.save(savename)
return loop
[docs]def buildLine(Start, End, num_segs=0, max_length=None, savename=None,
grounded=True, **kwargs):
"""
Parameters
----------
Start: list
2D or 3D coordinate of start of the line
(z value will be ignored for now).
Start: list
2D or 3D coordinate of end of the line
(z value will be ignored for now).
num_segs: integer [ 0 ]
Total number of segments to be used to discretize the Loop. Used
internally to define the max_length of a dipole. Inferior usage
compared to max_length.
max_length: float [ None ]
Defines the minimum length of a dipole used for the discretization of
the loop. Superior usage compared to num_segs.
savename: string [ None ]
Basename for the loop.
dipole_clockwise: [ True ]
Define the dipole to be ordered in a clockwise direction.
turns: integer [ 1 ]
Number of turns of a closed loop.
kwargs: dict
Keyword arguments are redirected to the loop class.
Returns
-------
pyhed.loop:
Pyhed loop class instance.
Example
-------
>>> import pyhed as ph # this works if pyhed is in your path.
>>> l = ph.loop.buildLine([-3, -3], [4, 2], max_length=0.14)
>>> print(l)
>>> print(l.config)
>>> l.show()
"""
Coordinates = []
Start = np.array(Start)
if len(Start) == 2:
Start = np.append(Start, 0.0)
else:
Start[2] = 0.0
End = np.array(End)
if len(End) == 2:
End = np.append(End, 0.0)
else:
End[2] = 0.0
if max_length is not None:
diff = Start - End
num_segs = int((np.sqrt(diff[0]**2 + diff[1]**2 + diff[2]**2)) /
max_length) + 1
Coordinates.extend(_3dlinspace(Start, End, num_segs + 1))
outl = Loop(computeLoopPositions(Coordinates, ltype='line',
grounded=grounded),
**kwargs)
if savename is not None:
outl.save(savename)
return outl
[docs]def buildDipole(Pos, length=1, angle=0, **kwargs):
"""
Parameters
----------
Pos: list
2D or 3D coordinate of the dipole.
length: float [ 1 ]
Dipole legth.
angle: [ 0 ]
Dipole direction positive clockwise from the x-aixs.
kwargs: dict
Keyword arguments are redirected to the loop class.
Returns
-------
pyhed.loop:
Pyhed loop class instance.
Example
-------
>>> import pyhed as ph # this works if pyhed is in your path.
>>> l = ph.loop.buildDipole([-3, -3], length=1.3, angle=45)
>>> print(l)
>>> print(l.config)
>>> l.show()
"""
if len(Pos) == 2:
Pos = np.append(Pos, 0.0)
else:
Pos = np.array(Pos)
Pos[2] = 0.0
return Loop([np.array((Pos,), dtype=float),
np.array((angle,), dtype=float),
np.array((length,), dtype=float),
True, 'dipole'], **kwargs)
[docs]def buildDummy(**kwargs):
""" Creates an empty dummy loop class to gain access to certain
functionalities.
"""
return Loop(['Dummy', 'Dummy', 'Dummy',
True, 'Dummy'], **kwargs)
[docs]def buildCircle(r, num_segs=None, max_length=None, P=(0, 0, 0),
dipole_clockwise=True, savename=None, turns=1, **kwargs):
"""
this function builds a n-segmented coil in the x-y-plane around the
point P = (X, Y, Z), with Radius r.
The first point is at the hightest y value with x = X and therefore the
point where the dipole is x-directed. Its the point were the field can
calculated directly without rotation, but with translation.
The rest of the coil is build clockwise.
Parameters
----------
r: float
Radius of the loop.
num_segs: integer [ None ]
Total number of segments to be used to discretize the Loop. Used
internally to define the max_length of a dipole. Inferior usage
compared to max_length.
max_length: float [ None ]
Defines the minimum length of a dipole used for the discretization of
the loop. Superior usage compared to num_segs.
P: list or np.ndarray
2D or 3D coordinate of the mid point of the loop.
dipole_clockwise: [ True ]
Define the dipole to be ordered in a clockwise direction.
savename: string [ None ]
Basename for the loop.
turns: integer [ 1 ]
Number of turns of a closed loop.
Returns
-------
pyhed.loop:
Pyhed loop class instance.
Example
-------
>>> import pyhed as ph # this works if pyhed is in your path.
>>> l = ph.loop.buildCircle(10, 11)
>>> print(l)
>>> print(l.config)
>>> l.show()
"""
if len(P) == 2:
P = np.append(P, 0.0)
else:
P = np.array(P)
P[2] = 0.0
if max_length is not None:
circumference = r * np.pi * 2
num_segs = int(circumference / max_length) + 1
num_segs = max(num_segs, 8)
x = []
y = []
for nr in range(num_segs):
tau = 2 * np.pi * nr/num_segs
x.append(P[0] + r * np.cos(tau))
y.append(P[1] + r * np.sin(tau))
drop_tol = kwargs.pop('drop_tol', 1e-14)
z = np.ones(num_segs) * P[2]
z[np.where(abs(z) < drop_tol)] = 0
x = np.asarray(x)
x[np.where(abs(x) < drop_tol)] = 0
y = np.asarray(y)
y[np.where(abs(y) < drop_tol)] = 0
coords = np.array((x, y, z), np.float64).T
loop = Loop(computeLoopPositions(coords, ltype='circle', middle=P,
grounded=False),
**kwargs)
if not dipole_clockwise:
loop.phi = (loop.phi + np.pi) % (2 * np.pi)
if savename is not None:
loop.save(savename)
loop.turns = turns
loop.area = (r**2) * np.pi
return loop
[docs]def buildSpiral(r1, r2, sp_turns=2, sp_segs=36, max_length=None, P=(0, 0, 0),
dipole_clockwise=True, savename=None, theta=90.0, **kwargs):
"""
this function builds a spiral coil with sp_turns number of turns in the
x-y-plane around the point P = (X, Y, Z), with inner radius r1 and outer
radius r2. The angle theta defines the start of the spiral
(theta = 0 -> East, theta = 90 -> North, etc.). The spiral is build
clockwise from r1 to r2.
Parameters
----------
r1: float
Inner radius of the spiral.
r2: float
Outer radius of the spiral.
sp_turns: integer [ None ]
Total number of spiral turns.
sp_segs: integer [ None ]
Total number of segments to be used to discretize the spiral.
max_length: float [ None ]
Defines the minimum length of a dipole used for the discretization of
the loop.
P: list or np.ndarray
2D or 3D coordinate of the mid point of the loop.
dipole_clockwise: [ True ]
Define the dipole to be ordered in a clockwise direction.
savename: string [ None ]
Basename for the loop.
theta: float, deg [ 90.0 ]
Orientation of start-end-connection of the spiral in degrees.
Returns
-------
pyhed.loop:
Pyhed loop class instance.
Example
-------
>>> import pyhed as ph # this works if pyhed is in your path.
>>> l = ph.loop.buildSpiral(1, 2, sp_turns=5)
>>> print(l)
>>> print(l.config)
>>> l.show()
"""
# 2D or 3D center point
if len(P) == 2:
P = np.append(P, 0.0)
else:
P = np.array(P)
P[2] = 0.0
# mean spiral radius
R = (r1 + r2) / 2
# vector of spiral angles
phi = np.linspace(0, 2 * np.pi * sp_turns,
sp_segs * sp_turns + 1) + np.pi / 2 - np.deg2rad(theta)
# dr increment
dr = (r2 - r1) / phi.size
# linearly increasing radius
r = np.arange(r1, r2, dr)
# corner points of spiral
x = []
y = []
for n in range(len(r)):
x.append(P[0] + r[n] * np.sin(phi[n]))
y.append(P[1] + r[n] * np.cos(phi[n]))
# copied form Nico (removing small numbers)
drop_tol = kwargs.pop('drop_tol', 1e-14)
z = np.ones(len(r)) * P[2]
z[np.where(abs(z) < drop_tol)] = 0
x = np.asarray(x)
x[np.where(abs(x) < drop_tol)] = 0
y = np.asarray(y)
y[np.where(abs(y) < drop_tol)] = 0
coords = np.array((x, y, z), np.float64).T
# assemble loop as 'arbitrary' set of points
loop = buildLoop(coords, num_segs=1, max_length=max_length,
savename=savename,
grounded=False, ltype='arbitrary',
dipole_clockwise=dipole_clockwise,
**kwargs)
# there is effectively only 1 turn (check usage!)
loop.turns = 1
# average area of the loop (check usage!)
loop.area = (R**2) * np.pi
return loop
[docs]def buildFig8Circle(r, num_segs=None, max_length=None, P=(0, 0, 0),
savename=None, turns=1, **kwargs):
"""
Build a figure-of-eight loop with circular loops around point
P = (X, Y, Z), with Radius r.
Parameters
----------
r: float
Radius of the loop.
num_segs: integer [ None ]
Total number of segments to be used to discretize the Loop. Used
internally to define the max_length of a dipole. Inferior usage
compared to max_length. In this case divided between the two circular
loops.
max_length: float [ None ]
Defines the minimum length of a dipole used for the discretization of
the loop. Superior usage compared to num_segs.
P: array_like [ (0., 0., 0.) ]
2D or coordinate of the mid point of the loop.
savename: string [ None ]
Basename for the loop.
turns: integer [ 1 ]
Number of turns of a closed loop.
Returns
-------
pyhed.loop.Loop:
Pyhed loop class instance.
"""
if num_segs is None:
n_segs2 = None
else:
n_segs2 = int(num_segs/2.)
circ1 = buildCircle(r, num_segs=n_segs2, P=(P[0] - r, P[1], P[2]),
max_length=max_length, savename=None,
dipole_clockwise=True, **kwargs)
circ2 = buildCircle(r, num_segs=n_segs2, P=(P[0] + r, P[1], P[2]),
max_length=max_length, savename=None,
dipole_clockwise=False, **kwargs)
fig8 = mergeLoops([circ1, circ2], true_merge=True,
take_single_min_dis=True)
fig8.turns = turns
fig8.area = circ1.area + circ2.area
return fig8
[docs]def buildFig8(points, num_segs=3, max_length=None, mid=0,
dipole_clockwise=True, turns=1, **kwargs):
"""
Builds a figure-of-eight Loop with respect to the given corner Points.
This function is part of the pyhed.loops library and returns a
'loop'-class object suitable to calculate electric and magnetic fields
based on horizontal electric dipoles. Please always check the loop
consistency with the buildin .show() command (see Example) before
calculating with experimetal loop layouts.
Parameters
----------
num_segs: integer [ 12 ]
Total number of segments to be used to discretize the Loop. Used
internally to define the max_length of a dipole. Inferior usage
compared to max_length.
max_length: float [ None ]
Defines the minimum length of a dipole used for the discretization of
the loop. Superior usage compared to num_segs.
points: array_like
Points can be of shape (2, 3), two points with three coordinates
(x, y, z) and the algorithm will build a loop with edges parallel to
the coordiante axes.
Although the point coordiantes are given with z values, the current
implementation of COMET is not able to allow for any z-values other
than zero, i appologize for the inconvenience. This flaw will be
adressed as soon as COMET moves towards 2D resistivity structures.
max_length: float or None (None)
The discretisation between the cornerpoints of the loop can be sampled
by any rate (m) you choose. Since the total number of points between
the corner points of the loop has to be an integer, the real distance
between the points will always be smaller or equal to max_length.
Its highly recommended to use at least 10 dipoles
between the different points, and therefore a total number of dipoles
of >= 50 - 60. This leads to a natural max_length of 1/10 the smaller
edge of the figure-of-eight loop.
num_segs: integer (3)
The number of segments between the corner points of the loop can also
be given directly, but mention that the max_length value (not None)
will have priority. Usually max_length will lead to more homogeneous
distributions of dipoles between the corner and midpoints of a
figure-of-eight loop.
mid: integer (0)
The middle connection depends on the value "mid". The default value
(0) sets the middle lines parrallel to the y-axis. Other values are
setting the line parrallel to the x axis, respectively.
dipole_clockwise: boolean (True)
The dipoles are orientated clockwise with respect to the first half of
the loop or counterclockwise if this switch is set the False. The first
half is considered to be the half connected to the upper left point
of the loop boundary, so either the left or the upper loop depending on
mid.
Possible kwargs are: savename. Please see "buildLoop" for more
details.
Returns
-------
pyhed.loop:
Pyhed loop class instance.
Example
-------
>>> import pyhed as ph # this works if pyhed is in your path.
>>> l = ph.loop.buildSquare(k=3.25, max_length=0.14)
Example
-------
>>> import pyhed as ph
>>> p1 = (-1, 1, 0)
>>> p2 = (1, -1, 0)
>>> fig8 = ph.loop.buildfig8((p1, p2), max_length=0.1)
>>> print(fig8)
>>> print(fig8.config)
>>> fig8.show()
"""
if len(points) == 2:
# assume corner points
# np.copy to prevent: TypeError: 'tuple' object does not support item
# assignment
# TODO! allow for z-values different than zero
# for the time being the resulting positions can be shifted manually
p1 = np.copy(points[0])
if len(p1) == 2:
p1 = np.append(p1, 0)
else:
p1[2] = 0.
p2 = np.copy(points[1])
if len(p2) == 2:
p2 = np.append(p2, 0)
else:
p2[2] = 0.
area = vec.areaFromPolyPoints(points)
assert not np.any(np.isclose(p1[0:2], p2[0:2])), '"buildFig8" needs\
two opposing corner points to work. Abort.'
if mid == 0:
cw = bool(p1[1] > p2[1])
half = float(p1[0] + p2[0])/2.
points = np.array((p1,
(half, p1[1], 0.),
(half, p2[1], 0.),
p2,
(p2[0], p1[1], 0.),
(half, p1[1], 0.),
(half, p2[1], 0.),
(p1[0], p2[1], 0.)))
else:
cw = bool(p1[0] < p2[0])
half = float(p1[1] + p2[1])/2.
points = np.array((p1,
(p2[0], p1[1], 0.),
(p2[0], half, 0.),
(p1[0], half, 0.),
(p1[0], p2[1], 0.),
p2,
(p2[0], half, 0.),
(p1[0], half, 0.)))
if cw is not dipole_clockwise:
# change the direction of the dipoles with
# respect to "dipole_clockwise"
points = np.concatenate((points[np.newaxis, 0], points[1:][::-1]))
loop = buildLoop(points, num_segs=num_segs, max_length=max_length,
grounded=False, ltype='figure8', **kwargs)
loop.turns = turns
loop.area = area
return loop
[docs]def buildEtraSourceDiscretization(poly, edgelength, max_length=0.251,
x_left=None, n_segs=None, number_of_loops=8):
""" Internal function.
Implements an etra shaped source in the FEM mesh.
Cannot use buildEdgeSourceDiscretization due to overlapping edges.
"""
assert number_of_loops > 1, 'number of loops <= 1'
offsets = number_of_loops + 2
norm_off = (number_of_loops + 1)/2.
offset = np.linspace(-norm_off/2 * edgelength,
norm_off/2 * edgelength,
offsets)
if x_left is not None:
offset -= (offset[0] - x_left)
lines = []
if n_segs is None:
n_segs = int(edgelength // max_length)
n_segs2 = int((edgelength / 2) // max_length)
else:
n_segs2 = int(n_segs / 2)
y_pos = np.linspace(-edgelength * 0.5, edgelength * 0.5, n_segs)
for xi, x_off in enumerate(offset):
# y lines
x_pos = np.ones_like(y_pos) * x_off
pos = np.column_stack([x_pos, y_pos, np.zeros_like(x_pos)])
lines.append(pos)
if xi > 0:
# x lines
x_pos2 = np.linspace(offset[xi - 1], offset[xi], n_segs2)
y_pos_up = np.ones_like(x_pos2) * 0.5 * edgelength
y_pos_down = np.ones_like(x_pos2) * (-edgelength * 0.5)
lines.append(np.column_stack([x_pos2, y_pos_up,
np.zeros_like(x_pos2)]))
lines.append(np.column_stack([x_pos2, y_pos_down,
np.zeros_like(x_pos2)]))
if isinstance(poly, pg.Mesh):
# case 1/2: no custEM
dim_poly = poly.dim()
if dim_poly == 2:
for j in range(len(lines)):
nn = [poly.createNodeWithCheck([o[0], o[1]]) for
o in lines[j]]
for k in range(len(lines[j]) - 1):
poly.createEdge(nn[k], nn[k + 1], marker=47)
else:
for j in range(len(lines)):
nn = [poly.createNodeWithCheck([o[0], o[1], 0.0]) for
o in lines[j]]
for k in range(len(lines[j]) - 1):
poly.createEdge(nn[k], nn[k + 1], marker=47)
else:
# case 2/2: no custEM
poly.build_surface(insert_lines=[*lines],
line_marker=[47] * len(lines))
[docs]def buildEdgeSourceDiscretization(surface, pos, phi, ds, closed=True):
""" Internal function.
Used to implement every dipole in the FEM mesh using
an appropriate edge that represents it.
"""
pos = np.atleast_1d(pos)
ds = np.atleast_1d(ds)
phi = np.atleast_1d(phi)
pos1 = pos + np.array([np.cos(phi)*ds/2,
np.sin(phi)*ds/2,
np.zeros(len(phi))]).T
if not closed:
# first point is different from last point
# add first point explicitly
pos_start = pos[0] - np.array([np.sin(phi[0])*ds[0]/2,
np.cos(phi[0])*ds[0]/2,
0.0]).T
pos1 = np.append(pos_start, pos1)
nodes = []
for pi, posi in enumerate(pos1):
nodes.append(surface.createNodeWithCheck(posi))
for ni in range(len(nodes) - 1):
surface.createEdge(nodes[ni], nodes[ni + 1], marker=47)
if closed:
surface.createEdge(nodes[-1], nodes[0], marker=47)
[docs]def buildPointSourceDiscretization(surface, pos):
""" Internal function.
Used to implement each source dipole as simple node in the FEM mesh.
"""
pos = np.atleast_1d(pos)
for pi in pos:
surface.createNodeWithCheck(pi)
[docs]def buildEtraSurvey(edgelength, return_measurements=False,
origin=[0, 0], num_loops=8, max_length=None,
savenames=None, **kwargs):
""" Special etra survey for NMR applications. """
if num_loops <= 2:
raise Exception('buildEtraSurvey: num_loops has to be > 2.')
if savenames is not None:
if not isinstance(savenames, str):
raise Exception('"savenames" has to be string not {}.'
.format(type(savenames)))
if '{}' not in savenames:
savenames = savenames + '_{}'
loops = []
corner_x = np.linspace(-0.25 * num_loops * edgelength,
0.25 * num_loops * edgelength, num_loops + 1)
c_idx_x = []
c_idx_x.append([1, num_loops - 1])
for i in range(num_loops - 1):
c_idx_x.append([i, i + 2])
if max_length is None:
max_length = edgelength/25.
for (ind_1, ind_2) in c_idx_x:
loops.append(buildRectangle(
[[corner_x[ind_1] + origin[0], -edgelength/2 + origin[1], 0],
[corner_x[ind_2] + origin[0], edgelength/2 + origin[1], 0]],
max_length=max_length,
dipole_clockwise=kwargs.pop('dipole_clockwise', True), **kwargs))
for lp, loop in enumerate(loops):
if savenames:
loop.name = savenames.format(lp)
else:
loop.name = f'loop_{lp}'
if return_measurements:
ret = (loops, np.array([np.zeros(num_loops, dtype=int),
np.arange(0, num_loops, dtype=int),
np.zeros(num_loops, dtype=int)]).T,)
else:
ret = loops
return ret
[docs]def loadLoop(name, **kwargs):
""" Imports loop from file archive. See *ph.loop.Loop.load* for details.
"""
ll = buildDummy()
ll.load(name, **kwargs)
return ll
[docs]def loadLoops(name, num=8, load_meshes=False, cfg_name=None, cfg2_name=None,
overwrite_dir=False):
""" Loads n loops with name = ...{n}... """
loops = []
kwargs = {}
kwargs['overwrite_dir'] = overwrite_dir
kwargs['load_meshes'] = load_meshes
for i in range(num):
if cfg_name:
kwargs['config'] = cfg_name.format(i)
if cfg2_name:
kwargs['config2'] = cfg2_name.format(i)
loops.append(loadLoop(name.format(i), **kwargs))
return loops
[docs]def dipolePosFromSimpleLoop(r, n, P=(0, 0, 0), drop_tol=1e-14):
""" Convienience function to have fast access to circular loop coordinates.
"""
Loop = buildCircle(r, n, P=P, drop_tol=drop_tol)
posi = Loop.pos
dipole_pos = np.array((posi[0], posi[1], posi[2]))
return dipole_pos.T
[docs]def mergeLoops(*loops, true_merge=False, config=None):
""" Merges given loops to one and optionally merges equal dipoles.
Parameters
----------
loops: loops-classes
Loops to be merged.
true_merge: boolean [True]
Switch for a complete merge of all dipoles to with respect to
their phi and dipole length. Attention this can be problematic when
merging edge-to-edge loops (for mesh creation for example).
If False only the dipole positions are merged not phi/ds (there
replaced with dummy values).
config: ph.config-instance or string [None]
Sets config of merged loop either per index (cofig is taken from
the corresponding loop) or give a new config. If None the config
of the first loop is used instead.
"""
if isinstance(loops[0], (list, tuple, np.ndarray)):
loops = loops[0]
pos = np.concatenate(([loop.pos for loop in loops]), axis=0)
if config is None:
config = loops[0].config
elif isinstance(config, int):
config = loops[config].config
if true_merge:
# case 1/2: fast merge of all dipoles, phi, and ds.
phi = np.concatenate(([loop.phi for loop in loops]), axis=0)
ds = np.concatenate(([loop.ds for loop in loops]), axis=0)
merged = Loop((pos, phi, ds, False, 'arbitrary'))
else:
# case 2/2: fast merge of all dipoles, not phi/ds.
# use this for mesh creation
pos = pg.utils.unique_rows(pos)
merged = Loop((pos,
np.ones(len(pos)), # fake phi
np.ones(len(pos)), # fake ds
False, # grounded = False
'arbitrary'
))
# take min distance from single loops
ds = np.concatenate(([loop.ds for loop in loops]))
merged._mindistance = np.min(ds)
return merged
[docs]def createSeparatedLoopMesh(*loops, dipole_mesh=False, **kwargs):
""" Build a mesh suited for EM primary field calculation if using more than
one loop.
"""
# gather all dipoles for discretization of mesh
if isinstance(loops[0], (list, tuple, np.ndarray)):
loops = loops[0]
pos = np.concatenate(([loop.pos for loop in loops]), axis=0)
pos = pg.utils.unique_rows(pos)
# dummy loop
hlp = Loop((pos,
np.ones(len(pos)), # fake phi
np.ones(len(pos)), # fake ds
False, # grounded = False
'arbitrary'
))
hlp.setMeshParameters(refinement_para=kwargs.pop('refinement_para', 1.0),
max_area_factor=kwargs.pop('max_area_factor', 1.0),
tetgen_quality=kwargs.pop('tetgen_quality', 1.2))
loop_mesh = hlp.createLoopMesh()
if dipole_mesh:
dipole_mesh = hlp.createDipoleMesh()
return loop_mesh, dipole_mesh
else:
return loop_mesh
[docs]def createSeparatedFEMMesh(*loops, para_mesh_2d=None, **kwargs):
""" Build a mesh suited for EM secondary field calculation if more than one
loop is used.
"""
# gather all dipoles for discretization of mesh
hlp = mergeLoops(*loops)
hlp.setMeshParameters(refinement_para=kwargs.pop('refinement_para', 1.0),
max_area_factor=kwargs.pop('max_area_factor', 1.0),
tetgen_quality=kwargs.pop('tetgen_quality', 1.2))
loop_mesh = hlp.createFEMMesh(para_mesh_2d=para_mesh_2d, **kwargs)
return loop_mesh
[docs]def createEtraMesh(loops, mesh2d, anomaly, savename=None, extend_x=0.,
extend_y=-0.3, extend_z=-0.5, max_volume=25.,
append_boundary=True, sort=True, return_loop=False):
""" Creates a finite element mesh suited for ETRA surveys.
"""
mesh2d = pg.Mesh(mesh2d)
mesh_loop = mergeLoops(loops, true_merge=False)
mesh_loop.setParaMesh2D(mesh2d, append_boundary=append_boundary,
anomaly=anomaly, sort=sort)
mesh_loop.createFEMMesh(savename=savename,
extend_x=extend_x,
extend_y=extend_y,
extend_z=extend_z,
maxVolume=max_volume,
source_setup='etra')
if return_loop:
return mesh_loop
else:
return mesh_loop.loopmesh, mesh_loop.valid_marker
[docs]def buildEtraPoly(x_min, x_max, small, marker=0):
assert int(x_max - x_min) % small == 0, 'No Etra fits in there.'
poly = pg.Mesh(2)
n = 0
for x in np.arange(x_min, x_max + small/2, small/2):
for yi, y in enumerate([-small/2, small/2]):
poly.createNodeWithCheck([x, y])
if yi == 1:
# y directed bounds
poly.createBoundary([n-1, n], marker=marker)
if n >= 3:
# x directed bounds, upper
poly.createBoundary([n-2, n], marker=marker)
# -"- lower
poly.createBoundary([n-1, n-3], marker=marker)
n += 1
return poly
[docs]def createMultipleLoopMesh(
loops, savename=None, source_setup='etra', triangle_quality=33.8,
source_max_area=None, inner_area_volume=None, mid_area_volume=None,
outer_area_volume=None, minx=None, maxx=None, miny=None, maxy=None,
minz=None, air_refinement=False, source_poly=None):
"""
Build a suited mesh for magnetic field calculation for NMR purpose.
Sources are included in a way defined by **source_setup**.
Parameters
----------
loops: ph.loop.loop or array_like
Input loops for which the mesh shall be created.
savename: string [ None ]
Savename for mesh (.bms will be added).
source_setup: string [ 'etra' ]
In case of multiple loops, the source_setup is important to define how
the loops are included in the mesh. Default is 'etra' for NMR Etra
setups. Alternatively 'edges' can be used to implement each dipole as
edge with the dipole as midpoint, length as well as direction is
defined by the dipole. This is not working for overlaping loops
(e.g. Etras). For those a source_poly can be provided or 'nodes' is
chosen to simply implement each dipole as node in the mesh.
triangle_quality: float [ 34.0 ]
The surface where the sources are implemented is meshed in 2D an then
later inserted in the 3D mesh. This controls the triangle quality for
this surface mesh.
source_max_area:
maximum area allowed in the 2D surface mesh (sources and 5 meter around
the sources). Automatically defined if None (based on size of the area
to ensure a minimum amount of trinagle cells). 2D surface mesh is
exported if logger is set to debug level (10).
inner_area_volume: float [ None ]
The inner refinement volume is defined 5 meter around and below the
sources. The maximum cell volume can be defined here. Automatically
defined if None (based on size of the volume to ensure a minimum amount
of cells).
mid_area_volume: float [ None ]
The median refinement volume is defined through minx, maxx, miny, maxy,
and minz meter around and below the sources. The maximum cell volume
can be defined here. Automatically defined if None (based on size of
the volume to ensure a minimum amount of cells).
outer_area_volume: float [ None ]
The outer sides of the mesh (3 times miny, maxx, miny, maxy, minz) is
to ensure interpolation of the field values are secured without the
need for extrapolation. Usually the max cell volume is not contraint
to minimize the computational effort.
minx: float [ None ]
Minimum x extention (in addition to the extend of the inner refinement
area) of median refinement volume. If None this value is defined as
maximum distance of two dipoles of the input loops.
maxx: float [ None ]
Maximum x extention (in addition to the extend of the inner refinement
area) of median refinement volume. If None this value is defined as
maximum distance of two dipoles of the input loops.
miny: float [ None ]
Minimum y extention (in addition to the extend of the inner refinement
area) of median refinement volume. If None this value is defined as
maximum distance of two dipoles of the input loops.
maxy: float [ None ]
Maximum y extention (in addition to the extend of the inner refinement
area) of median refinement volume. If None this value is defined as
maximum distance of two dipoles of the input loops.
minz: float [ None ]
Maximum z extention (in addition to the extend of the inner refinement
area) of median refinement volume. If None this value is defined as
maximum distance of two dipoles of the input loops. 1/3 of the value is
used for maxz if air refinement is enabled.
air_refinement: boolean [ False ]
If true the airspace is meshed as well.
source_poly: pg.Mesh or plc [None]
For unusual or overlapping, non etra, sources a piecewise linear
complex (plc) can be created using the pygimli mesh- and polytools.
This is then used as source definition for the 2D source layer mesh.
Any region markers with area constraints will be considered, no
additional markers will be set.
"""
# % input: a bunch of loops
# Step 1: merge and define some input stuff
loops = np.atleast_1d(loops)
if len(loops) > 1:
merged = mergeLoops(loops, true_merge=False)
else:
merged = loops[0]
# create proper file path for saving stuff
merged.setLoopMeshName(savename)
mesh_path = Path(merged.loop_mesh_name)
mesh_path.parent.mkdir(exist_ok=True, parents=True)
log.debug([mesh_path.as_posix(), mesh_path.parent.as_posix()])
# without suffix
mesh_base = mesh_path.with_name(mesh_path.stem).as_posix()
log.info('creating Loop mesh...')
# % Step 2: limits
# compute outer limits for FEM mesh
if minx is None:
minx = np.min(merged.pos.T[0]) - merged._maxdistance
if maxx is None:
maxx = np.max(merged.pos.T[0]) + merged._maxdistance
if miny is None:
miny = np.min(merged.pos.T[1]) - merged._maxdistance
if maxy is None:
maxy = np.max(merged.pos.T[1]) + merged._maxdistance
if minz is None:
minz = np.min(merged.pos.T[2]) - merged._maxdistance
if air_refinement:
maxz = 0.33 * abs(minz)
else:
maxz = 0.0
if mid_area_volume is None:
# want to have at least roughly 35000 cells in mid area:
mid_area_volume = (maxx - minx) * (maxy - miny) * (maxz - minz) / 35000
log.log(13, 'inner mesh x:, {}, {}'.format(minx, maxx))
log.log(13, 'inner mesh y:, {}, {}'.format(miny, maxy))
log.log(13, 'inner mesh z:, {}, {}'.format(minz, maxz))
# % Step 4: implement sources
if source_poly is None:
source_poly = pg.Mesh(2)
# case 1/3: etra loops are special as they have overlapping edges
# which are not easy to implement
if source_setup.lower() == 'etra':
number_of_loops = len(loops)
# number of segments a single edge is divided in
ay = np.max(merged.pos[:, 1]) - np.min(merged.pos[:, 1])
# overrides poly, cannot do this another way
buildEtraSourceDiscretization(
source_poly, ay, max_length=merged._mindistance,
number_of_loops=number_of_loops)
# case 2/3: source is inserted as edges, the dipoles are in the middle
# of a edge with length if the dipole. Do not use for very corse dipole
# discretizations.
elif source_setup.lower() == 'edges':
buildEdgeSourceDiscretization(
source_poly, merged.pos, merged.phi,
merged.ds, closed=not merged.grounded)
# case 3/3: dipoles are added as dipoles. Just as nodes, thats it.
elif source_setup.lower() == 'nodes':
buildPointSourceDiscretization(source_poly, merged.pos)
# no valid case: raise Exception
else:
raise Exception('Source setup "{}", not in ["etra", "nodes", '
'"edges"].'.format(source_setup))
else: # source poly is not None
if isinstance(source_poly, str):
source_poly = pg.load(source_poly)
# adding outer boundary
# 5 * 5 * 5 meter case around sources
ibox_x = (source_poly.xmin() - 5, source_poly.xmax() + 5)
ibox_y = (source_poly.ymin() - 5, source_poly.ymax() + 5)
if inner_area_volume is None:
# if not specified, try to get at least 15,000 cells in inner area
inner_area_volume = (ibox_x[0] - ibox_x[1]) *\
(ibox_y[0] - ibox_y[1]) *\
5 / 15000
bbox = pg.Mesh(2)
bb0 = bbox.createNode([ibox_x[0], ibox_y[0]])
bb1 = bbox.createNode([ibox_x[0], ibox_y[1]])
bb2 = bbox.createNode([ibox_x[1], ibox_y[1]])
bb3 = bbox.createNode([ibox_x[1], ibox_y[0]])
# bbox.createQuadrangleFace(bb0, bb1, bb2, bb3)
bbox.createEdge(bb0, bb1)
bbox.createEdge(bb1, bb2)
bbox.createEdge(bb2, bb3)
bbox.createEdge(bb3, bb0)
inner = pg.meshtools.polytools.mergePLC([bbox, source_poly])
if source_max_area is None:
source_max_area = (merged._mindistance ** 2) * 4
log.info('Building 2d surface mesh with quality: {:.2f}, max Area: '
'{:.2f} m².'.format(triangle_quality, source_max_area))
interface_mesh2d = pg.meshtools.createMesh(
inner, quality=triangle_quality, area=source_max_area)
log.info('2d surface mesh: {}'.format(interface_mesh2d))
if log.getEffectiveLevel() <= 10:
interface_mesh2d.exportVTK('interface_mesh_2d.vtk')
# now: cells in 2d are boundaiers in 3D, therefore we need to do this merge
# manually
interface_mesh = pg.Mesh(3)
for cell in interface_mesh2d.cells():
cids = []
for node in cell.nodes():
cids.append(interface_mesh.createNodeWithCheck(node.pos()))
interface_mesh.createTriangleFace(*cids)
pos_arr = interface_mesh.positions().array()
b_left = np.isclose(pos_arr[:, 0], interface_mesh.xmin())
b_right = np.isclose(pos_arr[:, 0], interface_mesh.xmax())
b_upper = np.isclose(pos_arr[:, 1], interface_mesh.ymax())
b_lower = np.isclose(pos_arr[:, 1], interface_mesh.ymin())
interface_mesh.setDimension(3)
# task: identify nodes at the boundary of the inner mesh to connect them
# to the outer mesh
# y positions of nodes at the left boundary of the inner mesh
y_left = np.argsort(pos_arr[b_left][:, 1])
midid_yleft = int(len(y_left)//2)
id_left = np.where(b_left)[0][y_left]
# y positions of nodes at the right boundary of the inner mesh
y_right = np.argsort(pos_arr[b_right][:, 1])
midid_yright = int(len(y_right)//2)
id_right = np.where(b_right)[0][y_right]
# x positions of nodes at the upper boundary of the inner mesh
x_upper = np.argsort(pos_arr[b_upper][:, 0])
midid_xupper = int(len(x_upper)//2)
id_upper = np.where(b_upper)[0][x_upper]
# x positions of nodes at the lower boundary of the inner mesh
x_lower = np.argsort(pos_arr[b_lower][:, 0])
midid_xlower = int(len(x_lower)//2)
id_lower = np.where(b_lower)[0][x_lower]
# sides, now we close the sides under the inner mesh connecting the nodes
# from the boundary with the corners of the lower half of the innner mesh
n4 = interface_mesh.createNodeWithCheck([ibox_x[0], ibox_y[0], -5.0])
n5 = interface_mesh.createNodeWithCheck([ibox_x[0], ibox_y[1], -5.0])
n6 = interface_mesh.createNodeWithCheck([ibox_x[1], ibox_y[1], -5.0])
n7 = interface_mesh.createNodeWithCheck([ibox_x[1], ibox_y[0], -5.0])
im_nodes = interface_mesh.nodes()
# min_x, min_y: connect to b_left, b_lower
for n in range(len(id_left[:midid_yleft])):
node_id0 = id_left[n]
node_id1 = id_left[n + 1]
interface_mesh.createTriangleFace(
n4, im_nodes[node_id0], im_nodes[node_id1])
for n in range(len(id_lower[:midid_xlower])):
node_id0 = id_lower[n]
node_id1 = id_lower[n + 1]
interface_mesh.createTriangleFace(
n4, im_nodes[node_id0], im_nodes[node_id1])
# min_x, max_y: connect to b_left, b_upper
for n in range(len(id_left[midid_yleft:-1])):
node_id0 = id_left[n + midid_yleft]
node_id1 = id_left[n + 1 + midid_yleft]
interface_mesh.createTriangleFace(
n5, im_nodes[node_id0], im_nodes[node_id1])
for n in range(len(id_upper[:midid_xupper])):
node_id0 = id_upper[n]
node_id1 = id_upper[n + 1]
interface_mesh.createTriangleFace(
n5, im_nodes[node_id0], im_nodes[node_id1])
# max_x, max_y: connect to b_right, b_upper
for n in range(len(id_right[midid_yright:-1])):
node_id0 = id_right[n + midid_yright]
node_id1 = id_right[n + 1 + midid_yright]
interface_mesh.createTriangleFace(
n6, im_nodes[node_id0], im_nodes[node_id1])
for n in range(len(id_upper[midid_xupper:-1])):
node_id0 = id_upper[n + midid_xupper]
node_id1 = id_upper[n + 1 + midid_xupper]
interface_mesh.createTriangleFace(
n6, im_nodes[node_id0], im_nodes[node_id1])
# max_x, min_y: connect to b_right, b_lower
for n in range(len(id_right[:midid_yright])):
node_id0 = id_right[n]
node_id1 = id_right[n + 1]
interface_mesh.createTriangleFace(
n7, im_nodes[node_id0], im_nodes[node_id1])
for n in range(len(id_lower[midid_xlower:-1])):
node_id0 = id_lower[n + midid_xlower]
node_id1 = id_lower[n + 1 + midid_xlower]
interface_mesh.createTriangleFace(
n7, im_nodes[node_id0], im_nodes[node_id1])
interface_mesh.createTriangleFace(n4, n5, im_nodes[id_left[midid_yleft]])
interface_mesh.createTriangleFace(n5, n6, im_nodes[id_upper[midid_xupper]])
interface_mesh.createTriangleFace(n6, n7, im_nodes[id_right[midid_yright]])
interface_mesh.createTriangleFace(n7, n4, im_nodes[id_lower[midid_xlower]])
log.log(16, 'Add volume constraint marker (inner box): {} using {:.3f} m³'
.format([np.mean(ibox_x), np.mean(ibox_y), -2.5],
inner_area_volume))
interface_mesh.addRegionMarker([np.mean(ibox_x),
np.mean(ibox_y),
-2.5], 1, area=inner_area_volume)
# bottom
interface_mesh.createQuadrangleFace(n4, n5, n6, n7)
# for bound in interface_mesh.boundaries():
# bound.setMarker(1)
# interface_mesh.exportBoundaryVTU('interface_mesh.vtu')
# interface_mesh.exportVTK('interface_mesh.vtk')
mid_poly = createPolyBoxWithHalfspace(
minx, maxx, miny, maxy, minz, maxz, halfspace_at=0.0,
without_halfspace=True, interface_marker=None)
for bound in mid_poly.boundaries():
ids = []
for node in bound.nodes():
ids.append(interface_mesh.createNodeWithCheck(node.pos()))
interface_mesh.createQuadrangleFace(*ids)
# for bound in interface_mesh.boundaries():
# bound.setMarker(1)
# interface_mesh.exportBoundaryVTU('mid_poly.vtu')
# interface_mesh.exportVTK('mid_poly.vtk')
nodes_mid_poly = mid_poly.nodes()[:4]
nodes_mid = []
for node in nodes_mid_poly:
nodes_mid.append(interface_mesh.createNodeWithCheck(node.pos()))
# sides
# print(nodes_mid[0].pos()) # minx, miny 0
# print(nodes_mid[1].pos()) # maxx, miny 3
# print(nodes_mid[2].pos()) # maxx, maxy 2
# print(nodes_mid[3].pos()) # minx, maxy 1
mid_nodes = interface_mesh.nodes()
# min_x, min_y: connect to b_left, b_lower
for n in range(len(id_left[:midid_yleft])):
node_id0 = id_left[n]
node_id1 = id_left[n + 1]
interface_mesh.createTriangleFace(
nodes_mid[0], mid_nodes[node_id0], mid_nodes[node_id1])
for n in range(len(id_lower[:midid_xlower])):
node_id0 = id_lower[n]
node_id1 = id_lower[n + 1]
interface_mesh.createTriangleFace(
nodes_mid[0], mid_nodes[node_id0], mid_nodes[node_id1])
# min_x, max_y: connect to b_left, b_upper
for n in range(len(id_left[midid_yleft:-1])):
node_id0 = id_left[n + midid_yleft]
node_id1 = id_left[n + 1 + midid_yleft]
interface_mesh.createTriangleFace(
nodes_mid[3], mid_nodes[node_id0], mid_nodes[node_id1])
for n in range(len(id_upper[:midid_xupper])):
node_id0 = id_upper[n]
node_id1 = id_upper[n + 1]
interface_mesh.createTriangleFace(
nodes_mid[3], mid_nodes[node_id0], mid_nodes[node_id1])
# max_x, max_y: connect to b_right, b_upper
for n in range(len(id_right[midid_yright:-1])):
node_id0 = id_right[n + midid_yright]
node_id1 = id_right[n + 1 + midid_yright]
interface_mesh.createTriangleFace(
nodes_mid[2], mid_nodes[node_id0], mid_nodes[node_id1])
for n in range(len(id_upper[midid_xupper:-1])):
node_id0 = id_upper[n + midid_xupper]
node_id1 = id_upper[n + 1 + midid_xupper]
interface_mesh.createTriangleFace(
nodes_mid[2], mid_nodes[node_id0], mid_nodes[node_id1])
# max_x, min_y: connect to b_right, b_lower
for n in range(len(id_right[:midid_yright])):
node_id0 = id_right[n]
node_id1 = id_right[n + 1]
interface_mesh.createTriangleFace(
nodes_mid[1], mid_nodes[node_id0], mid_nodes[node_id1])
for n in range(len(id_lower[midid_xlower:-1])):
node_id0 = id_lower[n + midid_xlower]
node_id1 = id_lower[n + 1 + midid_xlower]
interface_mesh.createTriangleFace(
nodes_mid[1], mid_nodes[node_id0], mid_nodes[node_id1])
# add missing triangles connecting the lower corners with the middle node
# of the surface mesh.
interface_mesh.createTriangleFace(nodes_mid[0], nodes_mid[3],
im_nodes[id_left[midid_yleft]])
interface_mesh.createTriangleFace(nodes_mid[3], nodes_mid[2],
im_nodes[id_upper[midid_xupper]])
interface_mesh.createTriangleFace(nodes_mid[2], nodes_mid[1],
im_nodes[id_right[midid_yright]])
interface_mesh.createTriangleFace(nodes_mid[1], nodes_mid[0],
im_nodes[id_lower[midid_xlower]])
if mid_area_volume:
log.log(16, 'Add volume constraint marker (middle box, ground): {} '
'using {:.3f} m³'
.format([np.mean(ibox_x), np.mean(ibox_y), -5.5],
mid_area_volume))
interface_mesh.addRegionMarker([np.mean(ibox_x),
np.mean(ibox_y),
-5.5], 1, area=mid_area_volume)
if air_refinement:
log.log(16, 'Add volume constraint marker (middle box, air): {} '
'using {:.3f} m³'
.format([np.mean(ibox_x), np.mean(ibox_y), 5.5],
mid_area_volume * 2))
interface_mesh.addRegionMarker([np.mean(ibox_x),
np.mean(ibox_y),
5.5], 1, area=mid_area_volume * 2)
# for bound in interface_mesh.boundaries():
# bound.setMarker(1)
# interface_mesh.exportBoundaryVTU('mid.vtu')
# interface_mesh.exportVTK('mid.vtk')
# outer boundary to make sure interpolation is possible extrapolation
# if mesh is rotated/shifted
minx_outer = np.min(merged.pos.T[0]) - 3 * merged._maxdistance
maxx_outer = np.max(merged.pos.T[0]) + 3 * merged._maxdistance
miny_outer = np.min(merged.pos.T[1]) - 3 * merged._maxdistance
maxy_outer = np.max(merged.pos.T[1]) + 3 * merged._maxdistance
minz_outer = np.min(merged.pos.T[2]) - 3 * merged._maxdistance
outer_poly = createPolyBoxWithHalfspace(
min(3 * minx, minx_outer),
max(3 * maxx, maxx_outer),
min(3 * miny, miny_outer),
max(3 * maxy, maxy_outer),
min(3 * minz, minz_outer),
3 * maxz,
halfspace_at=0.0, without_halfspace=True, interface_marker=None)
nodes_outer_poly = outer_poly.nodes()[:4]
nodes_outer = []
nodes_mid_outer = []
# merge outer poly in 3d mesh, boundariy per boundary
for bound in outer_poly.boundaries():
ids = []
for node in bound.nodes():
ids.append(interface_mesh.createNodeWithCheck(node.pos()))
interface_mesh.createQuadrangleFace(*ids)
# get 4 corner nodes in the earth/air inteface to connect them
for node in nodes_outer_poly:
nodes_outer.append(interface_mesh.createNodeWithCheck(node.pos()))
# gather 4 corner nodes of the midle mesh
for node in nodes_mid:
nodes_mid_outer.append(interface_mesh.createNodeWithCheck(node.pos()))
# finally connect middle mesh with outer mesh in the x-y-plane at z=0
interface_mesh.createQuadrangleFace(nodes_outer[0], nodes_outer[1],
nodes_mid_outer[1], nodes_mid_outer[0])
interface_mesh.createQuadrangleFace(nodes_outer[1], nodes_outer[2],
nodes_mid_outer[2], nodes_mid_outer[1])
interface_mesh.createQuadrangleFace(nodes_outer[2], nodes_outer[3],
nodes_mid_outer[3], nodes_mid_outer[2])
interface_mesh.createQuadrangleFace(nodes_outer[3], nodes_outer[0],
nodes_mid_outer[0], nodes_mid_outer[3])
# add volume marker
if outer_area_volume is not None:
log.log(16, 'Add volume constraint marker (outer box, ground): {} '
'using {:.3f} m³'
.format([np.mean(ibox_x), np.mean(ibox_y), 2 * minz],
outer_area_volume))
interface_mesh.addRegionMarker([np.mean(ibox_x),
np.mean(ibox_y),
2 * minz], 3,
area=outer_area_volume)
# add volume marker for air
if air_refinement and outer_area_volume is not None:
log.log(16, 'Add volume constraint marker (outer box, air): {} '
'using {:.3f} m³'
.format([np.mean(ibox_x), np.mean(ibox_y), 2./3 * minz],
outer_area_volume))
interface_mesh.addRegionMarker([np.mean(ibox_x),
np.mean(ibox_y),
2./3 * minz], 3,
area=outer_area_volume)
# for bound in interface_mesh.boundaries():
# bound.setMarker(1)
# interface_mesh.exportBoundaryVTU('outer.vtu')
# interface_mesh.exportVTK('outer.vtk')
# % meshing, export plc first
plcname = mesh_path.with_suffix('.poly')
pg.meshtools.exportPLC(interface_mesh, plcname.as_posix())
interface_mesh.exportVTK(
mesh_path.with_name('{}.vtk'.format(mesh_path.stem)).as_posix())
# tetgen call
tetgen151(mesh_base, quality=merged.tetgen_quality)
# import mesh
log.info(f'load "{plcname}.1"...')
try:
loopmesh = pg.Mesh(plcname.with_suffix('.1.vtk').as_posix())
except RuntimeError: # c++ for file not found in this case
loopmesh = pg.meshtools.readTetgen(
plcname.with_suffix('.1').as_posix())
# save mesh in pygimli syntax == .bms file
log.info(loopmesh)
loopmesh.save(mesh_path.as_posix()) # for kernel call (piping)
log.info('loopmesh.save: {}'.format(mesh_path.as_posix()))
# clean up residual files
cleanUpTetgenFiles(mesh_path.as_posix())
return loopmesh
[docs]def copyPrimaryFields(rdir1, rdir2):
"""
For recalculation purpose it sometimes is uneccessary to calc the
primary fields again. This function copies the primery fields from on
custEM result dir (rdir1) to another (rdir2). names of the fields are not
changed. Only the .h5 files are copied.
"""
import shutil
# copy previously calculated primary fields
pf_path = Path(f'{rdir1}/primary_fields')
pf_path2 = Path(f'{rdir2}/primary_fields')
log.info('copyPrimaryFields: Copy primary fields from "{}" to "{}"'
.format(rdir1, rdir2))
found = 0
for ni, file in enumerate(pf_path.glob('*/*.h5')):
found += 1
old_file = file
new_file = pf_path2.joinpath(file.parents[0].name, file.name)
new_file.parent.mkdir(exist_ok=True, parents=True)
if new_file.exists():
log.debug('found: {}'.format(new_file.as_posix()))
pass
else:
log.debug('write: {}'.format(new_file.as_posix()))
shutil.copy(old_file.as_posix(),
new_file.as_posix())
if found == 0:
raise Exception('No primary fields found in rdir1 : "{}"'
.format(rdir1))
[docs]def totalFieldCalculation(custem_config, num_cpu=16):
success = local_apps_bash('totalFieldCalculation',
custem_config,
str(num_cpu))
log.info('success: {}'.format(success))
if success != 0:
log.error('totalFieldCalculation aborted.')
raise SystemExit()
[docs]def calcWithEmpymod(loop, use_bipole=False):
from empymod import bipole
import scipy
loglvl = log.getEffectiveLevel() // 10
verb = max((min(4 - loglvl, 4)), 0)
kwargs = {'verb': verb}
str_factor = 1
if loop.config.ftype.lower() == 'h':
kwargs['mrec'] = True
str_factor *= -1 # definition
elif loop.config.ftype.lower() == 'b':
kwargs['mrec'] = True
str_factor *= -1 # definition
str_factor *= np.pi * 4e-7
elif loop.config.ftype.lower() == 'e':
kwargs['mrec'] = False
if use_bipole:
if loop.type != 'line':
raise Exception(
'use_bipole=True is only working for line sources.')
kwargs['srcpts'] = len(loop.ds)
outpos = loop.loopmesh.positions().array()
field = np.zeros([3, len(outpos)], dtype=complex)
pos_flipped = np.copy(outpos)
pos_flipped[:, 2] *= -1
if not use_bipole:
z_value = -loop.pos[0, 2]
if np.isclose(z_value, 0):
z_value += 0.01
src = [loop.pos[:, 0], loop.pos[:, 1],
z_value,
np.rad2deg(loop.phi),
np.zeros(len(loop.pos))]
else:
direction = loop.pos[-1] - loop.pos[0]
direction /= scipy.linalg.norm(direction)
start = loop.pos[0] - (loop.ds[0]/2 * direction)
end = loop.pos[-1] + (loop.ds[-1]/2 * direction)
z_start = start[2]
z_end = end[2]
if np.isclose(z_start, 0):
z_start += 0.01
if np.isclose(z_end, 0):
z_end += 0.01
src = [start[0], end[0], start[1], end[1], z_start, z_end]
depth = [0]
depth.extend(np.cumsum(loop.config.d))
res = [1e12, *loop.config.rho]
freq = loop.config.f
for ri, rpos in enumerate(pos_flipped):
recx = [*rpos, 0, 0]
recy = [*rpos, 90, 0]
recz = [*rpos, 0, -90]
argsx = [src, recx, depth, res, freq]
argsy = [src, recy, depth, res, freq]
argsz = [src, recz, depth, res, freq]
if use_bipole:
field[0, ri] = bipole(*argsx, **kwargs) * np.sum(loop.ds)
field[1, ri] = bipole(*argsy, **kwargs) * np.sum(loop.ds)
field[2, ri] = bipole(*argsz, **kwargs) * np.sum(loop.ds)
else:
field[0, ri] = np.sum(bipole(*argsx, **kwargs) * loop.ds)
field[1, ri] = np.sum(bipole(*argsy, **kwargs) * loop.ds)
field[2, ri] = np.sum(bipole(*argsz, **kwargs) * loop.ds)
field *= str_factor
return field
# The End