Source code for comet.pyhed.loop.loop_bib

"""
Part of comet/pyhed/loop

This script contains the main class for the sources as well as several scripts
for the initialization of loop classes (build... ).
"""
# COMET - COupled Magnetic resonance Electrical resistivity Tomography
# Copyright (C) 2019  Nico Skibbe

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import os
import time

import numpy as np
import pygimli as pg

from pathlib import Path, PosixPath
from comet.pyhed.loop.loop_para import (loopCalculation, loopInterpolation,
                                        calcFieldMatrix_para)
from comet.pyhed import config as Config
from comet.pyhed import SecondaryConfig, log
from comet.pyhed.IO import getItem
from comet.pyhed.misc import (
    tetgen151, R3VtoNumpy, vec, createPolyBoxWithHalfspace,
    Timer, cleanUpTetgenFiles, setNearestMarkers,
    convertCoordinates, local_apps_bash, temporal_printoptions)


[docs]class Loop: """ Class for the computation of arbitrary shaped polygon loops. Some functions automatically return this loopclass as result. It is recommended to use these (you may take a look at the example) Parameters ---------- Input: string or raw loop class Filename of a prior saved loopfile (recommended). Alternatively the output of the function **computeLoopPositions** (not recommended). For the latter case plenty of convenience functions are found in the the *loop* submodule of *pyhed* starting with "build".... config: string or pyhed.config Defines the configuration file for the loop. Example ------- >>> # example: import >>> loopclass = Loop('path/to/loopfile') >>> # example: create circular loop >>> loopclass = buildCircle(10, 12) # 10 m radius, 12 dipoles """ def __init__(self, Input, config=None, verbose=False): self.loopmesh = None self.loop_mesh_name = None self.field = None # later Bfield of the loop self.field_on_nodes = None # field can either be calculated on Nodes # or cellmidpoints, None for undecided yet self.dipolemesh = None self.dipole_mesh_name = None self.d_field = None if isinstance(config, str): self.config = Config(config) # config name elif config is None: self.config = Config() # default config else: self.config = config # config class self.verbose = verbose self.setMeshParameters() self.name = None self.cfg2_savename = None self.secondary_config = None if isinstance(Input, str): # load loop file self.load(Input, config, verbose=self.verbose) else: # direct input try: self.type = Input[4] self.grounded = Input[3] self.pos = np.array(Input[0]) self.phi = np.array(Input[1]) self.ds = np.array(Input[2]) if self.type not in ('dipole', 'Dummy') and len(self.pos) > 1: self._handleNonUniqueDipoles() self._setDistance() except: # I know it's a bare exception, however I only raise # another exception after that, so no harm done raise Exception('do not call Loop-Class directly. Please use' ' the build-functions provided by the loop ' 'sub package of pyhed.') # field matrices if calcualtion with matrix=True self.field_matrix = None self.field_matrix_sin = None self.field_matrix_cos = None # index of the loop (for later usage inside the kernel class) self.idx = 0 # 2d self.secMOD = None self._para_mesh_2d = None self.has_boundary = False self.orig_cell_count_2d = None self.orig_cells = None self.valid_marker = None self.paramesh_sort = False # is parameter only filled with unique vals self.anom_idx = None # value indices if sorted=True self.para_limits_x = None self.para_limits_y = None self.para_limits_z = None self.preserved_edge_centers = None self.area = 0.0 # area of loop, if closed polygon self.turns = 1 # turns of loop, if closed polygon self.secondary = None # and at last a placeholder: this will turn into something else someday self.geometry = None def __str__(self): """ Readible representive of the class. """ from comet.pyhed.misc import temporal_printoptions with temporal_printoptions(threshold=5): string = '### {} ###\n{}\npos:\n{} {}\nphi:\n{} {}\nds:\n{} {}'\ .format(self.__class__.__name__.upper(), 'looptype: ' + str(self.type), self.pos, np.shape(self.pos), self.phi, np.shape(self.phi), self.ds, np.shape(self.ds)) return string def __repr__(self): """ Unambiguous representive of the class. used by other class __str__ """ if self.type.lower() == 'dummy': return 'dummy loop' mean = np.mean(self.pos, axis=0) if self.type.lower() == 'circle': end = ', radius: {:.1f} m'.format( np.linalg.norm(self.pos[0] - mean)) coord = 'at ({:.1f}, {:.1f}, {:.1f})'.format(*mean) elif self.type.lower() == 'rectangle': range_ = np.max(self.pos, axis=0) - np.min(self.pos, axis=0) end = ', {:.1f} x {:.1f} m'.format(*range_[:2]) coord = 'at ({:.1f}, {:.1f}, {:.1f})'.format(*mean) elif self.type.lower() == 'line': end = ', length: {:.1f} m'.format(np.sum(self.ds)) coord = 'between ({:.1f}, {:.1f}, {:.1f})'.format(*self.pos[0]) coord += ' and ({:.1f}, {:.1f}, {:.1f})'.format(*self.pos[-1]) else: end = '' coord = 'at ({:.1f}, {:.1f}, {:.1f})'.format(*mean) return ('{} {}, {} dipoles {}{}' .format('open' if self.grounded else 'closed', self.type, len(self.pos), coord, end)) @property def para_mesh_2d(self): return self.getParaMesh2D()
[docs] def setPrimaryConfig(self, config): """ Sets the primary config which handles the resistivity distribution as well as the frequency of the primary field. For setting the 1D model directly see *setModel*. Parameters ---------- config: path or comet.pyhed.config.Config instance Configuration class instance or file path. """ if isinstance(config, str): self.config.load(config) else: self.config = config
[docs] def setSecondaryConfig(self, secondary_config): """ Sets class attribute with secondary config or loads file. Parameters ---------- secondary_config: string or pyhed.SecondaryConfig Seondary config class instance or file path. """ # checking secondary configuration file if isinstance(secondary_config, str): self.loadSecondaryConfig(secondary_config) else: # assuming valid sec config? print('setting secondary config...') self.secondary_config = secondary_config
[docs] def setFrequency(self, frequency): """ Sets the frequency, not angular frequency for the field calculation. """ self.config.f = float(frequency)
[docs] def setFType(self, ftype): assert ftype.lower() in ('h', 'e', 'b'),\ 'field type has to be "H", "B" or "E".' self.config.ftype = ftype.upper()
[docs] def setModel(self, rho, d=None, thickness=True, resistivity=True): """ Sets the synthetic 1D layered earth model for dipole calculation. Parameters ---------- rho: float or array_like Resistivity/conductivity distribution for a layered earth. d: float or array_like or None [None] Thickness or layer depth. Empty (None, 0, or []) for halfspace. thickness: boolean [True] The parameter d is used as thickness (True, len(rho) - 1) or depth (False, len(rho)), respectively. resistivity: boolean [True] The parameter rho is used as Resistivity (True) or conductivity (False), respectively. """ d = np.atleast_1d(d) if d.size == 0 or d[0] == 0 or d[0] is None: d = np.array([]) rho = np.atleast_1d(rho) if thickness is False: d = np.diff(np.abs(d)) if resistivity is False: rho = 1./rho assert_message = 'found {} thicknesses for {} layer'\ .format(len(d), len(rho)) assert len(d) + 1 == len(rho), assert_message self.config.rho = rho self.config.d = d if self.secondary_config is not None: self.secondary_config.sigma_ground = \ (1./np.atleast_1d(self.config.rho)).tolist()
@property def model(self): model = np.append(self.config.rho, self.config.d) return model
[docs] def setMeshParameters(self, refinement_para=1., max_area_factor=1., tetgen_quality=1.2): """ Alters the Parameter responsible for the quality and size used during automatic mesh generation. Parameters ---------- refinement_para: float [1] An increase of refinement_para decreases the size of the smallest cell at the dipoles and therefore incrreases the total number of refinement cells around the dipole. Omitts refinement if value is negative. max_area_factor: positive float [1] The max_area_para lineary affects the maximum volume of a cell. An increase of the parameter allows for greater cells and therefore decreases the total number of cells outside of the refined section of the mesh. Set to 0.5 for a fine mesh and anywhere near 2 for a coarse mesh. Highly affects the total number of nodes/cells in the mesh. tetgen_quality: float [1.2] The tetgen_quality parameter is directly piped to the corresponding tetgen call in the meshgeneration process. Decrease this parameter (e.g. to 1.12) to increase the homogeneity of the triangles. Be careful with this one, tetgen very easy starts to split cells in smaller and smaller pieces and therefore increase the total cellcount to very high values (millions and more). """ self.refinement_para = 400. * refinement_para self.max_area_para = 0.004 * max_area_factor self.tetgen_quality = tetgen_quality
[docs] def setParaMesh2D(self, para_mesh_2d, limits=None, append_boundary=False, preserve_edges=False, anomaly=None, sort=True, **kwargs): """ Sets 2D parameter mesh for secondary field calculation. Parameters ---------- para_mesh_2d: string or pg.Mesh 2D parameter mesh or path to mesh. limits: [float, float] or None Minimum and maximum values for y of the area where 2D parameters are to be transferred to the 3D FEM mesh. Default are the x extension of the 2D parameter mesh. append_boundary: boolean [ False ] Fills in an additional boundary with prolongated resistivity values around the transferred 2D values. This is useful as it reduces artifacts at the edge of the 2D domain oin the FEM mesh. anomaly: None or np.ndarray [ None ] Optionally. Alternatively use *setAnomalies*. Anomaly vector (conductivity vector) with values for each cell in the 2D parameter domain. Attention: conductivity is used, not resistivity! sort: boolean [ False ] Optionally. Alternatively use *setAnomalies*. If True, set the same marker for double values in anomaly vector. This is for blocky 2d structures, where only a few different regions are required. Use default False if dealing with smooth inversion results, for example in a structural coupling. kwargs to *appendTriangleBoundary* Calls *setAnomalies* of anomaly is given. Furthermore calls *updateFEMAnomaly* if FEMMesh has been set already. """ log.info('setParaMesh2D: setting 2D paramter mesh: {}' .format(para_mesh_2d)) # string or copy: always copy of mesh, never a reference self._para_mesh_2d = pg.Mesh(para_mesh_2d) for edge in self._para_mesh_2d.boundaries(): if np.isclose(edge.center()[1], 0): edge.setMarker(-1) self.orig_cell_count_2d = self._para_mesh_2d.cellCount() log.debug('Original paramesh cell count (no boundary): {}' .format(self.orig_cell_count_2d)) if limits is not None: self.para_limits_y = limits else: val_min = np.min(self.pos[:, 1]) val_max = np.max(self.pos[:, 1]) val_thk = val_max - val_min self.para_limits_y = [val_min - 3 * val_thk, # leave at 3 * !! val_max + 3 * val_thk] # leave at 3 * !! log.info('Paramesh y limits: {}'.format(self.para_limits_y)) self.para_limits_x = [self.para_mesh_2d.xmin(), self.para_mesh_2d.xmax()] self.para_limits_z = [self.para_mesh_2d.ymin(), self.para_mesh_2d.ymax()] self.preserved_edge_centers = None log.debug('preserve edges = {}'.format(preserve_edges)) if preserve_edges: # UNTESTED preserve edges! topreserve = self.para_mesh_2d.boundaryMarkers() > 0 if np.sum(topreserve) >= 0: # found marker > 0 edge_ids = np.where(self.para_mesh_2d.boundaryMarkers() > 0)[0] self.preserved_edge_centers = [] for edge_id in edge_ids: # store edge center in order to find edge again later self.preserved_edge_centers.append( np.array(self.para_mesh_2d.boundaryCenters()[edge_id])) # append triangle boundary self.has_boundary = False # pg.show(self._para_mesh_2d) log.debug('append boundary = {}'.format(append_boundary)) if append_boundary: # add new boundary with special cell markers to identify orig cells kwargs['marker'] = -42 # self._para_mesh_2d_with_bounds self._para_mesh_2d = pg.meshtools.appendTriangleBoundary( self._para_mesh_2d, **kwargs) # pg.show(self._para_mesh_2d) cm = np.array(self._para_mesh_2d.cellMarkers()) self.orig_cells = cm != -42 self.has_boundary = True para_limits_x = [self.para_mesh_2d.xmin(), self.para_mesh_2d.xmax()] log.debug('append boundary: increase x limits: {} -> {}' .format(self.para_limits_x, para_limits_x)) para_limits_z = [self.para_mesh_2d.ymin(), self.para_mesh_2d.ymax()] log.debug('append boundary: increase z limits: {} -> {}' .format(self.para_limits_z, para_limits_z)) log.debug('new paramesh cell count (with boundary): {}' .format(self.para_mesh_2d.cellCount())) # have secondary config if self.secondary_config is None: self.createDefaultSecondaryConfig() if anomaly is not None: # case 1/4: anomaly given, set paramesh attributes and marker self.setAnomalies(anomaly=anomaly, sort=sort) # sets self.secondary_config.sigma_paramesh if self.loopmesh is not None: # case 2/4: also have FE mesh, set FE attributes (anomalies) # and marker self.updateFEMAnomaly() else: # case 3/4: already got paramesh anomalies and got FE mesh, # also set FE mesh marker and anomlies if self.loopmesh is not None and\ self.secondary_config.sigma_paramesh != []: self.updateFEMAnomaly()
# case 4/4: got neither anomalies nor nor FE mesh: call it a day
[docs] def setDipoleMesh(self, mesh, savename='_default_dipole_mesh', verbose=True): """ Sets the dipolemesh and saves it under savename. Parameters ---------- mesh: string or mesh instance Pygimli mesh instance or file path to pygimli mesh. savename: string [ None ] Used savename for mesh, if mesh is already a mesh instance. verbose: boolean [ False ] Turn on verbose mode. """ if isinstance(mesh, pg.Mesh): self.dipolemesh = mesh self.dipole_mesh_name = savename self.dipolemesh.save(self.dipole_mesh_name) elif isinstance(mesh, str): if not mesh.endswith('.bms'): if not os.path.exists(mesh + '.bms'): mesh += '_dipole_mesh' if verbose: print('load dipole mesh "{}": '.format(mesh), end='', flush=True) self.dipolemesh = pg.Mesh(mesh) if verbose: print(self.dipolemesh) self.dipole_mesh_name = mesh
[docs] def setLoopMesh(self, mesh, savename=None): """ Sets the loopmesh. Parameters ---------- mesh: string or mesh instance Pygimli mesh instance or file path to pygimli mesh. savename: string [ None ] Used savename for mesh, if mesh is already a mesh instance. Alternatively a default name is generated with *getDefaultLoopMeshBaseName*. """ if isinstance(mesh, pg.Mesh): self.loopmesh = mesh elif isinstance(mesh, str): log.info('load loop mesh "{}"... '.format(mesh)) self.loopmesh = pg.Mesh(mesh) savename = mesh elif isinstance(mesh, PosixPath): log.info('load loop mesh "{}"... '.format(mesh.as_posix())) self.loopmesh = pg.Mesh(mesh.as_posix()) savename = mesh.as_posix() else: # gets coordinate Points mesh = np.array(mesh, dtype=np.float64) self.loopmesh = pg.Mesh(3) shape_in = np.shape(mesh) if shape_in[0] == 3 and shape_in[1] != 3: mesh = mesh.T for pt in mesh: self.loopmesh.createNode(pg.Pos(pt[0], pt[1], pt[2])) self.setLoopMeshName(savename=savename) # reset field if new mesh # new mesh = other cell and node count (its not a perfect comparison # but it should do 99.9% of the time) # A thorough test is not possible. if self.field is not None: if self.field.shape[1] != self.loopmesh.nodeCount() and \ self.field.shape[1] != self.loopmesh.cellCount(): self.field = None # Known issues: switching from .h5 mesh (custem) to .bms mesh (pygimli) # This test does not detect the difference, however the meshs will have # different sortings, which has to be corrected for the fields. # This, for example, is done in prepareSecondaryFieldCalculation log.debug('set loopmesh: {}'.format(self.loopmesh))
[docs] def setParaMeshMarkerAndVals(self, anomaly=None, sort=True): """ Handle anomaly vector and marker of the 2d parameter mesh. Parameters ---------- anomaly: array_like [ None ] Vector with conductivities in S/m. Expect one entry for each cell in parameter mesh. If not given, and sort is True an error is raised. sort: boolean [ False ] If True, set the same marker for double values in anomaly vector. This is for blocky 2d structures, where only a few different regions are required. Use default False if dealing with smooth inversion results, for example in a structural coupling. """ raise DeprecationWarning('Use setParaMesh2D(..., anom=...) instead') if anomaly is None: if sort is True: raise Exception('Need anomaly vector for sort option.') else: self.paramesh_sort = False anomaly = self._checkAnomaly(anomaly) self._checkParaMesh() # reserve marker for air and background offset = len(self.secondary_config.sigma_ground) + 1 if sort: self.paramesh_sort = True # set only marker for unique resistivity values # This way marker and resistivities are still matching! unique1, anom_idx, marks = np.unique( anomaly.round(decimals=4), return_inverse=True, return_index=True) self.anom_idx = anom_idx log.debug(f'unique old: {unique1}') unique = anomaly[self.anom_idx] log.debug(f'unique: {unique}') self.secondary_config.setAnomalies(unique) log.info('sort: ({} -> {} marker)'.format( self.para_mesh_2d.cellCount(), len(unique))) log.info('sec config anomaly: {} ({})'.format( unique, len(unique))) else: self.anom_idx = None # set one marker per entry in anomaly, no matter the value marks = np.arange(self.para_mesh_2d.cellCount()) if anomaly is not None: # for calculation self.secondary_config.setAnomalies(anomaly) log.info('sec config anomaly: {} ({})'.format( anomaly, len(anomaly))) # set marker for transfer to FEM Mesh and attributes for debug self.para_mesh_2d.setCellMarkers(marks + offset) log.debug('paramesh2d marker count after setParaMeshMarkerAndVals: {}' .format(len(np.unique(self.para_mesh_2d.cellMarkers()))))
[docs] def setAnomalies(self, anomaly, sort=True): """ Handle anomaly vector and marker of the 2d parameter mesh. Parameters ---------- anomaly: array_like [ None ] Vector with conductivities in S/m. Expect one entry for each cell in parameter mesh. sort: boolean [ False ] If True, set the same marker for double values in anomaly vector. This is for blocky 2d structures, where only a few different regions are required. Use default False if dealing with smooth inversion results, for example in a structural coupling. """ # import check self._checkParaMesh() # store optional input parameter self.paramesh_sort = sort # some more checks if anomaly is None: raise Exception('Need anomaly vector with length of paramesh ' 'cell count.') if len(anomaly) != self.orig_cell_count_2d: raise Exception('Dimension mismatch between original paramesh ' '({} cells) and anomaly vector ({} values).' .format(self.orig_cell_count_2d, len(anomaly))) # store raw values in sec config self.secondary_config.sigma_raw = np.array(anomaly).tolist() # prepare anomaly vector with respect to parameter mesh prolongation if self.has_boundary is False: anom = np.array(self.secondary_config.sigma_raw) else: anom = np.zeros(self.para_mesh_2d.cellCount()) anom[self.orig_cells] = anomaly anom = pg.Vector(anom) # inplace prolongation needs reference # therefore explicit pg.Vector call is needed before giving anom # to prolongateEmptyCellsValues self.para_mesh_2d.prolongateEmptyCellsValues(anom) log.debug('setAnomalies: detect boundary around paramesh: ' 'prolongate empty cells values.') with temporal_printoptions(): log.debug('setAnomalies: prolonged anomaly: {} ({})' .format(np.array(anom), len(anom))) # set cell attributes, this will help some cache stuff in the future self.para_mesh_2d.setCellAttributes(anom) # pygimli to numpy anom = np.array(anom) # reserve marker for air and background offset = len(self.secondary_config.sigma_ground) + 1 # set one marker per entry in anomaly as all values are considered # unique, regardless of the actual value # Note: need continuous array with 0 being air, 1...n for n layers # of the background field and n+1...n+1+m for m anomalies. # theres no air here, however the marker is still reserved as are # the markers from 1...n hence *offset* has been introduced. if self.paramesh_sort: # sorted cells: anomaly vector has unique values # self.anom_idx -> from 2d mesh with bounds to unique vals # This way marker and resistivities are still matching! unique1, anom_idx, marks = np.unique( anom.round(decimals=4), return_inverse=True, return_index=True) self.anom_idx = anom_idx log.debug(f'unique old: {unique1}') unique = anom[self.anom_idx] log.debug(f'unique: {unique}') log.info('sort: ({} -> {} marker)'.format( self.para_mesh_2d.cellCount(), len(unique))) # store in secondary config self.secondary_config.sigma_paramesh = unique.tolist() else: # just take all values as they are self.anom_idx = None marks = np.arange(self.para_mesh_2d.cellCount()) # store in secondary config self.secondary_config.sigma_paramesh = anom.tolist() with temporal_printoptions(): log.info('setAnomalies: sec config anomaly (paramesh): {} ({})' .format( np.array(self.secondary_config.sigma_paramesh), len(self.secondary_config.sigma_paramesh))) # set marker for transfer to FEM Mesh and attributes for debug self.para_mesh_2d.setCellMarkers(marks + offset) log.debug('paramesh2d marker count after setAnomalies: {}' .format(len(np.unique(self.para_mesh_2d.cellMarkers()))))
# OPTIONALLY # [x] set anomalies in secondary config, raw + paramesh # add marker to FE mesh and sigma_anom
[docs] def setFEMMesh_old(self, mesh, valid_marker=None, savename=None): """ Sets the FEM mesh as loopmesh and handles the domain markers. Parameters ---------- mesh: string or mesh instance Pygimli mesh instance or file path to pygimli mesh. valid_marker: array_like [ None ] If None, checks which domains of the 2D mesh are actually transferred to the 3D FEM mesh. The markers are saved in the *valid_marker* attribute. If given, sets vector directly after some checks. savename: string [ None ] Useful if multiple loops are using the same mesh (saves diskspace). Ignored if *mesh* is a string already. """ self._checkParaMesh() if isinstance(mesh, str): self.setLoopMesh(savename) else: self.setLoopMesh(mesh, savename=savename) self.setFEMMarker(valid_marker=valid_marker)
[docs] def setFEMMesh(self, mesh, valid_marker=None, savename=None): """ Sets the FEM mesh as loopmesh and handles the domain markers. Parameters ---------- mesh: string or mesh instance Pygimli mesh instance or file path to pygimli mesh. valid_marker: array_like [ None ] If None, checks which domains of the 2D mesh are actually transferred to the 3D FEM mesh. The markers are saved in the *valid_marker* attribute. If given, sets vector directly after some checks. savename: string [ None ] Useful if multiple loops are using the same mesh (saves diskspace). Ignored if *mesh* is a string already. Calls *_setFEMMarker* is paramesh has been set. Furthermore calls *updateFEMAnomaly* if anomaly has been set through either *setParamesh2D* or *setAnomaly* Produces error message if valid_marker array is given, but no paramesh is found """ if isinstance(mesh, str): self.setLoopMesh(savename) else: self.setLoopMesh(mesh, savename=savename) if self._checkParaMesh(raise_it=False): # if paramesh self._setFEMMarker(valid_marker=valid_marker) if self.secondary_config.sigma_paramesh != []: # if paramesh + if anomaly set before self.updateFEMAnomaly(set_marker=False) else: # if neither paramesh nor anomaly if valid_marker is not None: log.error('setFEMMesh: got valid_marker array, however ' 'cannot deal with it if no paramesh is set before. ' 'Skip settin of FEMesh markers.')
def _setFEMMarker(self, valid_marker=None): """ Sets and checks the domain marker of the 3D FEM mesh. Parameters ---------- valid_marker: array_like [ None ] If None, checks which domains of the 2D mesh are actually transferred to the 3D FEM mesh. The markers are saved in the *valid_marker* attribute. If given, sets vector directly after some checks. """ self._checkParaMesh() old_marker = np.array(self.loopmesh.cellMarkers()) # domains in para mesh num_para = len(np.unique(self.para_mesh_2d.cellMarkers())) # domains in fem mesh - air - ground background num_ground = len(self.secondary_config.sigma_ground) num_marker = len(np.unique(self.loopmesh.cellMarkers()) ) - 1 - num_ground if valid_marker is not None: # which of the para domains are mapped to the FEM mesh if len(valid_marker) != num_para: raise Exception( 'Mismatch between para mesh domain count ({}) and len' ' of given valid marker array ({}).' .format(num_para, len(valid_marker))) num_marker2 = len(np.where(valid_marker)[0]) if num_marker2 != num_marker: raise Exception('Mismatch between number of valid domains in' ' given marker vector ({}) and number of ' 'anomaly regions found in fem mesh ({}).' .format(num_marker2, num_marker)) else: # at this point we varified that given marker vector is valid. self.valid_marker = valid_marker else: # set marker for inner area and return not used marker not_set = setNearestMarkers(self.loopmesh, self.para_mesh_2d, self.para_limits_y, fill_air_ground=True) pmcc = len(self.para_mesh_2d.cellMarkers()) log.info('setFEMMarker(): Tranferred {}/{} parameter mesh ' 'markers to FE mesh.' .format(pmcc - len(not_set), pmcc)) if len(not_set) / pmcc >= 0.25 and pmcc > 100: log.warning( 'More than 25 % of the values in the parameter mesh will ' 'not be transferred to the FE mesh. This either means ' 'your parameter mesh is very dense, or your FE mesh is ' 'too coarse.') if len(not_set) != 0: # searching for not used markers and performing index shift # reducing sigma_anom accordingly # index shift: -air, ground # VERY SLOW!!!! excl_ids = np.array(not_set) - 1 - num_ground valid = np.ones(num_para, dtype=bool) valid[excl_ids] = False self.valid_marker = valid new_marker = np.array(self.loopmesh.cellMarkers()) for ind in np.sort(not_set)[::-1]: new_marker[new_marker > ind] -= 1 self.loopmesh.setCellMarkers(new_marker) else: self.valid_marker = np.ones(num_para, dtype=bool) # Quick check to due to errors in the past. This stays! import time timer_start = time.time() un = np.unique(self.loopmesh.cellMarkers()) for mn in range(np.max(self.loopmesh.cellMarkers()) + 1): if mn not in un: raise Exception('Did not found cell with marker {}' .format(mn)) log.debug('_setMarker(): error test: {:.4f}' .format(time.time() - timer_start)) if not np.allclose(old_marker, np.array(self.loopmesh.cellMarkers())): h5path = Path(self.loop_mesh_name).with_suffix('.h5').as_posix() log.info('_setFEMMarker(): Marker changed, Export .h5 ({})' .format(h5path)) self.exportFenicsHDF5Mesh(h5path)
[docs] def setFEMMarker_old(self, valid_marker=None): """ Sets and checks the domain marker of the 3D FEM mesh. Parameters ---------- valid_marker: array_like [ None ] If None, checks which domains of the 2D mesh are actually transferred to the 3D FEM mesh. The markers are saved in the *valid_marker* attribute. If given, sets vector directly after some checks. """ log.error('Old function has been used. Abort.') raise Exception('setFEMMarker_old should not be in use anymore.') self._checkParaMesh() # domains in para mesh num_para = len(np.unique(self.para_mesh_2d.cellMarkers())) # domains in fem mesh - air - ground background num_ground = len(self.secondary_config.sigma_ground) num_marker = len(np.unique(self.loopmesh.cellMarkers()) ) - 1 - num_ground if valid_marker is not None: # which of the para domains are mapped to the FEM mesh if len(valid_marker) != num_para: raise Exception( 'Mismatch between para mesh domain count ({}) and len' ' of given valid marker array ({}).' .format(num_para, len(valid_marker))) num_marker2 = len(np.where(valid_marker)[0]) if num_marker2 != num_marker: raise Exception('Mismatch between number of valid domains in' ' given marker vector ({}) and number of ' 'anomaly regions found in fem mesh ({}).' .format(num_marker2, num_marker)) else: # at this point we varified that given marker vector is valid. self.valid_marker = valid_marker else: # set marker for inner area and return not used marker not_set = setNearestMarkers(self.loopmesh, self.para_mesh_2d, self.para_limits_y, fill_air_ground=True) pmcc = len(self.para_mesh_2d.cellMarkers()) log.info('setFEMMarker(): Tranferred {}/{} parameter mesh ' 'markers to FE mesh.' .format(pmcc - len(not_set), pmcc)) if pmcc - len(not_set) / pmcc >= 0.25 and pmcc > 100: log.warning( 'More than 25 % of the values in the parameter mesh will ' 'not be transferred to the FE mesh. This either means ' 'your parameter mesh is very dense, or your FE mesh is ' 'too coarse.') if len(not_set) != 0: # searching for not used markers and performing index shift # reducing sigma_anom accordingly # index shift: -air, ground excl_ids = np.array(not_set) - 1 - num_ground valid = np.ones(num_para, dtype=bool) valid[excl_ids] = False self.valid_marker = valid new_marker = np.array(self.loopmesh.cellMarkers()) for ind in np.sort(not_set)[::-1]: new_marker[new_marker > ind] -= 1 self.loopmesh.setCellMarkers(new_marker) else: self.valid_marker = np.ones(num_para, dtype=bool) un = np.unique(self.loopmesh.cellMarkers()) # Quick check to due to errors in the past. This stays! for mn in range(np.max(self.loopmesh.cellMarkers()) + 1): if mn not in un: raise Exception('Did not found cell with marker {}' .format(mn))
[docs] def calculate(self, num_cpu=12, loop_mesh=None, dipole_mesh=None, interpolate=False, savename=None, cell_center=False, verbose=False, mode='auto', matrix=False, field_matrix=None, max_node_count=None, **kwargs): """ Computation of the loop field with respect to the config. Parameters ---------- num_cpu: integer [ 12 ] Maximum number of processes allowed for this task. loop_mesh: string or mesh instance [ None ] Optional. Possibility to give a user defined mesh for the calculation. dipole_mesh: string or mesh instance [ None ] Optional. Possibility to give a user defined mesh for the calculation (interpolate=True or matrix=True only). interpolate: boolean [ True ] The loop dipoles can either be calculated directly (False) or once on a seperated mesh (dipolemesh) and then interpolated to the loopmesh (True). If a dipoleFieldName is given, this field will be used for the interpolation. savename: string [ None ] Optional. If savename is not None, the loop will be saved under the name defined in savename. cell_center: boolean [ True ] A default the field of the loop will be calculated at the cell center of the mesh cells. This flag allows for calculation at the mesh nodes. Affects only the definition of the final loopmesh, the dipolemesh will always be calculated at the nodes for interpolation reasons. verbose: boolean [ False ] Turn on verbose mode. mode: string [ 'auto' ] Five posibilities: 'auto', 'config', 'te', 'tm', 'tetm'\n 'auto': Automatic detection wether the loop is grounded or not. Grounded wires are calculated with te and tm mode (see HED). Non grounded wires are calculated with te mode only (sufficient).\n 'config': the default config decides the mode the field is calculated in.\n 'te', 'tm', 'tetm': Calculates the field in the choosen mode. matrix: boolean [ False ] Alternatively calculation approach. At first the field on a highly dense dipole mesh will be triggered. After that the field will be interpolated to the single dipole positions by the means of a matrix vector multiplication with a matrix containing appropriate weighting factors. This Approach takes longer than direct calculation in the first run, but the calculated matrix can be used for further calculations with different frequencies or resistivity models (as long as the loopmesh and dipolemesh remain the same). field_matrix: list or string [ None ] Interpolation matrices or file path if calculation with matrix=True. Will be calculated automatically if None. max_node_count: integer [ None ] As all points will be calculated at once, the computational effort scales lineary with the reciever count, the transmitter count and the used hankel factors. If the limits of the available memory is reached *max_node_count* can be used to define the maximum chunk of nodes to be computated at once. Other nodes will be computed afterwards. Keyword arguments ----------------- Keyword arguments are redirected to loop.save and to define the *drop_tol* (float [ 1e-2 ]) in the cylindrical coordinate transformation to avoid instabilities around the source. """ tick = Timer() # load/import dipole Mesh if dipole_mesh is not None: self.setDipoleMesh(dipole_mesh) # load/import loop Mesh if loop_mesh is not None: self.setLoopMesh(loop_mesh) # create loop mesh if still missing if self.loopmesh is None: log.info('No Loop Mesh given or found. Creating.') self.createLoopMesh(savename=savename) log.log(16, 'loopmesh: {!s}'.format(self.loopmesh)) # transform coordinates, and remember to change them back again # only in case where loopmesh is manually set numpy array (custEM) transposed = False if not isinstance(self.loopmesh, pg.Mesh): if len(self.loopmesh) != 3 and len(self.loopmesh[0]) == 3: self.loopmesh = self.loopmesh.T transposed = True # one dipole only case if self.type == 'dipole': interpolate = False # just one dipole: always calculate directly # decide wether tm, te or both if mode == 'auto': if self.grounded is False: log.info('detecting non-grounded loop, set mode to "te"') self.config.mode = 'te' elif self.grounded is True: log.info('detecting grounded loop, set mode to "tetm"') self.config.mode = 'tetm' elif mode == 'config': pass else: self.config.mode = mode # calculation on nodes or cell midpoints self.field_on_nodes = not cell_center # historic log.log(16, 'cell center: {}'.format(cell_center)) if matrix is True or interpolate is True: if self.dipolemesh is None: log.info('No Dipole Mesh given or found. Creating.') self.createDipoleMesh(savename=savename) log.log(16, 'dipolemesh: {}'.format(self.dipolemesh)) # dipole calcualation log.info(f'Calculating dipole field of {self!r}') log.info(f'{self.config!r}') log.debug(f'{self.config!s}') self.calculateDipoleField( num_cpu=num_cpu, max_node_count=max_node_count, verbose=verbose, drop_tol=kwargs.get('drop_tol', 1e-2)) if matrix is False: # case 1/3: interpolate # dipolemesh created out of loopmesh if interpolate is True: # interpolation self.field = loopInterpolation(self.d_field, self.dipole_mesh_name, self.loopmesh, (self.pos, self.phi, self.ds), verbose=verbose, cell_center=cell_center, num_cpu=num_cpu) # case 2/3: calculate directly elif interpolate is False: log.info(f'Calculating loop field of {self!r}') log.info(f'{self.config!r}') log.debug(f'{self.config!s}') self.field = loopCalculation(self.loopmesh, (self.pos, self.phi, self.ds), self.config.rho, self.config.d, self.config.f, self.config.current, self.config.mode, self.config.ftype, verbose=verbose, cell_center=cell_center, num_cpu=num_cpu, max_node_count=max_node_count, **kwargs) # drop_tol if transposed is True: self.loopmesh = self.loopmesh.T self.field = self.field.T else: # case 3/3: calculation via matrix if self.field_matrix is None: if field_matrix is None: self.calculateFieldMatrix(num_cpu=num_cpu, verbose=verbose) elif isinstance(field_matrix, (tuple, list)): self.field_matrix = field_matrix[0] self.field_matrix_sin = field_matrix[1] self.field_matrix_cos = field_matrix[2] else: self.loadFieldMatrix(field_matrix) self.calculateFieldFromMatrix(verbose=verbose) log.info('total field calculation time: {:2.2f} seconds' .format(tick.time_total)) if savename is not None: self.save(savename, kwargs.pop('config_savename', None)) return self.field
[docs] def calculateInterpolationMatrix(self, Pos): """ Calculates the interpolation matrix. If one wished the field can be interpolated to another mesh. The interpolation matrix from the loopmesh to an arbitrary set of coordinates is calculated with this function. This function is called to initialize and append the weights to the interpolation matrix. Note: The loop class does not hold a reference of the resulting matrix, instead gives it back to the caller. Parameters ---------- Pos: np.ndarray or pg.core.PosVector Transmitter positions of shape (n, 3) with n positions. Values are expected to be floats (the conversion to a pg.PosVector will not check again). Returns ------- mat: pg.core.SparseMapMatrix Sparse interpolation matrix with number of columns equal to the number of nodes in the loopmesh and number of rows equal to the number of input positions. """ tick_sIM = time.time() log.log(16, 'Loop: Assembling InterpolationMatrix.') if not isinstance(Pos, pg.core.PosVector): Pos = pg.core.PosVector(Pos) interpolationMatrix = self.loopmesh.interpolationMatrix(Pos) log.log(13, '%2.2f sec' % (time.time() - tick_sIM)) return interpolationMatrix
# def calculateInterpolationMatrix_para(self, Pos, num_cpu=12): # """ Parallel version of *calculateInterpolationMatrix*. # # Not working yet. # """ # raise Exception('not working yet!') # tick_sIM = time.time() # if verbose: # print('Assembling InterpolationMatrix...', end='', flush=True) # if not isinstance(Pos, pg.core.PosVector): # Pos = pg.core.PosVector(Pos) # if verbose: # print(' {} to {} points'.format(self.loopmesh.nodeCount(), # len(Pos)), end='', flush=True) # self.loopmesh.save('mesh_for_interpolation.bms') # interpolationMatrix = InterpolationMatrix_para( # 'mesh_for_interpolation.bms', # Pos, # num_cpu=num_cpu, # in_node_count=self.loopmesh.nodeCount(), # verbose=True) # if verbose: # print(' %2.2f sec' % # (time.time() - tick_sIM), flush=True) # return interpolationMatrix
[docs] def calculateFieldMatrix(self, num_cpu=8, verbose=False): """ If wished the calcualtion of the total loop field can be done by interpolation and superposition of one highly accurate dipole field to the different transmitter positions of the loop. This is done either done directly or via a vector matrix multiplication. This function is called to initialize and append the weights to the interpolation matrix from the *dipolemesh* to the *loopmesh* for all tx positions with respect to *pos*, *phi*, and *ds*. This function will be called if *calculate* is called with *matrix=True*. Parameters ---------- num_cpu: integer [ 8 ] Define the maximum number of cores allowed for this operation. verbose: boolean [ False ] Turn on verbose mode. """ if num_cpu == 1: outPos = R3VtoNumpy(self.loopmesh.positions()) self.field_matrix = pg.core.SparseMapMatrix() self.field_matrix.resize(self.loopmesh.nodeCount(), self.dipolemesh.nodeCount()) self.field_matrix_cos = pg.core.SparseMapMatrix() self.field_matrix_cos.resize(self.loopmesh.nodeCount(), self.dipolemesh.nodeCount()) self.field_matrix_sin = pg.core.SparseMapMatrix() self.field_matrix_sin.resize(self.loopmesh.nodeCount(), self.dipolemesh.nodeCount()) for i, p in enumerate(self.pos): actualPos = vec.translate(outPos, -p[0], -p[1], z=-p[2], copy=True) vec.rotate3(actualPos, -self.phi[i]) new = self.dipolemesh.interpolationMatrix( pg.core.PosVector(actualPos)) * self.ds[i] self.field_matrix += new self.field_matrix_sin += new * np.sin(-self.phi[i]) self.field_matrix_cos += new * np.cos(-self.phi[i]) else: self.field_matrix,\ self.field_matrix_sin,\ self.field_matrix_cos = \ calcFieldMatrix_para(self.dipole_mesh_name, self.dipolemesh.nodeCount(), self.loopmesh, (self.pos, self.phi, self.ds), verbose=verbose, num_cpu=num_cpu)
[docs] def calculateFieldFromMatrix(self): """ Calculates the primary field on basis of the interpolation matrix and the dipole field. """ if self.field_matrix is None: self.calculateFieldMatrix() self.field = vec.interpolateField_rotatedMatrix( self.d_field, base_mat=self.field_matrix, sin_mat=self.field_matrix_sin, cos_mat=self.field_matrix_cos) return self.field
[docs] def calculateDipoleField(self, verbose=False, drop_tol=1e-2, num_cpu=12, max_node_count=None): """ Calculates field on dipole mesh. Parameters ---------- verbose: boolean [ False ] Turn on verbose mode. drop_tol: float [ 1e-2 ] Singularity fix. All horizontal distances between *drop_tol* and the transmitter dipole are placed between the first reciever outside the tolerance and the tolerance, maintaining the correct order and angle. num_cpu: integer [ 12 ] Maximum number of processes allowed for this task. max_node_count: integer [ None ] As all points will be calculated at once, the computational effort scales lineary with the reciever count, the transmitter count and the used hankel factors. If the limits of the available memory is reached *max_node_count* can be used to define the maximum chunk of nodes to be computated at once. Other nodes will be computed afterwards. """ self.d_field = loopCalculation(self.dipolemesh, ([(0, 0, 0)], [0], [1]), # x-HED self.config.rho, self.config.d, self.config.f, self.config.current, self.config.mode, self.config.ftype, verbose=verbose, cell_center=False, num_cpu=num_cpu, max_node_count=max_node_count, drop_tol=drop_tol) # drop_tol
[docs] def calcAndExportFieldsForFenics(self, export_vtk=False, num_cpu=32, **kwargs): """ Calculates and export primary fields for fenics secondary field calculation. Parameters ---------- kwargs: dict Keyword parameters are redirected to *calculate*. """ import dolfin as df if self.secMOD is None: log.info('initializing custEM instance') self.initCustEM(init_primary_field_class=True) orig_ftype = '{}'.format(self.config.ftype) # copy string is tricky magnetic = True electric = False if self.secondary_config.pf_EH_flag == 'E' or\ kwargs.get('debug', False): electric = True log.debug('initialize mesh (vector function space) from custEM') self.secMOD.FE.FS.pf_p = 1 V_cg = df.VectorFunctionSpace(self.secMOD.FE.FS.mesh, 'CG', 1) df_c = V_cg.tabulate_dof_coordinates().reshape( -1, 3)[0::3, :] pg_c = self.loopmesh.positions().array() pg_df, df_pg = convertCoordinates(pg_c, df_c) log.debug('primary calculation: coords: {}'.format( np.allclose(df_c, pg_c))) log.debug('primary calculation: check sorting: {}, {}'.format( np.allclose(df_c[df_pg], pg_c), np.allclose(df_c, pg_c[pg_df]))) if not np.allclose(df_c[df_pg], pg_c) or\ not np.allclose(df_c, pg_c[pg_df]): raise Exception('Something went wrong with the coordinate ' 'transformation between dolfin and pygimli mesh. ' 'please check input files.') self.setModel(self.secondary_config.sigma_ground, d=None, resistivity=False) if magnetic: self.config.ftype = 'H' log.info('calculate magnetic primary fields') self.calculate(num_cpu=num_cpu, **kwargs) h_field = np.copy(self.field) self.exportVTK('hfield_debug.vtk') else: h_field = np.zeros_like(self.loopmesh.positions().array(), dtype=np.complex) self.secMOD.FE.FS.PF.pvd_flag = export_vtk h_field_fenics = h_field[:, pg_df].T assert np.shape(h_field_fenics)[1] == 3 # setting H fields manually in preparation for export self.secMOD.FE.FS.PF.H_0 = [h_field_fenics] # calc E primary if electric: self.config.ftype = 'E' log.info('calculate electric primary fields') self.calculate(num_cpu=num_cpu, **kwargs) e_field = np.copy(self.field) self.exportVTK('efield_debug.vtk') else: e_field = np.zeros_like(self.loopmesh.positions().array(), dtype=np.complex) # setting E fields manually in preparation for export e_field_fenics = e_field[:, pg_df].T assert np.shape(h_field_fenics)[1] == 3 self.secMOD.FE.FS.PF.E_0 = [e_field_fenics] # only one export self.secMOD.FE.FS.PF.n_tx = 1 # fixed to one transmitter right now self.secMOD.FE.FS.PF.export_primary_fields() # reset field type to whatever we were interested in to begin with self.config.ftype = orig_ftype if self.config.ftype == 'E': self.field = e_field elif self.config.ftype == 'H': self.field = h_field elif self.config.ftype == 'B': self.field = h_field * 4e-7 * np.pi else: self.field = None
[docs] def calculateSecField(self, num_cpu=8, **kwargs): """ Calculates the secondary field using custEM. Calculates primary field as well if not found.\n Needs a FEM suited mesh as well as a parameter distribution provided by other functions of this class (See *createFEMMesh* and *prepareSecondaryFieldCalculation*). Parameters ---------- num_cpu: integer [ 8 ] Maximum number of processes allowed for this task. The actual calculation will be done in an mpirun environment with the selected number of cores. kwargs: dict Keyword arguments are redirected to *local_apps*. """ if self.secondary_config is None: raise Exception('No config for secondary field calculation has ' 'been loaded.') log.info('Start secondary field calculation routine.') if not self._checkPrimaryFields(): log.info('preparing primary fields...') self.calcAndExportFieldsForFenics(num_cpu=num_cpu) self.save(savename=self.name, save_mesh=False) success = local_apps_bash('secondaryFieldCalculation', self.name, str(num_cpu), **kwargs) if success != 0: raise Exception('Error in secondary field calculation.') else: log.info('load resulting fields: "{}"' .format('{}_total.npy'.format(self.name))) self.field = np.load('{}_total.npy'.format(self.name)) if '_s' in self.secondary_config.approach: self.secondary = np.load('{}_secondary.npy'.format(self.name)) else: self.secondary = None if self.config.ftype.lower() == 'b': # custEM produces H fields. self.field *= 4e-7 * np.pi if self.secondary is not None: self.secondary *= 4e-7 * np.pi
[docs] def load(self, savename=None, config=None, config2=None, verbose=True, load_meshes=True, overwrite_dir=False): """ Load Loop from files. Parameters ---------- savename: string [ None ] Basename of the lop class files. Other names are autogenerated using this basename. config: string [ None ] Tell the load function to explicitely load config from given path. Else the saved filepath in the main archive is used. config2: string [ None ] See *config*, but for secondary configuration. verbose: boolean [ True ] Turn on verbose mode. load_meshes: boolean [ True ] If originally saved, the meshes are loaded by default. However, this takes more time then the rest of the load function and can be ommitted if only the other parts are of interest. """ self.verbose = verbose if savename is not None: self.name = savename if self.name is None: raise Exception('Need savename to load loop instance.') filename = os.path.split(savename)[1].split('.')[0] name = os.path.join(os.path.dirname(self.name), filename) log.info('load loop: {}'.format(os.path.abspath(name + '.npz'))) with np.load(name + '.npz', allow_pickle=True) as archive: # load geometry if 'ltype' in archive: # new npz style log.debug('load geometry from same npz.') self.type = str(archive['ltype']) self.grounded = bool(archive['grounded']) self.pos = archive['pos'] self.phi = archive['phi'] self.ds = archive['ds'] self._setDistance() else: # old npz style log.debug('load geometry from separate npz.') self._loadLoopGeometry(name + archive['save_geometry'].item()) # attempt to load mesh if archive['save_mesh'].item() is not None: self.loop_mesh_name = archive['save_mesh'].item() if load_meshes: if os.path.exists(self.loop_mesh_name): self.setLoopMesh(self.loop_mesh_name) else: # RuntimeError from c++: ignore, warn, and continue log.warning('Did not found loopmesh at "{}"' .format(self.loop_mesh_name)) # attempt to load dipole mesh if archive['save_dipole_mesh'].item() is not None: self.dipole_mesh_name = archive['save_dipole_mesh'].item() if load_meshes: if os.path.exists(self.dipole_mesh_name): self.setDipoleMesh(self.dipole_mesh_name, verbose=verbose) else: # RuntimeError from c++: ignore, warn, and continue log.warning('Did not found dipolemesh at "{}"' .format(self.loop_mesh_name)) self.field = getItem(archive, 'save_field') self.d_field = getItem(archive, 'save_d_field') self.idx = getItem(archive, 'save_idx') self.area = getItem(archive, 'save_area', 0.) self.turns = getItem(archive, 'save_turns', 1) # config if config is None: config = archive['save_config'].item() if config == '.cfg': config = name + config self.config.load(config) try: if config2 is None: config2 = archive['save_config2'].item() if config2 == '_sec.cfg': config2 = name + config2 if config2 is not None: # given or created, either way, continue with import # sets self.cfg2_savename # loads self.secondary_config if overwrite_dir: from pathlib import Path config2 = Path( self.name).parent.joinpath( Path(config2).name).as_posix() self.loadSecondaryConfig(config2) except (KeyError, TypeError): # old versions of loop classes doesnt have secondary configs pass if os.path.exists('{}_secondary.npy'.format(self.name)): log.log(16, 'load secondary field.') self.secondary = np.load('{}_secondary.npy'.format(self.name)) return True # success
[docs] def loadFieldMatrix(self, name, verbose=True): """ Loads the three matrices needed for recalculation of the primary field from numpy archive. See saevFieldmatrix for detailed description. Parameters ---------- name: string Path to file to be loaded. verbose: boolean [ True ] Turn on verbose mode. """ # from comet.pyhed.misc import loadSparseMatrixFromNumpyArchive log.info('Loading fieldMatrices from "{}"' .format(os.path.abspath(name))) # matrices = loadSparseMatrixFromNumpyArchive(name) # self.field_matrix = matrices[0] # self.field_matrix_sin = matrices[1] # self.field_matrix_cos = matrices[2] self.field_matrix = pg.core.SparseMapMatrix() self.field_matrix_sin = pg.core.SparseMapMatrix() self.field_matrix_cos = pg.core.SparseMapMatrix() self.field_matrix.load('{}_field_matrix.bmat'.format(name)) self.field_matrix_sin.load('{}_field_matrix_sin.bmat'.format(name)) self.field_matrix_cos.load('{}_field_matrix_cos.bmat'.format(name))
[docs] def loadSecondaryConfig(self, savename=None): """ Imports previously saved secondary config. Parameters ---------- savename: string [ None ] Used savename over *loop.sec_savename*. Throws Exception if both values are None. Replaces *loop.sec_savename*. """ if savename is not None: self.cfg2_savename = savename if self.cfg2_savename is None: raise Exception('Need savename to load secondary configuration.') self.secondary_config = SecondaryConfig(name=self.cfg2_savename)
[docs] def createLoopMesh(self, savename=None, exportVTK=False, airspace=False, verbose=False, xmax=None, xmin=None, ymax=None, ymin=None, zmin=None): """ Builds the mesh where the loop will be calculated in. Parameter --------- savename: string [ None ] Saves the created mesh under savename, as long as savenmae is not none. If no basename is given, the dafaultname will be '_default_LoopMesh' + looptype + number of dipoles + '.bms'. exportVTK: boolean [ False ] Switch to export the resulting mesh to a vtk with the given savename. airspace: boolean [ False ] Enables airspace. verbose: boolean [ False ] Turn on berbose mode. """ log.info('createLoopMesh()') if savename is None: loopbasename = self.getDefaultLoopMeshBaseName() else: loopbasename = savename self.loop_mesh_name = loopbasename + '.bms' # compute outer limits for loopmesh minx = np.min(self.pos.T[0]) - 3 * self._maxdistance maxx = np.max(self.pos.T[0]) + 3 * self._maxdistance miny = np.min(self.pos.T[1]) - 3 * self._maxdistance maxy = np.max(self.pos.T[1]) + 3 * self._maxdistance minz = np.min(self.pos.T[2]) - 3 * self._maxdistance if xmin is not None: minx = min(xmin, minx) if ymin is not None: miny = min(ymin, miny) if zmin is not None: minz = min(zmin, minz) if xmax is not None: maxx = max(xmax, maxx) if ymax is not None: maxy = max(ymax, maxy) if airspace: maxz = np.max(self.pos.T[2]) + 1 * self._maxdistance else: maxz = np.max(self.pos.T[2]) log.debug('X, Y, Z: ({:.2f}, {:.2f}), ({:.2f}, {:.2f}), ({:.2f}, ' '{:.2f})'.format(minx, maxx, miny, maxy, minz, maxz)) if len(self.pos) == 1: # just one dipole # 10 * 10 unit cube around the dipole minx = -10. + self.pos[0][0] maxx = 10. + self.pos[0][0] miny = -10. + self.pos[0][1] maxy = 10. + self.pos[0][1] minz = -10. + self.pos[0][2] if airspace: maxz = 10. + self.pos[0][2] else: maxz = self.pos[0][2] self.ds = [1.0] # create loopmesh (nodes, faces), then apply tetgen # corner points for the halfspace fringe loopmesh = pg.Mesh(3) ob8 = loopmesh.createNode(minx, miny, 0) ob9 = loopmesh.createNode(maxx, miny, 0) ob10 = loopmesh.createNode(maxx, maxy, 0) ob11 = loopmesh.createNode(minx, maxy, 0) # halfspace face loopmesh.createQuadrangleFace(ob8, ob9, ob10, ob11) # lower outer boundary nodes, earthspace ob0 = loopmesh.createNode(minx, miny, minz) ob1 = loopmesh.createNode(maxx, miny, minz) ob2 = loopmesh.createNode(maxx, maxy, minz) ob3 = loopmesh.createNode(minx, maxy, minz) # earth side loopmesh.createQuadrangleFace(ob8, ob9, ob1, ob0) loopmesh.createQuadrangleFace(ob9, ob10, ob2, ob1) loopmesh.createQuadrangleFace(ob10, ob11, ob3, ob2) loopmesh.createQuadrangleFace(ob11, ob8, ob0, ob3) # earth bottom loopmesh.createQuadrangleFace(ob0, ob1, ob2, ob3) if maxz > 0: # upper outer boundary nodes, airspace ob4 = loopmesh.createNode(minx, miny, maxz) ob5 = loopmesh.createNode(maxx, miny, maxz) ob6 = loopmesh.createNode(maxx, maxy, maxz) ob7 = loopmesh.createNode(minx, maxy, maxz) # air side loopmesh.createQuadrangleFace(ob8, ob9, ob5, ob4) loopmesh.createQuadrangleFace(ob9, ob10, ob6, ob5) loopmesh.createQuadrangleFace(ob10, ob11, ob7, ob6) loopmesh.createQuadrangleFace(ob11, ob8, ob4, ob7) # air top loopmesh.createQuadrangleFace(ob4, ob5, ob6, ob7) # append refinement points for i, posi in enumerate(self.pos): # log.debug([posi[0], posi[1], posi[2] - # self._mindistance / self.refinement_para * \ # len(self.pos)]) loopmesh.createNode( posi[0], posi[1], posi[2] - self._mindistance / self.refinement_para * len(self.pos)) tet_maxArea = self._maxdistance**3 * self.max_area_para log.debug('using global loopmesh volume constraint: {:.3f} m³, ' '({:.3f} m)'.format(tet_maxArea, np.cbrt(tet_maxArea))) pg.meshtools.exportPLC(loopmesh, loopbasename + '.poly') loopmesh.exportVTK(loopbasename + '_poly.vtk') # tetgen call tetgen151(loopbasename, maxArea=tet_maxArea, quality=self.tetgen_quality) try: self.loopmesh = pg.Mesh(loopbasename + '.1.vtk') except RuntimeError: # c++ for file not found in this case self.loopmesh = pg.meshtools.readTetgen(loopbasename + '.1') self.loopmesh.save(self.loop_mesh_name) # for kernel call (piping) log.info('loopmesh.save: {}'.format(self.loop_mesh_name)) cleanUpTetgenFiles(loopbasename) return self.loopmesh
[docs] def createDipoleMesh(self, quadratic=True, savename='_default_dipole_mesh.bms', save=False, verbose=False): """ Creates a suitable dipole mesh for calculation via a single dipole. Parameters ---------- quadratic: boolean [ True ] If chosen, uses a quadratic (2nd order) mesh for dipole calculation. savename: string [ '_default_dipole_mesh.bms' ] Define output name. save: boolean [ True ] Additional save of dipole mesh under *savename*. verbose: boolean [ False ] Turn on verbose mode. """ self.dipole_mesh_name = savename if self.dipole_mesh_name is None: self.dipole_mesh_name = '_default_dipole_mesh.bms' if self.loopmesh is None: self.createLoopMesh() if isinstance(self.loopmesh, pg.Mesh): lp = R3VtoNumpy(self.loopmesh.positions()).T else: lp = self.loopmesh if self.dipole_mesh_name[-4:] != '.bms': self.dipole_mesh_name += '.bms' # outer boundaries factor = np.sqrt(2.2) # sqareroot of 2 would be enough difx = np.max(lp[0]) - np.min(lp[0]) dify = np.max(lp[1]) - np.min(lp[1]) loopDim = max((difx, dify)) * factor / 2 maxDim = loopDim + self._maxdistance minz = np.min(lp[2]) maxz = np.max(lp[2]) reff = self._mindistance / self.refinement_para * len(self.pos) if minz > (-2 * reff): # to make place for proper refinement points minz = -2 * reff if verbose: print('automatically set minz to %f' % (-2 * reff)) # print(maxDim, minz, maxz) # create loopmesh (nodes, faces), then apply tetgen dipolemesh = pg.Mesh(3) # outer boundary nodes, bottom ob0 = dipolemesh.createNode(-maxDim, -maxDim, minz) ob1 = dipolemesh.createNode(maxDim, -maxDim, minz) ob2 = dipolemesh.createNode(maxDim, maxDim, minz) ob3 = dipolemesh.createNode(-maxDim, maxDim, minz) # outer boundary nodes, halfspace face ob8 = dipolemesh.createNode(-maxDim, -maxDim, 0) ob9 = dipolemesh.createNode(maxDim, -maxDim, 0) ob10 = dipolemesh.createNode(maxDim, maxDim, 0) ob11 = dipolemesh.createNode(-maxDim, maxDim, 0) # outer boundary nodes, top if maxz > 0: ob4 = dipolemesh.createNode(-maxDim, -maxDim, maxz) ob5 = dipolemesh.createNode(maxDim, -maxDim, maxz) ob6 = dipolemesh.createNode(maxDim, maxDim, maxz) ob7 = dipolemesh.createNode(-maxDim, maxDim, maxz) # air side dipolemesh.createQuadrangleFace(ob8, ob9, ob5, ob4) dipolemesh.createQuadrangleFace(ob9, ob10, ob6, ob5) dipolemesh.createQuadrangleFace(ob10, ob11, ob7, ob6) dipolemesh.createQuadrangleFace(ob11, ob8, ob4, ob7) # air top dipolemesh.createQuadrangleFace(ob4, ob5, ob6, ob7) # earth side dipolemesh.createQuadrangleFace(ob8, ob9, ob1, ob0) dipolemesh.createQuadrangleFace(ob9, ob10, ob2, ob1) dipolemesh.createQuadrangleFace(ob10, ob11, ob3, ob2) dipolemesh.createQuadrangleFace(ob11, ob8, ob0, ob3) # earth bottom dipolemesh.createQuadrangleFace(ob0, ob1, ob2, ob3) # halfspace face dipolemesh.createQuadrangleFace(ob8, ob9, ob10, ob11) maxArea = self._maxdistance**3 * self.max_area_para if quadratic: maxArea *= 8.0 # self.dipolemesh = mesh dipolemesh.createNode(0., 0., self.pos[0][2] - reff) pg.meshtools.exportPLC(dipolemesh, self.dipole_mesh_name.rstrip('.bms') + '.poly') # make tetgen mesh tetgen151(self.dipole_mesh_name.rstrip('.bms'), maxArea=maxArea, quality=self.tetgen_quality, verbose=verbose) self.dipolemesh = pg.Mesh(self.dipole_mesh_name.rstrip('.bms') + '.1.vtk') if quadratic: self.dipolemesh = self.dipolemesh.createP2() os.remove(self.dipole_mesh_name.rstrip('.bms') + '.1.vtk') os.remove(self.dipole_mesh_name.rstrip('.bms') + '.poly') self.dipolemesh.save(self.dipole_mesh_name) # for kernel call (piping) cleanUpTetgenFiles(self.dipole_mesh_name) return self.dipolemesh
[docs] def createFEMMesh(self, para_mesh_2d=None, savename=None, exportVTK=False, exportH5=True, box_x=[None, None], box_y=[None, None], box_z=None, box_cell_size=None, source_poly=None, source_setup='edges', source_loops=None, inner_area_cell_size=0.3, outer_area_cell_size=10, subsurface_cell_size=None, poly_2d=None, number_of_loops=None, **kwargs): """ Builds the FEM mesh for the secondary field computation. Needs at least on of the two possible parameter meshes in order to continue. Parameter --------- para_mesh_2d: string or pg.Mesh [ None ] Used to get the outer dimensions of the FEMMesh. savename: string [ None ] Define output save name of FEM mesh. Default name will be generated if None. If no savename is given, the dafaultname will be '_default_LoopMesh' + looptype + number of dipoles. exportVTK: boolean [ False ] Turn on optional vtk export. source_setup: string [ 'edges' ] Defines the way the sources are incorporated into the mesh. "nodes" simply insert the dipole positions (fallback), "edges" defines strait edges between the nodes (usually the best approach). "etra" can be used for a special setup where multiple loops are build in an elongated transmitter with inline receiver array. Raises an exception if source_setup differs from the three options. source_loops: list [ None ] If a list of loop classes is given, their tx representation after custEM is implemented in the mesh for custEM magnetic field calculations using automatic source detection. inner_area_cell_size: float [ 0.3 ] Maximum allowed area (m²) for all cell in the source plane within the source polygons (if closed loop). Very important for kernel calculation! See tutorial for custEM for further explanations. outer_area_cell_size: float [ 10 ] Maximum allowed area (m²) for all cells in the source plane outside the source polygons (or anywhere for not closed loop). See tutorial for custEM for further explanations. subsurface_cell_size: float [ None ] Maximum allowed volume (m³) for all cells within inner mesh box (not the tetrahedron boundary to 10 km). Optional. Kwargs ------ limits: list of len 2 [ None ] Minimum and maximum y value, the anomalies should be set in the fem mesh. Uses the x limits of the 2D parameter mesh as default if *None*. Dependencies ------------ custEM: Install via conda on Linux only. See install instructions of comet. """ from custEM.meshgen.meshgen_tools import BlankWorld from custEM.meshgen.meshgen_utils import export_tetgen_poly_file # checking para mesh 2D if para_mesh_2d is not None: self.setParaMesh2D(para_mesh_2d, limits=kwargs.pop('limits', None)) self._checkParaMesh() if self.secondary_config.sigma_paramesh == []: raise Exception( 'createFEMMesh needs an anomaly vector to work. The ' 'anomalies are used to detect and distribute the markers in ' 'the FE mesh correctly. Please use setParaMesh2D or ' 'setAnomalies in order to set te anomaly vector before ' 'calling createFEMMesh.') # name + dir test_path = Path(savename) if test_path.parent != Path('.'): raise Exception( 'Please enter only a filename as "savename" not a Path. If you' ' want to change the mesh directory please use the ' 'secondary_config (e.g. "createDefaultSecondaryConfig(m_dir=' '...) or change the "secondary_config.m_dir" attribute ' 'directly.").') self.setLoopMeshName(savename) # adds m_dir and _h5 mesh_path = Path(self.loop_mesh_name) log.debug([mesh_path, mesh_path.parent]) mesh_path.parent.mkdir(exist_ok=True, parents=True) # make new _h5 dir if needed # basename without suffix or path(string) mesh_name = mesh_path.stem # without suffix mesh_base = mesh_path.with_name(mesh_path.stem).as_posix() log.info('creating FEM mesh...') # compute outer limits for FEM mesh minx = box_x[0] if minx is None: minx = self.para_limits_x[0] maxx = box_x[1] if maxx is None: maxx = self.para_limits_x[1] miny = box_y[0] if miny is None: miny = self.para_limits_y[0] maxy = box_y[1] if maxy is None: maxy = self.para_limits_y[1] minz = box_z if minz is None: minz = self.para_limits_z[0] maxz = 0.5 * abs(minz) # inner poly and source # part 1: inner poly with refinement. log.info('inner mesh box x:, {}, {}'.format(minx, maxx)) log.info('inner mesh box y:, {}, {}'.format(miny, maxy)) log.info('inner mesh box z:, {}, {}'.format(minz, maxz)) # quick check for overlapping polys intersect = False if poly_2d is not None: if poly_2d.xmax() >= maxx: intersect = True if poly_2d.xmin() <= minx: intersect = True if poly_2d.ymin() <= minz: intersect = True if poly_2d.ymax() >= 0: intersect = True if intersect: msg = 'Refinement Box and poly_2d are intersecting. ' msg += 'Box(x, z) = [({:.2f}, {:.2f}), ({:.2f}, {:.2f})] '\ .format(minx, maxx, minz, 0) msg += 'should be able to contain poly_2d(x, z(or y)) = ' msg += '[({:.2f}, {:.2f}), ({:.2f}, {:.2f})]'\ .format(poly_2d.xmin(), poly_2d.xmax(), poly_2d.ymin(), poly_2d.ymax()) raise Exception(msg) blank_world_kwargs = {} if subsurface_cell_size is not None: blank_world_kwargs['subsurface_cell_size'] = subsurface_cell_size poly = BlankWorld(name=mesh_name, x_dim=[minx, maxx], y_dim=[miny, maxy], z_dim=[minz, maxz], m_dir=self.secondary_config.m_dir, inner_area_cell_size=inner_area_cell_size, outer_area_cell_size=outer_area_cell_size, **blank_world_kwargs) if source_poly is not None: if isinstance(source_poly, str): source_poly = pg.load(source_poly) add_lines = [] marker_pos = [] max_length = kwargs.pop('max_length', 0.5) for bound in source_poly.boundaries(): line_start = bound.allNodes()[0].pos().array() line_end = bound.allNodes()[1].pos().array() dist = np.sqrt(np.sum((line_end - line_start)**2)) number_segs = int((dist // max_length) + 1) line = np.column_stack( [np.linspace(line_start[0], line_end[0], number_segs), np.linspace(line_start[1], line_end[1], number_segs), np.linspace(line_start[2], line_end[2], number_segs)]) add_lines.append(line) marker_pos.extend([line_start, line_end]) # add_lines.append([a.pos().array() for a in bound.allNodes()]) if source_setup.lower() == 'loops': if source_loops is None: raise Exception('Need source_loops when source_setup ' 'is set to "loops".') source_tx = [] for loop in source_loops: source_tx.append(loop.getCustEMLoopTx(max_length)) print(source_tx) log.info('Add insert_loop_tx from given loop classes.') # poly.build_surface(insert_lines=add_lines, closed_path=False, # # line_marker=[88] * len(add_lines), # insert_loop_tx=source_tx) poly.build_surface(closed_path=False, # line_marker=[88] * len(add_lines), insert_loop_tx=source_tx) # now we want to append missing markers in the middle of the # polygons- This is somewhat experimental and works for rectangle # loops (doesn't matter how many) == good for etra or mixed etras # all_pos = np.array(add_lines).reshape(-1, 3) # n, 2, 3 -> 2*n, 3 all_pos = np.array(marker_pos).reshape(-1, 3) x_poss = np.sort(np.unique(all_pos[:, 0])) y_poss = np.sort(np.unique(all_pos[:, 1])) x_mids = (x_poss[:-1] + x_poss[1:]) / 2. y_mids = (y_poss[:-1] + y_poss[1:]) / 2. for xmid in x_mids: for ymid in y_mids: log.debug('Add additional marker at ({:.3f}, {:.3f})' .format(xmid, ymid)) poly.add_area_marker(poly.surface, [xmid, ymid, 0.0], face_size=inner_area_cell_size) # marker for outer cell size poly.add_area_marker(poly.surface, [minx + 0.1, miny + 0.1, 0.0], face_size=outer_area_cell_size) source_setup = 'skip' if source_setup.lower() == 'etra': if number_of_loops is None: raise Exception('Need "number_of_loops" when using ' 'source_setup="etra".') ay = np.max(self.pos[:, 1]) - np.min(self.pos[:, 1]) # new way buildEtraSourceDiscretization(poly, ay, x_left=np.min(self.pos[:, 0]), number_of_loops=number_of_loops) elif source_setup.lower() == 'edges': # closed polygon ? poly.build_surface(insert_loops=[self.pos], # sources closed_path=not self.grounded) elif source_setup.lower() == 'nodes': poly.build_surface(insert_points=self.pos) # sources elif source_setup.lower() == 'skip': pass elif source_setup.lower() == 'custem': poly.build_surface(**kwargs) # sources as defined using custem else: raise Exception('Source setup "{}", not in ["etra", "nodes", ' '"edges", "loops"].'.format(source_setup)) dipole_box = [np.min(self.pos[:, 0]) - 5, np.max(self.pos[:, 0]) + 5, np.min(self.pos[:, 1]) - 5, np.max(self.pos[:, 1]) + 5, -4] log.debug('add_surface_anomaly: {}' .format(np.array(dipole_box))) # add additional refinement box around dipoles only for the first 4 # meter (from top dipoles +5 meter in x and y to a depth of 4 meter). poly.add_surface_anomaly( insert_paths=[ np.array(((dipole_box[0], dipole_box[2], 0.0), (dipole_box[0], dipole_box[3], 0.0), (dipole_box[1], dipole_box[3], 0.0), (dipole_box[1], dipole_box[2], 0.0)))], depths=[dipole_box[4]], cell_sizes=[inner_area_cell_size]) # 3d marker # add marker just inside the corner of the new box poly.add_area_marker(poly.surface, [dipole_box[0] + 0.1, dipole_box[2] + 0.1, 0.0], face_size=inner_area_cell_size) # 2d marker # add additional refinement box if requested # if refinement_box is not None: # poly.add_surface_anomaly( # insert_loops=[ # np.array(((refinement_box[0], refinement_box[2], 0.0), # (refinement_box[0], refinement_box[3], 0.0), # (refinement_box[1], refinement_box[3], 0.0), # (refinement_box[1], refinement_box[2], 0.0)))], # depths=[refinement_box[4]], # cell_sizes=[box_cell_size]) # # # add marker just inside the corner of the new box # poly.add_area_marker(poly.surface, [refinement_box[0] + 0.1, # refinement_box[2] + 0.1, 0.0]) log.debug('build_halfspace_mesh()') poly.build_halfspace_mesh() # there_is_always_a_bigger_world works with factors: # I calculate the minimum factor needed to get at least a 10km boundary xfactor = 10000. / np.minimum(np.abs(poly.Omega.xmin()), np.abs(poly.Omega.xmax())) yfactor = 10000. / np.minimum(np.abs(poly.Omega.ymin()), np.abs(poly.Omega.ymax())) # minz too small, would lead to insane bounds zfactor = 10000. / np.absolute(poly.Omega.zmin()) poly.there_is_always_a_bigger_world(xfactor, yfactor, zfactor) # poly.eval_unique_markers() # custem > 1.0 poly.write_mesh_parameters() # SEMI TESTED preserve edges! if poly_2d: log.info('Add poly 2d to FEM mesh.') node_ids = [] doublecheck = {} for bound in poly_2d.boundaries(): node_ids.append(bound.ids()) for id0, id1 in node_ids: n0 = poly_2d.node(id0) n1 = poly_2d.node(id1) if id0 not in doublecheck.keys(): n0x = n0.x() n0y0 = miny + 0.5 # to stay inside refinement box n0y1 = maxy - 0.5 # to stay inside refinement box n0z = n0.y() log.debug( 'create poly2d nodes: ({:.2f}, {:.2f}, {:.2f}), ' '({:.2f}, {:.2f}, {:.2f})' .format(n0x, n0y0, n0z, n0x, n0y1, n0z)) doublecheck[id0] = [ poly.Omega.createNode([n0x, n0y0, n0z]), poly.Omega.createNode([n0x, n0y1, n0z])] if id1 not in doublecheck.keys(): n1x = n1.x() n1y0 = miny + 0.5 # to stay inside refinement box n1y1 = maxy - 0.5 # to stay inside refinement box n1z = n1.y() log.debug( 'create poly2d nodes: ({:.2f}, {:.2f}, {:.2f}), ' '({:.2f}, {:.2f}, {:.2f})' .format(n1x, n1y0, n1z, n1x, n1y1, n1z)) doublecheck[id1] = [ poly.Omega.createNode([n1x, n1y0, n1z]), poly.Omega.createNode([n1x, n1y1, n1z])] poly.Omega.createQuadrangleFace(doublecheck[id0][0], doublecheck[id0][1], doublecheck[id1][1], doublecheck[id1][0]) if self.preserved_edge_centers is not None: log.info('Add preserved edges from paramesh to FEM mesh.') edge_centers = np.array(self.para_mesh_2d.boundaryCenters()) preserved_edges = [] for center in self.preserved_edge_centers: preserved_edges.append( np.where(np.sum(np.isclose(edge_centers, center), axis=1) == 3)[0]) node_ids = [] for edge_id in preserved_edges: node_ids.append( self.para_mesh_2d.boundaries()[edge_id[0]].ids()) doublecheck = {} for id0, id1 in node_ids: # build quadrangle faces in 3D with 2D edge extended as plane # in 3D using y-vector endpoints n0 = self.para_mesh_2d.node(id0) n1 = self.para_mesh_2d.node(id1) # print([n0.x(), miny, n0.y()], [n1.x(), miny, n1.y()]) if id0 not in doublecheck.keys(): doublecheck[id0] = [ poly.Omega.createNode([n0.x(), miny + 0.1, n0.y()]), poly.Omega.createNode([n0.x(), maxy - 0.1, n0.y()])] if id1 not in doublecheck.keys(): doublecheck[id1] = [ poly.Omega.createNode([n1.x(), miny + 0.1, n1.y()]), poly.Omega.createNode([n1.x(), maxy - 0.1, n1.y()])] poly.Omega.createQuadrangleFace(doublecheck[id0][0], doublecheck[id0][1], doublecheck[id1][1], doublecheck[id1][0]) # exportTetgenEdge(preserve_edges, # mesh_path.with_suffix('.edge').as_posix()) if exportVTK or log.getEffectiveLevel() <= 10: # .Omega == pyGIMLi poly object poly.Omega.exportBoundaryVTU( mesh_path.with_name('2Dsurface.vtu').as_posix()) poly.Omega.exportVTK( mesh_path.with_name('exportpoly.vtk').as_posix()) # end custEM here export_tetgen_poly_file( poly.Omega, mesh_path.with_suffix('.poly').as_posix()) # poly.call_tetgen(tet_param='-kfpq1.4aA', export_vtk=True) # tetgen mesh success = tetgen151(mesh_base, maxArea='', quality=self.tetgen_quality) if success != 0: log.error('tetgen seems to have run into trouble.') log.error('used call: tetgen151("{}")'.format(mesh_base)) if poly_2d: log.error( 'You tried to insert a poly_2d. You can check the input ' 'file for tetgen under "{}" for poly intersections, ' 'it may help to find the origin of the problem.' .format(mesh_path.with_name('exportpoly.vtk').as_posix())) poly.Omega.exportVTK( mesh_path.with_name('exportpoly.vtk').as_posix()) # import mesh log.info('import FEM mesh...') loopmesh = pg.meshtools.readTetgen(mesh_base + '.1') # set mesh self.setFEMMesh(loopmesh, valid_marker=None, savename=self.loop_mesh_name) # clean up cleanUpTetgenFiles(mesh_base) # prepare saving Path(self.loop_mesh_name).parent.mkdir(exist_ok=True, parents=True) # export if exportVTK: self.loopmesh.exportVTK(mesh_path.with_suffix('.vtk').as_posix()) log.info('Export .bms ({})'.format(self.loop_mesh_name)) self.loopmesh.save(self.loop_mesh_name) # for kernel call (piping) if exportH5: h5path = mesh_path.with_suffix('.h5').as_posix() log.info('Export .h5 ({})'.format(h5path)) self.exportFenicsHDF5Mesh(h5path) # save secondary config for the purpose of slice generation and # calculation via custem self.secondary_config.save(mesh_path.with_suffix('.cfg').as_posix()) return self.loopmesh
[docs] def createSecondaryConfig(self, mod_name, mesh_name, m_dir='.', r_dir='.', pf_name=None, p2=False, approach='E_s', pf_EH_flag='E'): """ Initializes an instance of a secondary config for use of custEM. Parameters ---------- mod_name: string Name of the mod instance (for saving and import in mpi environment) mesh_name: string Basename of mesh imported by the fenics functions (.h5). Mind the subfolder '/_h5' that will be added to the string. m_dir: string Path to mesh directory of custEM. r_dir: string Path to result directory of custEM. pf_name: string File name under which the primary field will be saved in the appropriate directory of custEM. """ pf_name = pf_name or mod_name # different folder, so no harm done self.secondary_config = SecondaryConfig( mod_name=mod_name, mesh_name=mesh_name, m_dir=m_dir, r_dir=r_dir, pf_name=pf_name, p2=p2, approach=approach, pf_EH_flag=pf_EH_flag, sigma_ground=1./np.atleast_1d(self.config.rho)) self.loop_mesh_name = Path('{}/_h5/{}'.format( self.secondary_config.m_dir, self.secondary_config.mesh_name)).as_posix()
[docs] def createDefaultSecondaryConfig(self, base=None, prefix='', suffix='', m_dir='.', r_dir='.'): """ Short cut to generate a secondary config with some default params. Parameters ---------- prefix: string String to be added to the *getDefaultLoopMeshBaseName* string to define the automatic generated names for the default secondary config. suffix: string String to be added to the *getDefaultLoopMeshBaseName* string to define the automatic generated names for the default secondary config. """ if base is None: if self.name is not None: base = Path(self.name).stem else: if self.loop_mesh_name is None: base = self.getDefaultLoopMeshBaseName()[9:] else: base = Path(self.loop_mesh_name).stem base = '{}{}{}'.format(prefix, base, suffix) mod_name = '{}'.format(base) mesh_name = 'mesh_{}'.format(base) self.createSecondaryConfig(mod_name, mesh_name, m_dir=m_dir, r_dir=r_dir)
[docs] def getCustEMLoopTx(self, max_length): if self.type.lower() in ['rectangle', 'square']: from custEM.meshgen import meshgen_utils as mu xmin = np.min(self.pos[:, 0]) xmax = np.max(self.pos[:, 0]) ymin = np.min(self.pos[:, 1]) ymax = np.max(self.pos[:, 1]) nx = int(((xmax - xmin) // max_length) + 1) ny = int(((ymax - ymin) // max_length) + 1) loop_tx = mu.loop_r([xmin, ymin], [xmax, ymax], nx=max(nx, 1), ny=max(ny, 1)) # loop_tx = np.array([[xmin, ymin, 0], # [xmax, ymin, 0], # [xmax, ymax, 0], # [xmin, ymax, 0]]) else: raise Exception('No recipy for automatic creation of custem tx ' 'shape for loop-type "{}"'.format(self.type)) return loop_tx
[docs] def setLoopMeshName(self, savename=None): """ Sets loop mesh name or figures it out from sec config. """ invalid_path = False if savename is None: savename = self.getDefaultLoopMeshBaseName() else: if self.secondary_config is not None: if '_h5' not in Path(savename).parts: invalid_path = True log.info('using basename of savename and use mesh ' 'directory as defined in secondary config!') savename = Path(savename).name if invalid_path: mesh_path = Path( self.secondary_config.m_dir ).joinpath('_h5').joinpath(savename).with_suffix('.bms') else: mesh_path = Path(savename) if self.secondary_config is not None: self.secondary_config.mesh_name = mesh_path.stem self.loop_mesh_name = mesh_path.as_posix()
[docs] def getDefaultLoopMeshBaseName(self): """ Returns string with default base name of the loop mesh. """ return '_default_LoopMesh_{}_{}'.format(self.type, len(self.ds))
[docs] def getParaMesh2D(self): # , with_bounds=False): # if with_bounds is False: return self._para_mesh_2d
# else: # return self._para_mesh_2d_with_bounds
[docs] def save(self, savename=None, config_savename=None, config2_savename=None, save_mesh=True, save_field=True): """ Saves the loop class in files. Saves npz archive with loop itself.\n Saves config.\n Saves secondary config if initialized.\n Saves mesh if save_mesh=True.\n Saves field if save_field=True. Parameters ---------- savename: string [ None ] File basename for saving loop class and its components. config_savename: string [ None ] Explicit savename for config. Automatically generated if None. config2_savename: string [ None ] Explicit savename for secondary config. Automatically generated if None. save_mesh: boolean [ True ] Saves mesh. save_field: boolean [ True ] Saves fields. """ # geometry if savename is not None: self.name = savename if self.name.endswith('.npz'): self.name = self.name[:-4] if self.name is None: raise Exception('Need savename to save loop instance.') log.info(f'Save Loop: {self.name}') Path(self.name).parent.mkdir(exist_ok=True, parents=True) # loop mesh if self.loopmesh is not None: if isinstance(self.loopmesh, np.ndarray): self.setLoopMesh(self.loopmesh) if self.loop_mesh_name is None: self.loop_mesh_name = self.name + '_mesh.bms' if save_mesh is True: log.info(f'Save mesh: {self.loop_mesh_name}') self.loopmesh.save(self.loop_mesh_name) # dipole mesh if self.dipolemesh is not None: if self.dipole_mesh_name is None: self.dipole_mesh_name = self.name + '_dipole_mesh.bms' if save_mesh is True: log.info(f'Save Dipole mesh: {self.dipole_mesh_name}') self.dipolemesh.save(self.dipole_mesh_name) # config if config_savename is None: save_config = '.cfg' self.config.save(self.name + save_config) else: save_config = config_savename self.config.save(save_config) if self.secondary_config is not None: # secondary config if config2_savename is None: self.cfg2_savename = self.name + '_sec.cfg' else: self.cfg2_savename = config2_savename # refresh frequency, in case of total field calculation self.secondary_config.frequency = self.config.f self.saveSecondaryConfig() # test FE Mesh names are the same name1 = Path(self.loop_mesh_name).with_suffix('').absolute() name2 = Path('{}/_h5/{}'.format( self.secondary_config.m_dir, self.secondary_config.mesh_name) ).with_suffix('').absolute() if name1 != name2: # this might be intentional, so just a warning, moving on... log.warning('Loop mesh name ({}) and mesh name from ' 'secondary config ({}) are different!' .format(name1.as_posix(), name2.as_posix())) else: self.cfg2_savename = None # names + fields if save_field is False: savefield = None else: savefield = self.field absmesh = None if self.loop_mesh_name is not None: absmesh = os.path.abspath(self.loop_mesh_name) absdipole = None if self.dipole_mesh_name is not None: absdipole = os.path.abspath(self.dipole_mesh_name) np.savez(self.name + '.npz', save_config=save_config, save_config2=self.cfg2_savename, save_field=savefield, save_mesh=absmesh, save_dipole_mesh=absdipole, save_d_field=self.d_field, save_idx=self.idx, save_area=self.area, save_turns=self.turns, ltype=self.type, grounded=self.grounded, pos=self.pos, phi=self.phi, ds=self.ds) if self.secondary is not None: np.save('{}_secondary.npy'.format(self.name), self.secondary)
def _dump2Temp(self): import tempfile as temp # create temporary filename loopFile = temp.NamedTemporaryFile(suffix='.npz', delete=False) savename = loopFile.name loopFile.close() cfgFile = temp.NamedTemporaryFile(suffix='.cfg', delete=False) configname = cfgFile.name cfgFile.close() self.save(savename=savename, config_savename=configname, save_mesh=False, save_field=False) return (savename, configname)
[docs] def saveLoopMesh(self, savename=None): """ Saves loopmesh using the given savename or an autogenerated name. Updates *self.loop_mesh_name* in case of changes. Parameters ---------- savename: Export path name. Used over default name if given. """ if self.loopmesh is None: raise Exception('No loopmesh to save. Abort.') if savename is None: if self.loop_mesh_name is None: self.loop_mesh_name =\ self.getDefaultLoopMeshBaseName() + '.bms' else: self.loop_mesh_name = savename Path(self.loop_mesh_name).parent.mkdir(exist_ok=True, parents=True) print('saving bms: "{}"'.format(self.loop_mesh_name)) self.loopmesh.save(self.loop_mesh_name)
[docs] def saveFieldMatrix(self, name, verbose=True): """ Saves the three matrices needed for recalculation of the primary field. A compressed numpy archive is loaded and the matrices are build afterwards, therefore import time is ~20% higher compared to the pure pygimli way ( *.field_matrix.save('...')* ). However, because the single arrays (indices and values) are saved in one compressed file archive they need only one third space on the hard disk compared to saving three separate matrices using pygimli syntax. Parameters ---------- name: string Path for file to be saved. verbose: boolean [ True ] Turn on verbose mode. """ # from comet.pyhed.misc import loadSparseMatrixFromNumpyArchive if verbose: print('Saving fieldMatrices as "{}"'.format(os.path.abspath(name))) Path(name).parent.mkdir(exist_ok=True, parents=True) # exportSparseMatrixAsNumpyArchive(name, # self.field_matrix, # self.field_matrix_sin, # self.field_matrix_cos) self.field_matrix.save('{}_field_matrix.bmat'.format(name)) self.field_matrix_sin.save('{}_field_matrix_sin.bmat'.format(name)) self.field_matrix_cos.save('{}_field_matrix_cos.bmat'.format(name))
[docs] def saveSecondaryConfig(self, savename=None): """ Saves secondary config in ASCII file. Parameters ---------- savename: string [ None ] Used savename over *loop.sec_savename*. Throws Exception if both values are None. Replaces *loop.sec_savename*. """ if savename is not None: self.cfg2_savename = savename if self.cfg2_savename is None: raise Exception('Need savename to save secondary configuration.') self.secondary_config.save(self.cfg2_savename)
[docs] def exportVTK(self, save_vtk, secondary=False, **kwargs): """ Exports the field in a vtk file. Uses the *loopmesh* to save *field* with default configurations in a vtk file. Parameters ---------- save_vtk: string Filename of the resulting vtk file. kwargs: dict Keyword arguments are redirected to the function *pyhed.IO.savefieldvtk*. """ if secondary and self.secondary is None: raise Exception('No secondary field found.') if save_vtk is None: save_vtk = self.loop_mesh_name[:-4] + '.vtk' if not secondary: if self.field is None: if self.loopmesh is None: raise Exception('No field or mesh found.') self.loopmesh.exportVTK(save_vtk) else: from comet.pyhed.IO import savefieldvtk savefieldvtk(save_vtk, self.loopmesh, self.field, itype='Mesh', **kwargs) else: from comet.pyhed.IO import savefieldvtk savefieldvtk(save_vtk, self.loopmesh, self.secondary, itype='Mesh', **kwargs)
[docs] def exportFenicsHDF5Mesh(self, save_h5, dipole_mesh=False, **kwargs): """ Exports the mesh in a h5 file. Can save the loopmesh or the dipole mesh seperately. Need pygimli to work. Parameters ---------- save_h5: string Filename of the resulting h5 mesh (hdf5 data container in fenics syntax). dipole_mesh: boolean [ False ] Save dipole mesh instead of loop mesh (Call this function twice if you want to save both meshes). kwargs: dict Keyword arguments are redirected to *pygimli.meshtools.exportFenicsHDF5Mesh* """ if dipole_mesh is False: pg.meshtools.exportFenicsHDF5Mesh(self.loopmesh, save_h5, **kwargs) else: pg.meshtools.exportFenicsHDF5Mesh(self.dipolemesh, save_h5, **kwargs)
[docs] def initCustEM(self, secondary_config=None, init_primary_field_class=True, procs_per_proc=2): """ Initalizes instance of custEM mod class for FEM calculation. Parameters ---------- secondary_config: string or pyhed.SecondaryConfig [ None ] Initialized secondary config class to be used for the mod instance or path to corresponding file containing the secondary config. Uses secondary_config over *loop.secondary_config*. Throws Exception if both values are None. init_primary_field_class: boolean [ True ] Additionally initializing the primary field class of the mod class instance (used for primary field export). """ from custEM.core import MOD from custEM.fem.primary_fields import PrimaryField if secondary_config is not None: self.setSecondaryConfig(secondary_config) if self.secondary_config is None: raise Exception('No config for secondary field calculation has ' 'been loaded.') self.secMOD = MOD(self.secondary_config.mod_name, # mod_name self.secondary_config.mesh_name, # mesh_name self.secondary_config.approach, # custem approach # kwargs are following p=self.secondary_config.polynominal_order, overwrite_results= self.secondary_config.enable_overwrite, m_dir=self.secondary_config.m_dir, # directory where to find the meshs + '/_h5' r_dir=self.secondary_config.r_dir, mumps_debug=False, debug_level=20, profiler=False) sigma_ground = np.copy(self.secondary_config.sigma_ground).tolist() +\ self.secondary_config.sigma_anom # sigma_0=np.copy(self.secondary_config.sigma_ground).tolist() sigma_0 = np.copy(self.secondary_config.sigma_ground).tolist() +\ np.array( self.secondary_config.sigma_ground)\ [np.array(self.secondary_config.layer_markers) - 1].tolist() self.secMOD.MP.update_model_parameters( f=self.config.f, sigma_ground=sigma_ground, sigma_0=sigma_0 ) if init_primary_field_class: log.info('initialize primary field class') self.secMOD.FS.PF = PrimaryField( self.secMOD.FS, self.secMOD.FE, self.secMOD.MP, pf_type='custom', pf_name=self.secondary_config.pf_name, s_type='loop_r')
[docs] def updateFEMAnomaly(self, anomaly=None, set_marker=True, set_attributes=False, vtk_name=None, ground_marker=None, export_H5=False, sort=True): """ Transfers resistivity anomalies from 2D para mesh in FEM mesh. Parameters ---------- anomaly_vector: array_like [ None ] Array containing the resistivity anomalies of the 2D parameter mesh. If None, the secondary config is asked for a anomaly vector. (For setting the marker for exmaple). set_marker: boolean [ True ] Transfers the marker from the parameter mesh to the FEM mesh. This only has to be done once and can then switched off for performance. set_attribute: boolean [ False ] Sets the attribute in the FEM mesh for debugging purposes. The anomaly vector for calculation is stored in secondary_config. vtk_name: string [ None ] Optional vtk export with name = *vtk_name* if *vtk_name* is not None. ground_marker: array_like [None] Corresponding marker for each entry in the anomaly vector. Each marker corresponds to a layer number of the 1d primary field beginning at 1 for the first layer, counting upward (0 belongs to the air layer). None results in np.ones_like(anomaly_vector, dtype=int). """ # input check, anomaly handling in paramesh self._checkParaMesh() assert self.loopmesh is not None, 'No FE mesh found.' # anomaly if anomaly is not None: # case 1/4: anomaly given: set anom (paramesh) self.setAnomalies(anomaly=anomaly, sort=sort) # sets self.secondary_config.sigma_paramesh set_sigma_anom = True # check FE mesh marker # IMPORTANT: setAnomalies needs to be called before _setFEMMarker # otherwise the marker in the paramesh are wrong and this function # will fail. this is one reason why _setFEMMarker is a underscore # function as this is not automatically detectable and caused more # than one headache. # handling marker and attributes of FEM Mesh if set_marker or self.valid_marker is None: self._setFEMMarker() log.debug('updateFEMAnomaly(): ({}/{}) used paramesh marker in FE mesh' .format(np.sum(self.valid_marker), len(self.valid_marker))) if self.secondary_config.sigma_paramesh != []: # case 2/4: not given, look in secondary config anomaly = np.array(self.secondary_config.sigma_paramesh) log.debug('updateFEMAnomaly: no anomalies given, found ' 'sigma_paramesh in secondary_config: {} ({})' .format(anomaly, len(anomaly))) sigma_anom = np.array(anomaly)[self.valid_marker] log.info('updateFEMAnomaly: using valid_marker: sigma_anom: ' '{} ({})' .format(sigma_anom, len(sigma_anom))) set_sigma_anom = True else: # cases 3 and 4 still not given and not found in secondary config # searching on for imported sigma_anom (e.g. from another run) # In this case the sigma:anom is not evaluated again. if self.secondary_config.sigma_anom != []: # case 3/4: sigma anom found in secondary config # e.g. imported from earlier # use only marker found in FE mesh (valid_marker) sigma_anom = self.secondary_config.sigma_anom log.info('updateFEMAnomaly: Found sigma_anom in ' 'secondary config: {} ({})' .format(sigma_anom, len(sigma_anom))) set_sigma_anom = False log.info('updateFEMAnomaly: sigma_anom will not be changed.') else: # case 4/4: no anomly, no 2D fields, problem -> abort raise Exception('updateFEMAnomaly: no anomaly vector ' 'found or given, abort.') # last check to prevent old bug, leave it in assert len(sigma_anom) != 0 # ground resistivities ground = np.atleast_1d(self.secondary_config.sigma_ground) if ground_marker is not None: ground_marker = np.array(ground_marker)[self.valid_marker] if set_attributes or log.getEffectiveLevel() <= 10: vals = np.array([1e-8], dtype=np.float) vals = np.concatenate([vals, ground]) vals = np.concatenate([vals, sigma_anom]) attr = vals[self.loopmesh.cellMarkers()] log.debug('loop_bib.setFEMAnomaly(..., set_attributes=True): ' 'export res instead of sigma!') self.loopmesh.setCellAttributes(1./attr) if log.getEffectiveLevel() <= 10: # active debug mode if vtk_name is None: # take femmesh name and put vtk there vtk_name = self.secondary_config.m_dir + '/_h5/' vtk_name += self.secondary_config.mesh_name vtk_name += '_debug_FEMesh.vtk' if vtk_name is not None: Path(vtk_name).parent.mkdir(exist_ok=True, parents=True) self.loopmesh.exportVTK(vtk_name, pg.Vector(1./attr)) # sets: self.secondary_config.sigma_anom = new_anoms # sets: self.secondary_config.layer_markers = ground_marker if set_sigma_anom: # cases 1 or 2: set anom (FE mesh) # in case 3: has been imported and will not be updated self.secondary_config.setAnomalies(sigma_anom, layer_markers=ground_marker)
[docs] def updateFEMAnomaly_old(self, anomaly=None, set_marker=True, set_attributes=False, vtk_name=None, ground_marker=None, export_H5=False): """ Transfers resistivity anomalies from 2D para mesh in FEM mesh. Parameters ---------- anomaly_vector: array_like [ None ] Array containing the resistivity anomalies of the 2D parameter mesh. If None, the secondary config is asked for a anomaly vector. (For setting the marker for exmaple). set_marker: boolean [ True ] Transfers the marker from the parameter mesh to the FEM mesh. This only has to be done once and can then switched off for performance. set_attribute: boolean [ False ] Sets the attribute in the FEM mesh for debugging purposes. The anomaly vector for calculation is stored in secondary_config. vtk_name: string [ None ] Optional vtk export with name = *vtk_name* if *vtk_name* is not None. ground_marker: array_like [None] Corresponding marker for each entry in the anomaly vector. Each marker corresponds to a layer number of the 1d primary field beginning at 1 for the first layer, counting upward (0 belongs to the air layer). None results in np.ones_like(anomaly_vector, dtype=int). """ # input check, anomaly handling in paramesh self._checkParaMesh() assert self.loopmesh is not None, 'No FE mesh found.' # check FE mesh marker # handling marker and attributes of FEM Mesh if set_marker or self.valid_marker is None: self.setFEMMarker() log.debug('updateFEMAnomaly(): ({}/{}) used paramesh marker in FE mesh' .format(np.sum(self.valid_marker), len(self.valid_marker))) # anomaly if anomaly is None: # case 1/2: not given, look in secondary config anomaly = np.array(self.secondary_config.sigma_anom) log.debug('updateFEMAnomaly found sigma anom: {} ({})' .format(anomaly, len(anomaly))) # already checked for valid marker, use directly new_anoms = anomaly else: # case 1/2: anomaly given, check for consistency anomaly = self._checkAnomaly(anomaly) # use only marker found in FE mesh (valid_marker) new_anoms = np.array(anomaly)[self.valid_marker] log.debug('updateFEMAnomaly(): sigma_anom = {}'.format(len(new_anoms))) # last check to prevent old bug, leave it in assert len(anomaly) != 0 # ground resistivities ground = np.atleast_1d(self.secondary_config.sigma_ground) if ground_marker is not None: ground_marker = np.array(ground_marker)[self.valid_marker] if set_attributes: vals = np.array([1e-8], dtype=np.float) vals = np.concatenate([vals, ground]) vals = np.concatenate([vals, new_anoms]) attr = vals[self.loopmesh.cellMarkers()] log.debug('loop_bib.setFEMAnomaly(..., set_attributes=True): ' 'export res instead of sigma!') self.loopmesh.setCellAttributes(1./attr) if vtk_name is not None: Path(vtk_name).parent.mkdir(exist_ok=True, parents=True) self.loopmesh.exportVTK(vtk_name, pg.Vector(1./attr)) log.info('updateFEMAnomaly(): {} anomalies.'.format(len(new_anoms))) # sets: self.secondary_config.sigma_anom = new_anoms # sets: self.secondary_config.layer_markers = ground_marker self.secondary_config.setAnomalies(new_anoms, layer_markers=ground_marker) log.debug('sec config sigma ground: {} ({})' .format(self.secondary_config.sigma_ground, len(self.secondary_config.sigma_ground))) log.debug('sec config sigma anom: {} ({})' .format(np.array(self.secondary_config.sigma_anom), len(self.secondary_config.sigma_anom)))
[docs] def prepareSecondaryFieldCalculation(self, savename=None, secondary_config=None, fem_mesh=None, para_mesh_2d=None, set_marker=False, anomaly_vector=None, valid_marker=None, verbose=False, num_cpu=32, force_primary=False, export_vtk=False, mod_name=None, **kwargs): """ Based on the given secondary config a MOD instance using the third party module custEM will be initialized. This includes the optional generation of a FEM suited mesh containing resistivity information from a 2D parameter mesh. Parameters ---------- savename: string [ None ] Name under which loopclass and secondary config (+= '_sec.cfg') are to be saved. Needed for secondary approach. secondary_config: pyhed.SecondaryConfig or string [ None ] Filename of configuration file or initialized class instance of a secondary configuration. Optional if already given manually. fem_mesh: pg.Mesh or string [None] FEM suited mesh or filename, respectively. Optional. If not given a suited mesh will be generated if a valid para_mesh_2d is provided. para_mesh_2d: pg.Mesh or string [ None ] 2D parameter mesh providing cell indices for the appending of resitivity information. Needed for automatic FEM mesh generation. Can be set manually beforehand. set_marker: boolean [ True ] Flag to decide if the fem mesh has got the needed marker for the resitivity distribution. Can be omitted if already done and saved (e.g. if same mesh is used again). anomaly_vector: np.ndarray [ None ] Conductivity values [S/m] of the parameter mesh to be used in the seocondary field approach. Uses given value over array found in secondary config. Raises Exception if neither found nor given. ground_marker: np.ndarray [ None ] Corresponding marker for each entry in the anomaly vector. Each marker corresponds to a layer number of the 1d primary field beginning at 1 for the first layer, counting upward (0 belongs to the air layer). None results in np.ones_like(anomaly_vector, dtype=int). verbose: boolean [ False ] Turn on verbose mode. num_cpu: integer [ 32 ] Maximum number of processes allowed for this task. force_primary: boolean [ False ] Force a recalculation of the primary field. mod_name: string or None [ None ] Overrides mod name. Useful if looping over many loops, as default name could be similar. magnetic: boolean [ True ] Prepares magnetic primary fields. If False only dummies are created to avoid error messages from custEM during import. Set to False if secondary electric approach is used for secondary field calculation. electric: boolean [ True ] Prepares electric primary fields. If False only dummies are created to avoid error messages from custEM during import. Set to False if secondary magnetic approach is used for secondary field calculation. Returns: -------- tuple: (savename, sec_savename) Absolute file paths for the secondary approach. Usage: ------ In order to prepare a secondary field calculation you need: - a secondary config (default is provided) - a conductivity vector (*) - a 2d parameter mesh matching the anomalies (*) - a marker_vector (*) *if not in secondary config or proviedd beforehand and optionally either - fem_mesh (without marker -> set_marker=True (default)) or - fem_mesh (with marker -> set_marker=False) or - no fem_mesh (auto creation) """ new_save = False # additional saving required # setting optional secondary configuration if secondary_config is not None: self.setSecondaryConfig(secondary_config) pf_EH_flag = kwargs.pop('pf_EH_flag', None) if pf_EH_flag is not None: self.secondary_config.pf_EH_flag = pf_EH_flag approach = kwargs.pop('approach', None) if approach is not None: self.secondary_config.approach = approach # simple check to save time if '_t' in self.secondary_config.approach: raise Exception('Total field approaches are not yet supported.') # setting optional parameter mesh if para_mesh_2d is not None: self.setParaMesh2D(para_mesh_2d, anomaly=anomaly_vector, sort=kwargs.pop('sort', True)) new_save = True self._checkParaMesh() # setting optional FEM mesh if fem_mesh is not None: self.setFEMMesh(fem_mesh, savename=self.secondary_config.mesh_name, valid_marker=valid_marker) set_marker = False if valid_marker is None and self.valid_marker is None: new_save = True # if no mesh available or provided: create if self.loopmesh is None: self.createFEMMesh() set_marker = False new_save = True assert self.loopmesh is not None log.info('setting marker: {}'.format(set_marker)) if set_marker is True: new_save = True log.info('need to save mesh again: {}'.format(new_save)) self.updateFEMAnomaly(anomaly=anomaly_vector, set_marker=set_marker) if self.secondary_config.sigma_anom is None: raise Exception('No anomaly vector found. Abort.') if not os.path.exists('{}/_h5/{}.h5'.format( self.secondary_config.m_dir, self.secondary_config.mesh_name)): new_save = True if new_save: # ...then the copy on harddrive needs updating... np.save('{}/_h5/fem_marker.npy' .format(self.secondary_config.m_dir), self.valid_marker) # export pygimli version of FEM mesh with markers (.bms) self.saveLoopMesh('{}/_h5/{}.bms'.format( self.secondary_config.m_dir, self.secondary_config.mesh_name)) # export vtk version of FEM mesh with markers (.vtk) vtk_name = '{}/_h5/{}.vtk'.format( self.secondary_config.m_dir, self.secondary_config.mesh_name) log.info('saving vtk: "{}"'.format(vtk_name)) self.loopmesh.exportVTK(vtk_name) # export fenics version of FEM mesh with markers (.h5) h5_loop_name = self.loop_mesh_name[:-4] + '.h5' log.info('saving h5: "{}"'.format(os.path.abspath(h5_loop_name))) log.info('unique cell marker: {}' .format(len(np.unique(self.loopmesh.cellMarkers())))) pg.meshtools.exportFenicsHDF5Mesh(self.loopmesh, h5_loop_name) # recalculation of primary fields required? found = self._checkPrimaryFields() calc = not found or force_primary log.info('primary field: found: {}, force: {}, calc: {}' .format(found, force_primary, calc)) if calc: self.calcAndExportFieldsForFenics( num_cpu=num_cpu, export_vtk=export_vtk, debug=kwargs.pop('debug', False), **kwargs) # updating name if mod_name is not None: self.secondary_config.mod_name = mod_name # save for secondary field calculation call in mpi environment if savename is None: savename = Path(self.secondary_config.r_dir).joinpath( self.secondary_config.mod_name + '_primary_loop').as_posix() self.save(os.path.abspath(savename), save_mesh=False) # is already saved
[docs] def show(self, **kwargs): """ Plots the Loopdiscretisation and the dipole directions and Length. For inspection of the loop-class and debugging purpose. Or for your curiosity. Parameters ---------- kwargs: dict Keyword arguments are redirected to *pyhed.plot.plot_bib.showLoop*. """ from comet.pyhed.plot.plot_bib import showLoop return showLoop(self.pos, self.phi, self.ds, **kwargs)
[docs] def effectiveArea(self): """ Returns *self.area* * *self.turns* (0 for not closed loops). """ return self.area * self.turns
def _checkParaMesh(self, raise_it=True): if self.para_mesh_2d is None: if raise_it: raise Exception('Need parameter mesh in order to set anomaly ' 'vector.') else: return False else: return True def _checkPrimaryFields(self): """ Check if primary fields already exists and returns True/False. """ if self.secondary_config is None: return False # for total field approach, no primary fields are needed if '_t' in self.secondary_config.approach: return True pf_path = '{}/primary_fields/{}/{}'.format( self.secondary_config.r_dir, self.secondary_config.pf_type, self.secondary_config.pf_name) # extensive check for previously calculated fields for field in ['E', 'H']: for complexity in ['real', 'imag']: if not os.path.exists('{}_{}_0_{}_cg.h5'.format( pf_path, field, complexity)): return False return True def _handleNonUniqueDipoles(self): """ Internally called function. Sort out double dipole positions and handle their phi and ds. """ # new dtype looks like three floats in a row # so unique searches for a unique combination of floats, instead of # single unique floats posview = self.pos.ravel().view( np.dtype((np.void, self.pos.dtype.itemsize*self.pos.shape[1]))) _, unique_idx, inverse_idx, counts_idx = np.unique(posview, return_index=True, return_inverse=True, return_counts=True) # found unique: now sort, filter and merge double pos sorted_idx = unique_idx.argsort() unique_sorted_idx = unique_idx[sorted_idx] new_pos = np.array(self.pos)[unique_sorted_idx] new_ds = np.array(self.ds)[unique_sorted_idx] new_phi = np.array(self.phi)[unique_sorted_idx] for i, count in enumerate(counts_idx[sorted_idx]): if count > 1: idx_i = np.where(inverse_idx == inverse_idx[i]) new_ds[i] = np.sum(self.ds[idx_i]) new_phi[i] = np.einsum('i,i->', self.ds[idx_i], self.phi[idx_i])/new_ds[i] orig_pos = len(self.pos) self.pos = new_pos self.phi = new_phi self.ds = new_ds log.debug('merge dipoles: {} -> {} dipoles'.format( orig_pos, len(self.pos))) def _setDistance(self): """ Biggest and smallest distance between two dipoles of the loop. """ if self.type not in ('dipole', 'Dummy') and len(self.pos) > 1: dislist = [] for i in range(len(self.pos) - 1): # (n * (n - 1)) / 2 steps to calculate dislist.extend(-self.pos[i + 1:] + self.pos[i]) dislist = np.array(dislist).T sqaredis = np.sqrt(np.sum(dislist**2, axis=0)) self._maxdistance = np.max(sqaredis) self._mindistance = np.min(self.ds) else: self._maxdistance = 5 self._mindistance = 0.2 def _loadLoopGeometry(self, filename): """ Loads the basic geometry needed to initalise a loop class. Called by *loop.load* to load geometry from binary npz file archive. Parameters ---------- filename: string Path to file destination. """ DeprecationWarning( 'This is an old loop class. Please save again and you can get rid ' 'of the *_geometry* file. Its now included in the main *.npz*.') archive = np.load(filename) self.type = str(archive['ltype']) self.grounded = bool(archive['grounded']) self.pos = archive['pos'] self.phi = archive['phi'] self.ds = archive['ds'] self._setDistance() def _checkAnomaly(self, anomaly): """ Internal function to check anomaly vector. Accounts for triangle Boundaries around paramesh if set. """ # shortcut to avoid if-else in other functions if anomaly is None: return anomaly log.debug('_checkAnomaly input anomaly: {} ({})' .format(anomaly, len(anomaly))) # import check self._checkParaMesh() if len(anomaly) != self.orig_cell_count_2d: raise Exception('Dimension mismatch between original paramesh ' '({} cells) and anomaly vector ({} values).' .format(self.orig_cell_count_2d, len(anomaly))) # prepare anomaly vector with respect to parameter mesh prolongation if self.has_boundary is False: anom = anomaly else: anom = np.zeros(self.para_mesh_2d.cellCount()) anom[self.orig_cells] = anomaly anom = pg.Vector(anom) # inplace prolongation needs reference # therefore explicit pg.Vector call is needed before giving anom # to prolongateEmptyCellsValues self.para_mesh_2d.prolongateEmptyCellsValues(anom) log.debug('_checkAnomaly: detect boundary around paramesh: ' 'prolongate empty cells values.') with temporal_printoptions(): log.debug('_checkAnomaly prolonged anomaly: {} ({})' .format(np.array(anom), len(anom))) self.para_mesh_2d.setCellAttributes(anom) # pygimli to numpy anom = np.array(anom) if self.paramesh_sort: # sorted cells: anomaly vector has unique values # self.anom_idx -> from 2d mesh with bounds to unique vals anom = anom[self.anom_idx] log.debug('_checkAnomaly: {} ({})' .format(anom, len(anom))) # return one value per unique marker in FEM Mesh return anom
[docs]class Geometry: def __init__(self): self.poly = None
def _checkClockwise(points): """ Polygon Area formula. Works for most closed polygons. (Not for figure-of-eight, or multiknot loops) Parameters ---------- points: np.ndarray List or array of coordinates of a closed polygon to determine if the order is in clockwise or counterclockwise order. """ points = np.asarray(points) x_1 = np.append(points[1:, 0], points[0, 0]) y_1 = np.append(points[1:, 1], points[0, 1]) area = np.sum((x_1 - points[:, 0]) * (points[:, 1] + y_1)) if np.isclose(area, 0): log.warning( 'Attribute "clockwise" could not be determined in this ' 'overlapping loop. Please check the directions of the dipoles.') return bool(np.sign(area) == 1) def _3dlinspace(Point1, Point2, num): """ Internal function. Like linspace but for threedimensional points. """ return np.array((np.linspace(Point1[0], Point2[0], num), np.linspace(Point1[1], Point2[1], num), np.linspace(Point1[2], Point2[2], num)), np.float64).T
[docs]def computeLoopPositions(Coordinates, ltype='arbitrary', middle=None, # for circle only grounded=True): """ This function calculates the position of the dipoles in order to represent an arbitrary shaped loop with the given coordinates, the angle between its orientation and the x-direction and the dipole Length it represents. Parameters ---------- Coordinates: np.ndarray Input list/array of points of shape: (n, 3) for n dipoles. ltype: string Defines the general type of the loop and therefore some internal attributes. Choices are:\n - 'rectangle' (also for square loops) - 'circle' - 'arbitrary' (for all other loops) middle: np.ndarray [ None ] Midpoint of the circular loop to calculate radius correct (and therefore the correct source coordinates). grounded: boolean [ True ] For non grounded wires there are dipole placed bewtween the last coordinate point and the first. This is ommitted for grounded wires. """ if isinstance(Coordinates, (list, tuple)): Coordinates = np.array(Coordinates, np.float64) realltype = ltype if ltype not in ('circle', 'arbitrary', 'line'): # basically the same as any arbitrary loop ltype = 'arbitrary' if ltype == 'circle': coords_x = Coordinates.T[0] - middle[0] coords_y = Coordinates.T[1] - middle[1] Angles = np.arctan2(coords_y, coords_x) phis = np.zeros(len(Angles)) Positions = np.zeros((len(Angles), 3), dtype=np.float64) radius = np.sqrt(coords_x[0]**2 + coords_y[0]**2) ds = np.zeros(len(Angles), dtype=np.float64) n = len(Angles) for i in range(n): phi_ii = Angles[(i + 1) % n] phi_i = Angles[i] if phi_i > phi_ii: phi_i += 2 * np.pi phis[i] = ((phi_i + phi_ii)/2.) % (np.pi*2) Positions[i] = np.array((radius * np.cos(phis[i]), radius * np.sin(phis[i]), middle[2])) ds[i] = radius * ((phi_ii - phi_i) % (2 * np.pi)) phis = (phis + np.pi/2) % (2 * np.pi) Positions[:, 0] += middle[0] Positions[:, 1] += middle[1] return Positions, phis, ds, grounded, ltype if ltype in ['arbitrary', 'line']: Positions = [] Directions = [] DipoleLengths = [] n = len(Coordinates) for i in range(n - grounded): cii = Coordinates[(i + 1) % n] ci = Coordinates[i] Positions.append((cii + ci) / 2) Directions.append(np.arctan2((cii[1] - ci[1]), (cii[0] - ci[0]))) DipoleLengths.append(np.sqrt((cii[0] - ci[0])**2 + (cii[1] - ci[1])**2)) Positions = np.array(Positions, dtype=np.float64) Directions = np.array(Directions, dtype=np.float64) DipoleLengths = np.array(DipoleLengths, dtype=np.float64) return Positions, Directions, DipoleLengths, grounded, realltype
[docs]def buildSquare(k=1, num_segs=12, P=(0, 0, 0), max_length=None, savename=None, dipole_clockwise=True, turns=1, **kwargs): """ Square loop around P with edge length k. Parameters ---------- k: float [ 1 ] Length of one edge. num_segs: integer [ 12 ] Total number of segments to be used to discretize the Loop. Used internally to define the max_length of a dipole. Inferior usage compared max_length. max_length: float [ None ] Defines the minimum length of a dipole used for the discretization of the loop. Superior usage compared to num_segs. savename: string [ None ] Basename for the loop. Trigger to save the loop. dipole_clockwise: [ True ] Define the dipole to be ordered in a clockwise direction. turns: integer [ 1 ] Number of turns of a closed loop. kwargs: dict Keyword arguments are redirected to the loop class Returns ------- pyhed.loop: Pyhed loop class instance. Example ------- >>> from comet import pyhed as ph >>> l = ph.loop.buildSquare(k=3.25, max_length=0.24) >>> print(l) >>> print(l.config) >>> l.show() """ loop = buildRectangle(((-k/2. + P[0], -k/2. + P[1], 0), (k/2. + P[0], k/2. + P[1], 0)), num_segs=num_segs, max_length=max_length, savename=savename, dipole_clockwise=dipole_clockwise, **kwargs) loop.turns = turns loop.area = k**2 return loop
[docs]def buildRectangle(points, num_segs=1, max_length=None, savename=None, dipole_clockwise=None, turns=1, **kwargs): """ Creates a rectangular shaped loop and manages dipole discretization. Creates a rectangular loop out of four given corner points, with a given discretisation between the points. Per default the function returns the position of the dipoles, the angle between its orientation and the x-direction and the dipole Length it represents. Parameters ---------- points: list List or Array containing 2D or 3D coordinates (however z-values ignored for now). In case of a rectangle two or four coords are needed. In case of two coordinates, the rectangle will have edges parallel to the coordinate axes. num_segs: integer [ 1 ] Total number of segments to be used to discretize the Loop. Used internally to define the max_length of a dipole. Inferior usage compared to max_length. max_length: float [ None ] Defines the minimum length of a dipole used for the discretization of the loop. Superior usage compared to num_segs. savename: string [ None ] Basename for the loop. Trigger to save the loop. dipole_clockwise: [ True ] Define the dipole to be ordered in a clockwise direction. turns: integer [ 1 ] Number of turns of a closed loop. kwargs: dict Keyword arguments are redirected to the loop class. Returns ------- pyhed.loop: Pyhed loop class instance. Example ------- >>> from comet import pyhed as ph >>> l = ph.loop.buildRectangle([[-5, -5], [5, 5]], max_length=0.64) >>> print(l) >>> print(l.config) >>> l.show() """ if len(points) == 2: # assume corner points p1 = np.copy(points[0]) p2 = np.copy(points[1]) assert not np.isclose(p1[0] - p2[0], 0.0) and \ not np.isclose(p1[1] - p2[1], 0.0), '"buildRectangle" needs two \ opposing corner points to work. Abort' points = np.ones((4, 3)) points[0] = np.array((p1[0], p1[1], 0.0)) points[1] = np.array((p2[0], p1[1], 0.0)) points[2] = np.array((p2[0], p2[1], 0.0)) points[3] = np.array((p1[0], p2[1], 0.0)) loop = buildLoop(points, num_segs=num_segs, max_length=max_length, savename=savename, grounded=False, ltype='rectangle', dipole_clockwise=dipole_clockwise, **kwargs) loop.turns = turns loop.area = vec.areaFromPolyPoints(points) return loop
[docs]def buildLoop(Points, num_segs=1, max_length=None, savename=None, grounded=False, ltype=None, dipole_clockwise=None, turns=1, **kwargs): """ Creates an arbitrary shaped loop out of given coordinates. The returnes object is an initialized loop class. Most general function to build a loop and called by most of the other specialized functions after input preparation. Parameters ---------- Points: list List or Array containing 2D or 3D coordinates of shape (n, 2 or 3) for n corner point of an arbitrary shaped polygon (z_vlaues will be set to 0 or now). num_segs: integer [ 1 ] Total number of segments to be used to discretize the Loop. Used internally to define the max_length of a dipole. Inferior usage compared to max_length. max_length: float [ None ] Defines the minimum length of a dipole used for the discretization of the loop. Superior usage compared to num_segs. savename: string [ None ] Basename for the loop. Trigger to save the loop. grounded: boolean [ False ] Defines wether the loop is closed or not. Also defines which default field mode will be calculated, as 'tm' field of a closed loop is zero. dipole_clockwise: [ True ] Define the dipole to be ordered in a clockwise direction. turns: integer [ 1 ] Number of turns of a closed loop. kwargs: dict Keyword arguments are redirected to the loop class. Returns ------- pyhed.loop: Pyhed loop class instance. Example ------- >>> import pyhed as ph # this works if pyhed is in your path. >>> points = [[-5, -5], [-10, 5], [2.3, 3.14], [7, -7]] >>> l = ph.loop.buildLoop(points, max_length=0.64) >>> print(l) >>> print(l.config) >>> l.show() """ Points = np.array(Points) n = len(Points) if len(Points[0]) == 2: Points = np.column_stack([Points, np.zeros(n)]) else: Points[:, 2] = 0.0 if max_length is None: peri = vec.perimeterFromPolyPoints(Points, closed=not grounded) max_length = peri / (max(num_segs - 2, 1)) if n == 1: print('build Dipole loop with direction=0 and length=1. To alter these\ values use buildDipole directly instead of buildLoop.') return buildDipole(Points, length=1, angle=0) elif n == 2: return buildLine(Points[0], Points[1], num_segs=num_segs, max_length=max_length, savename=savename, grounded=True) if dipole_clockwise is not None: if not (_checkClockwise(Points) == dipole_clockwise): Points = np.concatenate((Points[np.newaxis, 0], Points[1:][::-1])) Coordinates = [] for i in range(0, n - grounded): cii = Points[(i + 1) % n] ci = Points[i] if max_length is not None: diff = ci - cii num_segs = int((np.sqrt(diff[0]**2 + diff[1]**2 + diff[2]**2)) / max_length) + 1 if i == 0: Coordinates.extend(_3dlinspace(Points[0], Points[1], num_segs + 1)) elif i == n - 1: Coordinates.extend(_3dlinspace(ci, cii, num_segs + 1)[1:-1]) else: Coordinates.extend(_3dlinspace(ci, cii, num_segs + 1)[1:]) if ltype is None: if grounded is True: ltype = 'line' else: ltype = 'arbitrary' loop = Loop( computeLoopPositions(Coordinates, ltype=ltype, grounded=grounded), **kwargs) if not grounded: loop.turns = turns loop.area = vec.areaFromPolyPoints(Coordinates) if savename is not None: loop.save(savename) return loop
[docs]def buildLine(Start, End, num_segs=0, max_length=None, savename=None, grounded=True, **kwargs): """ Parameters ---------- Start: list 2D or 3D coordinate of start of the line (z value will be ignored for now). Start: list 2D or 3D coordinate of end of the line (z value will be ignored for now). num_segs: integer [ 0 ] Total number of segments to be used to discretize the Loop. Used internally to define the max_length of a dipole. Inferior usage compared to max_length. max_length: float [ None ] Defines the minimum length of a dipole used for the discretization of the loop. Superior usage compared to num_segs. savename: string [ None ] Basename for the loop. dipole_clockwise: [ True ] Define the dipole to be ordered in a clockwise direction. turns: integer [ 1 ] Number of turns of a closed loop. kwargs: dict Keyword arguments are redirected to the loop class. Returns ------- pyhed.loop: Pyhed loop class instance. Example ------- >>> import pyhed as ph # this works if pyhed is in your path. >>> l = ph.loop.buildLine([-3, -3], [4, 2], max_length=0.14) >>> print(l) >>> print(l.config) >>> l.show() """ Coordinates = [] Start = np.array(Start) if len(Start) == 2: Start = np.append(Start, 0.0) else: Start[2] = 0.0 End = np.array(End) if len(End) == 2: End = np.append(End, 0.0) else: End[2] = 0.0 if max_length is not None: diff = Start - End num_segs = int((np.sqrt(diff[0]**2 + diff[1]**2 + diff[2]**2)) / max_length) + 1 Coordinates.extend(_3dlinspace(Start, End, num_segs + 1)) outl = Loop(computeLoopPositions(Coordinates, ltype='line', grounded=grounded), **kwargs) if savename is not None: outl.save(savename) return outl
[docs]def buildDipole(Pos, length=1, angle=0, **kwargs): """ Parameters ---------- Pos: list 2D or 3D coordinate of the dipole. length: float [ 1 ] Dipole legth. angle: [ 0 ] Dipole direction positive clockwise from the x-aixs. kwargs: dict Keyword arguments are redirected to the loop class. Returns ------- pyhed.loop: Pyhed loop class instance. Example ------- >>> import pyhed as ph # this works if pyhed is in your path. >>> l = ph.loop.buildDipole([-3, -3], length=1.3, angle=45) >>> print(l) >>> print(l.config) >>> l.show() """ if len(Pos) == 2: Pos = np.append(Pos, 0.0) else: Pos = np.array(Pos) Pos[2] = 0.0 return Loop([np.array((Pos,), dtype=float), np.array((angle,), dtype=float), np.array((length,), dtype=float), True, 'dipole'], **kwargs)
[docs]def buildDummy(**kwargs): """ Creates an empty dummy loop class to gain access to certain functionalities. """ return Loop(['Dummy', 'Dummy', 'Dummy', True, 'Dummy'], **kwargs)
[docs]def buildCircle(r, num_segs=None, max_length=None, P=(0, 0, 0), dipole_clockwise=True, savename=None, turns=1, **kwargs): """ this function builds a n-segmented coil in the x-y-plane around the point P = (X, Y, Z), with Radius r. The first point is at the hightest y value with x = X and therefore the point where the dipole is x-directed. Its the point were the field can calculated directly without rotation, but with translation. The rest of the coil is build clockwise. Parameters ---------- r: float Radius of the loop. num_segs: integer [ None ] Total number of segments to be used to discretize the Loop. Used internally to define the max_length of a dipole. Inferior usage compared to max_length. max_length: float [ None ] Defines the minimum length of a dipole used for the discretization of the loop. Superior usage compared to num_segs. P: list or np.ndarray 2D or 3D coordinate of the mid point of the loop. dipole_clockwise: [ True ] Define the dipole to be ordered in a clockwise direction. savename: string [ None ] Basename for the loop. turns: integer [ 1 ] Number of turns of a closed loop. Returns ------- pyhed.loop: Pyhed loop class instance. Example ------- >>> import pyhed as ph # this works if pyhed is in your path. >>> l = ph.loop.buildCircle(10, 11) >>> print(l) >>> print(l.config) >>> l.show() """ if len(P) == 2: P = np.append(P, 0.0) else: P = np.array(P) P[2] = 0.0 if max_length is not None: circumference = r * np.pi * 2 num_segs = int(circumference / max_length) + 1 num_segs = max(num_segs, 8) x = [] y = [] for nr in range(num_segs): tau = 2 * np.pi * nr/num_segs x.append(P[0] + r * np.cos(tau)) y.append(P[1] + r * np.sin(tau)) drop_tol = kwargs.pop('drop_tol', 1e-14) z = np.ones(num_segs) * P[2] z[np.where(abs(z) < drop_tol)] = 0 x = np.asarray(x) x[np.where(abs(x) < drop_tol)] = 0 y = np.asarray(y) y[np.where(abs(y) < drop_tol)] = 0 coords = np.array((x, y, z), np.float64).T loop = Loop(computeLoopPositions(coords, ltype='circle', middle=P, grounded=False), **kwargs) if not dipole_clockwise: loop.phi = (loop.phi + np.pi) % (2 * np.pi) if savename is not None: loop.save(savename) loop.turns = turns loop.area = (r**2) * np.pi return loop
[docs]def buildSpiral(r1, r2, sp_turns=2, sp_segs=36, max_length=None, P=(0, 0, 0), dipole_clockwise=True, savename=None, theta=90.0, **kwargs): """ this function builds a spiral coil with sp_turns number of turns in the x-y-plane around the point P = (X, Y, Z), with inner radius r1 and outer radius r2. The angle theta defines the start of the spiral (theta = 0 -> East, theta = 90 -> North, etc.). The spiral is build clockwise from r1 to r2. Parameters ---------- r1: float Inner radius of the spiral. r2: float Outer radius of the spiral. sp_turns: integer [ None ] Total number of spiral turns. sp_segs: integer [ None ] Total number of segments to be used to discretize the spiral. max_length: float [ None ] Defines the minimum length of a dipole used for the discretization of the loop. P: list or np.ndarray 2D or 3D coordinate of the mid point of the loop. dipole_clockwise: [ True ] Define the dipole to be ordered in a clockwise direction. savename: string [ None ] Basename for the loop. theta: float, deg [ 90.0 ] Orientation of start-end-connection of the spiral in degrees. Returns ------- pyhed.loop: Pyhed loop class instance. Example ------- >>> import pyhed as ph # this works if pyhed is in your path. >>> l = ph.loop.buildSpiral(1, 2, sp_turns=5) >>> print(l) >>> print(l.config) >>> l.show() """ # 2D or 3D center point if len(P) == 2: P = np.append(P, 0.0) else: P = np.array(P) P[2] = 0.0 # mean spiral radius R = (r1 + r2) / 2 # vector of spiral angles phi = np.linspace(0, 2 * np.pi * sp_turns, sp_segs * sp_turns + 1) + np.pi / 2 - np.deg2rad(theta) # dr increment dr = (r2 - r1) / phi.size # linearly increasing radius r = np.arange(r1, r2, dr) # corner points of spiral x = [] y = [] for n in range(len(r)): x.append(P[0] + r[n] * np.sin(phi[n])) y.append(P[1] + r[n] * np.cos(phi[n])) # copied form Nico (removing small numbers) drop_tol = kwargs.pop('drop_tol', 1e-14) z = np.ones(len(r)) * P[2] z[np.where(abs(z) < drop_tol)] = 0 x = np.asarray(x) x[np.where(abs(x) < drop_tol)] = 0 y = np.asarray(y) y[np.where(abs(y) < drop_tol)] = 0 coords = np.array((x, y, z), np.float64).T # assemble loop as 'arbitrary' set of points loop = buildLoop(coords, num_segs=1, max_length=max_length, savename=savename, grounded=False, ltype='arbitrary', dipole_clockwise=dipole_clockwise, **kwargs) # there is effectively only 1 turn (check usage!) loop.turns = 1 # average area of the loop (check usage!) loop.area = (R**2) * np.pi return loop
[docs]def buildFig8Circle(r, num_segs=None, max_length=None, P=(0, 0, 0), savename=None, turns=1, **kwargs): """ Build a figure-of-eight loop with circular loops around point P = (X, Y, Z), with Radius r. Parameters ---------- r: float Radius of the loop. num_segs: integer [ None ] Total number of segments to be used to discretize the Loop. Used internally to define the max_length of a dipole. Inferior usage compared to max_length. In this case divided between the two circular loops. max_length: float [ None ] Defines the minimum length of a dipole used for the discretization of the loop. Superior usage compared to num_segs. P: array_like [ (0., 0., 0.) ] 2D or coordinate of the mid point of the loop. savename: string [ None ] Basename for the loop. turns: integer [ 1 ] Number of turns of a closed loop. Returns ------- pyhed.loop.Loop: Pyhed loop class instance. """ if num_segs is None: n_segs2 = None else: n_segs2 = int(num_segs/2.) circ1 = buildCircle(r, num_segs=n_segs2, P=(P[0] - r, P[1], P[2]), max_length=max_length, savename=None, dipole_clockwise=True, **kwargs) circ2 = buildCircle(r, num_segs=n_segs2, P=(P[0] + r, P[1], P[2]), max_length=max_length, savename=None, dipole_clockwise=False, **kwargs) fig8 = mergeLoops([circ1, circ2], true_merge=True, take_single_min_dis=True) fig8.turns = turns fig8.area = circ1.area + circ2.area return fig8
[docs]def buildMultiKnotLoop(edgelength, num_segs=None, max_length=None, P=(0, 0, 0), dipole_clockwise=True, savename=None, turns=1, **kwargs): """ Build a figure-of-eight loop with circular loops around point P = (X, Y, Z), with Radius r. Parameters ---------- edgelength: float Edge length of inner part of Multi-Knot Loop. num_segs: integer [ None ] Total number of segments to be used to discretize the Loop. Used internally to define the max_length of a dipole. Inferior usage compared to max_length. In this case divided between the two circular loops. max_length: float [ None ] Defines the minimum length of a dipole used for the discretization of the loop. Superior usage compared to num_segs. P: array_like [ (0., 0., 0.) ] 2D or coordinate of the mid point of the loop. dipole_clockwise: [ True ] Define the dipole to be ordered in a clockwise direction (meaning the inner loop). savename: string [ None ] Basename for the loop. turns: integer [ 1 ] Number of turns of a closed loop. Returns ------- pyhed.loop.Loop: Pyhed loop class instance. """ inpos = [[-edgelength, -0.5* edgelength, 0], [-edgelength, -edgelength, 0], [-0.5* edgelength, -edgelength, 0], [-0.5* edgelength, edgelength, 0], [-edgelength, edgelength, 0], [-edgelength, 0.5* edgelength, 0], [edgelength, 0.5* edgelength, 0], [edgelength, edgelength, 0], [0.5* edgelength, edgelength, 0], [0.5* edgelength, -edgelength, 0], [edgelength, -edgelength, 0], [edgelength, -0.5* edgelength, 0]] if not dipole_clockwise: inpos = inpos[::-1] loop = buildLoop(inpos, num_segs=num_segs, max_length=max_length, savename=savename, grounded=False, ltype='arbitrary', **kwargs) loop.turns = turns loop.area = edgelength**2 * 2 return loop
[docs]def buildFig8(points, num_segs=3, max_length=None, mid=0, dipole_clockwise=True, turns=1, **kwargs): """ Builds a figure-of-eight Loop with respect to the given corner Points. This function is part of the pyhed.loops library and returns a 'loop'-class object suitable to calculate electric and magnetic fields based on horizontal electric dipoles. Please always check the loop consistency with the buildin .show() command (see Example) before calculating with experimetal loop layouts. Parameters ---------- num_segs: integer [ 12 ] Total number of segments to be used to discretize the Loop. Used internally to define the max_length of a dipole. Inferior usage compared to max_length. max_length: float [ None ] Defines the minimum length of a dipole used for the discretization of the loop. Superior usage compared to num_segs. points: array_like Points can be of shape (2, 3), two points with three coordinates (x, y, z) and the algorithm will build a loop with edges parallel to the coordiante axes. Although the point coordiantes are given with z values, the current implementation of COMET is not able to allow for any z-values other than zero, i appologize for the inconvenience. This flaw will be adressed as soon as COMET moves towards 2D resistivity structures. max_length: float or None (None) The discretisation between the cornerpoints of the loop can be sampled by any rate (m) you choose. Since the total number of points between the corner points of the loop has to be an integer, the real distance between the points will always be smaller or equal to max_length. Its highly recommended to use at least 10 dipoles between the different points, and therefore a total number of dipoles of >= 50 - 60. This leads to a natural max_length of 1/10 the smaller edge of the figure-of-eight loop. num_segs: integer (3) The number of segments between the corner points of the loop can also be given directly, but mention that the max_length value (not None) will have priority. Usually max_length will lead to more homogeneous distributions of dipoles between the corner and midpoints of a figure-of-eight loop. mid: integer (0) The middle connection depends on the value "mid". The default value (0) sets the middle lines parrallel to the y-axis. Other values are setting the line parrallel to the x axis, respectively. dipole_clockwise: boolean (True) The dipoles are orientated clockwise with respect to the first half of the loop or counterclockwise if this switch is set the False. The first half is considered to be the half connected to the upper left point of the loop boundary, so either the left or the upper loop depending on mid. Possible kwargs are: savename. Please see "buildLoop" for more details. Returns ------- pyhed.loop: Pyhed loop class instance. Example ------- >>> import pyhed as ph # this works if pyhed is in your path. >>> l = ph.loop.buildSquare(k=3.25, max_length=0.14) Example ------- >>> import pyhed as ph >>> p1 = (-1, 1, 0) >>> p2 = (1, -1, 0) >>> fig8 = ph.loop.buildfig8((p1, p2), max_length=0.1) >>> print(fig8) >>> print(fig8.config) >>> fig8.show() """ if len(points) == 2: # assume corner points # np.copy to prevent: TypeError: 'tuple' object does not support item # assignment # TODO! allow for z-values different than zero # for the time being the resulting positions can be shifted manually p1 = np.copy(points[0]) if len(p1) == 2: p1 = np.append(p1, 0) else: p1[2] = 0. p2 = np.copy(points[1]) if len(p2) == 2: p2 = np.append(p2, 0) else: p2[2] = 0. area = vec.areaFromPolyPoints(points) assert not np.any(np.isclose(p1[0:2], p2[0:2])), '"buildFig8" needs\ two opposing corner points to work. Abort.' if mid == 0: cw = bool(p1[1] > p2[1]) half = float(p1[0] + p2[0])/2. points = np.array((p1, (half, p1[1], 0.), (half, p2[1], 0.), p2, (p2[0], p1[1], 0.), (half, p1[1], 0.), (half, p2[1], 0.), (p1[0], p2[1], 0.))) else: cw = bool(p1[0] < p2[0]) half = float(p1[1] + p2[1])/2. points = np.array((p1, (p2[0], p1[1], 0.), (p2[0], half, 0.), (p1[0], half, 0.), (p1[0], p2[1], 0.), p2, (p2[0], half, 0.), (p1[0], half, 0.))) if cw is not dipole_clockwise: # change the direction of the dipoles with # respect to "dipole_clockwise" points = np.concatenate((points[np.newaxis, 0], points[1:][::-1])) loop = buildLoop(points, num_segs=num_segs, max_length=max_length, grounded=False, ltype='figure8', **kwargs) loop.turns = turns loop.area = area return loop
[docs]def buildEtraSourceDiscretization(poly, edgelength, max_length=0.251, x_left=None, n_segs=None, number_of_loops=8): """ Internal function. Implements an etra shaped source in the FEM mesh. Cannot use buildEdgeSourceDiscretization due to overlapping edges. """ assert number_of_loops > 1, 'number of loops <= 1' offsets = number_of_loops + 2 norm_off = (number_of_loops + 1)/2. offset = np.linspace(-norm_off/2 * edgelength, norm_off/2 * edgelength, offsets) if x_left is not None: offset -= (offset[0] - x_left) lines = [] if n_segs is None: n_segs = int(edgelength // max_length) n_segs2 = int((edgelength / 2) // max_length) else: n_segs2 = int(n_segs / 2) y_pos = np.linspace(-edgelength * 0.5, edgelength * 0.5, n_segs) for xi, x_off in enumerate(offset): # y lines x_pos = np.ones_like(y_pos) * x_off pos = np.column_stack([x_pos, y_pos, np.zeros_like(x_pos)]) lines.append(pos) if xi > 0: # x lines x_pos2 = np.linspace(offset[xi - 1], offset[xi], n_segs2) y_pos_up = np.ones_like(x_pos2) * 0.5 * edgelength y_pos_down = np.ones_like(x_pos2) * (-edgelength * 0.5) lines.append(np.column_stack([x_pos2, y_pos_up, np.zeros_like(x_pos2)])) lines.append(np.column_stack([x_pos2, y_pos_down, np.zeros_like(x_pos2)])) if isinstance(poly, pg.Mesh): # case 1/2: no custEM dim_poly = poly.dim() if dim_poly == 2: for j in range(len(lines)): nn = [poly.createNodeWithCheck([o[0], o[1]]) for o in lines[j]] for k in range(len(lines[j]) - 1): poly.createEdge(nn[k], nn[k + 1], marker=47) else: for j in range(len(lines)): nn = [poly.createNodeWithCheck([o[0], o[1], 0.0]) for o in lines[j]] for k in range(len(lines[j]) - 1): poly.createEdge(nn[k], nn[k + 1], marker=47) else: # case 2/2: no custEM poly.build_surface(insert_lines=[*lines], line_marker=[47] * len(lines))
[docs]def buildEdgeSourceDiscretization(surface, pos, phi, ds, closed=True): """ Internal function. Used to implement every dipole in the FEM mesh using an appropriate edge that represents it. """ pos = np.atleast_1d(pos) ds = np.atleast_1d(ds) phi = np.atleast_1d(phi) pos1 = pos + np.array([np.cos(phi)*ds/2, np.sin(phi)*ds/2, np.zeros(len(phi))]).T if not closed: # first point is different from last point # add first point explicitly pos_start = pos[0] - np.array([np.sin(phi[0])*ds[0]/2, np.cos(phi[0])*ds[0]/2, 0.0]).T pos1 = np.append(pos_start, pos1) nodes = [] for pi, posi in enumerate(pos1): nodes.append(surface.createNodeWithCheck(posi)) for ni in range(len(nodes) - 1): surface.createEdge(nodes[ni], nodes[ni + 1], marker=47) if closed: surface.createEdge(nodes[-1], nodes[0], marker=47)
[docs]def buildPointSourceDiscretization(surface, pos): """ Internal function. Used to implement each source dipole as simple node in the FEM mesh. """ pos = np.atleast_1d(pos) for pi in pos: surface.createNodeWithCheck(pi)
[docs]def buildEtraSurvey(edgelength, return_measurements=False, origin=[0, 0], num_loops=8, max_length=None, savenames=None, **kwargs): """ Special etra survey for NMR applications. """ if num_loops <= 2: raise Exception('buildEtraSurvey: num_loops has to be > 2.') if savenames is not None: if not isinstance(savenames, str): raise Exception('"savenames" has to be string not {}.' .format(type(savenames))) if '{}' not in savenames: savenames = savenames + '_{}' loops = [] corner_x = np.linspace(-0.25 * num_loops * edgelength, 0.25 * num_loops * edgelength, num_loops + 1) c_idx_x = [] c_idx_x.append([1, num_loops - 1]) for i in range(num_loops - 1): c_idx_x.append([i, i + 2]) if max_length is None: max_length = edgelength/25. for (ind_1, ind_2) in c_idx_x: loops.append(buildRectangle( [[corner_x[ind_1] + origin[0], -edgelength/2 + origin[1], 0], [corner_x[ind_2] + origin[0], edgelength/2 + origin[1], 0]], max_length=max_length, dipole_clockwise=kwargs.pop('dipole_clockwise', True), **kwargs)) for lp, loop in enumerate(loops): if savenames: loop.name = savenames.format(lp) else: loop.name = f'loop_{lp}' if return_measurements: ret = (loops, np.array([np.zeros(num_loops, dtype=int), np.arange(0, num_loops, dtype=int), np.zeros(num_loops, dtype=int)]).T,) else: ret = loops return ret
[docs]def loadLoop(name, **kwargs): """ Imports loop from file archive. See *ph.loop.Loop.load* for details. """ ll = buildDummy() ll.load(name, **kwargs) return ll
[docs]def loadLoops(name, num=8, load_meshes=False, cfg_name=None, cfg2_name=None, overwrite_dir=False): """ Loads n loops with name = ...{n}... """ loops = [] kwargs = {} kwargs['overwrite_dir'] = overwrite_dir kwargs['load_meshes'] = load_meshes for i in range(num): if cfg_name: kwargs['config'] = cfg_name.format(i) if cfg2_name: kwargs['config2'] = cfg2_name.format(i) loops.append(loadLoop(name.format(i), **kwargs)) return loops
[docs]def dipolePosFromSimpleLoop(r, n, P=(0, 0, 0), drop_tol=1e-14): """ Convienience function to have fast access to circular loop coordinates. """ Loop = buildCircle(r, n, P=P, drop_tol=drop_tol) posi = Loop.pos dipole_pos = np.array((posi[0], posi[1], posi[2])) return dipole_pos.T
[docs]def mergeLoops(*loops, true_merge=False, config=None): """ Merges given loops to one and optionally merges equal dipoles. Parameters ---------- loops: loops-classes Loops to be merged. true_merge: boolean [True] Switch for a complete merge of all dipoles to with respect to their phi and dipole length. Attention this can be problematic when merging edge-to-edge loops (for mesh creation for example). If False only the dipole positions are merged not phi/ds (there replaced with dummy values). config: ph.config-instance or string [None] Sets config of merged loop either per index (cofig is taken from the corresponding loop) or give a new config. If None the config of the first loop is used instead. """ if isinstance(loops[0], (list, tuple, np.ndarray)): loops = loops[0] pos = np.concatenate(([loop.pos for loop in loops]), axis=0) if config is None: config = loops[0].config elif isinstance(config, int): config = loops[config].config if true_merge: # case 1/2: fast merge of all dipoles, phi, and ds. phi = np.concatenate(([loop.phi for loop in loops]), axis=0) ds = np.concatenate(([loop.ds for loop in loops]), axis=0) merged = Loop((pos, phi, ds, False, 'arbitrary')) else: # case 2/2: fast merge of all dipoles, not phi/ds. # use this for mesh creation pos = pg.utils.unique_rows(pos) merged = Loop((pos, np.ones(len(pos)), # fake phi np.ones(len(pos)), # fake ds False, # grounded = False 'arbitrary' )) # take min distance from single loops ds = np.concatenate(([loop.ds for loop in loops])) merged._mindistance = np.min(ds) return merged
[docs]def createSeparatedLoopMesh(*loops, dipole_mesh=False, **kwargs): """ Build a mesh suited for EM primary field calculation if using more than one loop. """ # gather all dipoles for discretization of mesh if isinstance(loops[0], (list, tuple, np.ndarray)): loops = loops[0] pos = np.concatenate(([loop.pos for loop in loops]), axis=0) pos = pg.utils.unique_rows(pos) # dummy loop hlp = Loop((pos, np.ones(len(pos)), # fake phi np.ones(len(pos)), # fake ds False, # grounded = False 'arbitrary' )) hlp.setMeshParameters(refinement_para=kwargs.pop('refinement_para', 1.0), max_area_factor=kwargs.pop('max_area_factor', 1.0), tetgen_quality=kwargs.pop('tetgen_quality', 1.2)) loop_mesh = hlp.createLoopMesh() if dipole_mesh: dipole_mesh = hlp.createDipoleMesh() return loop_mesh, dipole_mesh else: return loop_mesh
[docs]def createSeparatedFEMMesh(*loops, para_mesh_2d=None, **kwargs): """ Build a mesh suited for EM secondary field calculation if more than one loop is used. """ # gather all dipoles for discretization of mesh hlp = mergeLoops(*loops) hlp.setMeshParameters(refinement_para=kwargs.pop('refinement_para', 1.0), max_area_factor=kwargs.pop('max_area_factor', 1.0), tetgen_quality=kwargs.pop('tetgen_quality', 1.2)) loop_mesh = hlp.createFEMMesh(para_mesh_2d=para_mesh_2d, **kwargs) return loop_mesh
[docs]def createEtraMesh(loops, mesh2d, anomaly, savename=None, extend_x=0., extend_y=-0.3, extend_z=-0.5, max_volume=25., append_boundary=True, sort=True, return_loop=False): """ Creates a finite element mesh suited for ETRA surveys. """ mesh2d = pg.Mesh(mesh2d) mesh_loop = mergeLoops(loops, true_merge=False) mesh_loop.setParaMesh2D(mesh2d, append_boundary=append_boundary, anomaly=anomaly, sort=sort) mesh_loop.createFEMMesh(savename=savename, extend_x=extend_x, extend_y=extend_y, extend_z=extend_z, maxVolume=max_volume, source_setup='etra') if return_loop: return mesh_loop else: return mesh_loop.loopmesh, mesh_loop.valid_marker
[docs]def buildEtraPoly(x_min, x_max, small, marker=0): assert int(x_max - x_min) % small == 0, 'No Etra fits in there.' poly = pg.Mesh(2) n = 0 for x in np.arange(x_min, x_max + small/2, small/2): for yi, y in enumerate([-small/2, small/2]): poly.createNodeWithCheck([x, y]) if yi == 1: # y directed bounds poly.createBoundary([n-1, n], marker=marker) if n >= 3: # x directed bounds, upper poly.createBoundary([n-2, n], marker=marker) # -"- lower poly.createBoundary([n-1, n-3], marker=marker) n += 1 return poly
[docs]def createMultipleLoopMesh( loops, savename=None, source_setup='etra', triangle_quality=33.8, source_max_area=None, inner_area_volume=None, mid_area_volume=None, outer_area_volume=None, minx=None, maxx=None, miny=None, maxy=None, minz=None, air_refinement=False, source_poly=None): """ Build a suited mesh for magnetic field calculation for NMR purpose. Sources are included in a way defined by **source_setup**. Parameters ---------- loops: ph.loop.loop or array_like Input loops for which the mesh shall be created. savename: string [ None ] Savename for mesh (.bms will be added). source_setup: string [ 'etra' ] In case of multiple loops, the source_setup is important to define how the loops are included in the mesh. Default is 'etra' for NMR Etra setups. Alternatively 'edges' can be used to implement each dipole as edge with the dipole as midpoint, length as well as direction is defined by the dipole. This is not working for overlaping loops (e.g. Etras). For those a source_poly can be provided or 'nodes' is chosen to simply implement each dipole as node in the mesh. triangle_quality: float [ 34.0 ] The surface where the sources are implemented is meshed in 2D an then later inserted in the 3D mesh. This controls the triangle quality for this surface mesh. source_max_area: maximum area allowed in the 2D surface mesh (sources and 5 meter around the sources). Automatically defined if None (based on size of the area to ensure a minimum amount of trinagle cells). 2D surface mesh is exported if logger is set to debug level (10). inner_area_volume: float [ None ] The inner refinement volume is defined 5 meter around and below the sources. The maximum cell volume can be defined here. Automatically defined if None (based on size of the volume to ensure a minimum amount of cells). mid_area_volume: float [ None ] The median refinement volume is defined through minx, maxx, miny, maxy, and minz meter around and below the sources. The maximum cell volume can be defined here. Automatically defined if None (based on size of the volume to ensure a minimum amount of cells). outer_area_volume: float [ None ] The outer sides of the mesh (3 times miny, maxx, miny, maxy, minz) is to ensure interpolation of the field values are secured without the need for extrapolation. Usually the max cell volume is not contraint to minimize the computational effort. minx: float [ None ] Minimum x extention (in addition to the extend of the inner refinement area) of median refinement volume. If None this value is defined as maximum distance of two dipoles of the input loops. maxx: float [ None ] Maximum x extention (in addition to the extend of the inner refinement area) of median refinement volume. If None this value is defined as maximum distance of two dipoles of the input loops. miny: float [ None ] Minimum y extention (in addition to the extend of the inner refinement area) of median refinement volume. If None this value is defined as maximum distance of two dipoles of the input loops. maxy: float [ None ] Maximum y extention (in addition to the extend of the inner refinement area) of median refinement volume. If None this value is defined as maximum distance of two dipoles of the input loops. minz: float [ None ] Maximum z extention (in addition to the extend of the inner refinement area) of median refinement volume. If None this value is defined as maximum distance of two dipoles of the input loops. 1/3 of the value is used for maxz if air refinement is enabled. air_refinement: boolean [ False ] If true the airspace is meshed as well. source_poly: pg.Mesh or plc [None] For unusual or overlapping, non etra, sources a piecewise linear complex (plc) can be created using the pygimli mesh- and polytools. This is then used as source definition for the 2D source layer mesh. Any region markers with area constraints will be considered, no additional markers will be set. """ # % input: a bunch of loops # Step 1: merge and define some input stuff loops = np.atleast_1d(loops) if len(loops) > 1: merged = mergeLoops(loops, true_merge=False) else: merged = loops[0] # create proper file path for saving stuff merged.setLoopMeshName(savename) mesh_path = Path(merged.loop_mesh_name) mesh_path.parent.mkdir(exist_ok=True, parents=True) log.debug([mesh_path.as_posix(), mesh_path.parent.as_posix()]) # without suffix mesh_base = mesh_path.with_name(mesh_path.stem).as_posix() log.info('creating Loop mesh...') # % Step 2: limits # compute outer limits for FEM mesh if minx is None: minx = np.min(merged.pos.T[0]) - merged._maxdistance if maxx is None: maxx = np.max(merged.pos.T[0]) + merged._maxdistance if miny is None: miny = np.min(merged.pos.T[1]) - merged._maxdistance if maxy is None: maxy = np.max(merged.pos.T[1]) + merged._maxdistance if minz is None: minz = np.min(merged.pos.T[2]) - merged._maxdistance if air_refinement: maxz = 0.33 * abs(minz) else: maxz = 0.0 if mid_area_volume is None: # want to have at least roughly 35000 cells in mid area: mid_area_volume = (maxx - minx) * (maxy - miny) * (maxz - minz) / 35000 log.log(13, 'inner mesh x:, {}, {}'.format(minx, maxx)) log.log(13, 'inner mesh y:, {}, {}'.format(miny, maxy)) log.log(13, 'inner mesh z:, {}, {}'.format(minz, maxz)) # % Step 4: implement sources if source_poly is None: source_poly = pg.Mesh(2) # case 1/3: etra loops are special as they have overlapping edges # which are not easy to implement if source_setup.lower() == 'etra': number_of_loops = len(loops) # number of segments a single edge is divided in ay = np.max(merged.pos[:, 1]) - np.min(merged.pos[:, 1]) # overrides poly, cannot do this another way buildEtraSourceDiscretization( source_poly, ay, max_length=merged._mindistance, number_of_loops=number_of_loops) # case 2/3: source is inserted as edges, the dipoles are in the middle # of a edge with length if the dipole. Do not use for very corse dipole # discretizations. elif source_setup.lower() == 'edges': buildEdgeSourceDiscretization( source_poly, merged.pos, merged.phi, merged.ds, closed=not merged.grounded) # case 3/3: dipoles are added as dipoles. Just as nodes, thats it. elif source_setup.lower() == 'nodes': buildPointSourceDiscretization(source_poly, merged.pos) # no valid case: raise Exception else: raise Exception('Source setup "{}", not in ["etra", "nodes", ' '"edges"].'.format(source_setup)) else: # source poly is not None if isinstance(source_poly, str): source_poly = pg.load(source_poly) # adding outer boundary # 5 * 5 * 5 meter case around sources ibox_x = (source_poly.xmin() - 5, source_poly.xmax() + 5) ibox_y = (source_poly.ymin() - 5, source_poly.ymax() + 5) if inner_area_volume is None: # if not specified, try to get at least 15,000 cells in inner area inner_area_volume = (ibox_x[0] - ibox_x[1]) *\ (ibox_y[0] - ibox_y[1]) *\ 5 / 15000 bbox = pg.Mesh(2) bb0 = bbox.createNode([ibox_x[0], ibox_y[0]]) bb1 = bbox.createNode([ibox_x[0], ibox_y[1]]) bb2 = bbox.createNode([ibox_x[1], ibox_y[1]]) bb3 = bbox.createNode([ibox_x[1], ibox_y[0]]) # bbox.createQuadrangleFace(bb0, bb1, bb2, bb3) bbox.createEdge(bb0, bb1) bbox.createEdge(bb1, bb2) bbox.createEdge(bb2, bb3) bbox.createEdge(bb3, bb0) inner = pg.meshtools.polytools.mergePLC([bbox, source_poly]) if source_max_area is None: source_max_area = (merged._mindistance ** 2) * 4 log.info('Building 2d surface mesh with quality: {:.2f}, max Area: ' '{:.2f} m².'.format(triangle_quality, source_max_area)) interface_mesh2d = pg.meshtools.createMesh( inner, quality=triangle_quality, area=source_max_area) log.info('2d surface mesh: {}'.format(interface_mesh2d)) if log.getEffectiveLevel() <= 10: interface_mesh2d.exportVTK('interface_mesh_2d.vtk') # now: cells in 2d are boundaiers in 3D, therefore we need to do this merge # manually interface_mesh = pg.Mesh(3) for cell in interface_mesh2d.cells(): cids = [] for node in cell.nodes(): cids.append(interface_mesh.createNodeWithCheck(node.pos())) interface_mesh.createTriangleFace(*cids) pos_arr = interface_mesh.positions().array() b_left = np.isclose(pos_arr[:, 0], interface_mesh.xmin()) b_right = np.isclose(pos_arr[:, 0], interface_mesh.xmax()) b_upper = np.isclose(pos_arr[:, 1], interface_mesh.ymax()) b_lower = np.isclose(pos_arr[:, 1], interface_mesh.ymin()) interface_mesh.setDimension(3) # task: identify nodes at the boundary of the inner mesh to connect them # to the outer mesh # y positions of nodes at the left boundary of the inner mesh y_left = np.argsort(pos_arr[b_left][:, 1]) midid_yleft = int(len(y_left)//2) id_left = np.where(b_left)[0][y_left] # y positions of nodes at the right boundary of the inner mesh y_right = np.argsort(pos_arr[b_right][:, 1]) midid_yright = int(len(y_right)//2) id_right = np.where(b_right)[0][y_right] # x positions of nodes at the upper boundary of the inner mesh x_upper = np.argsort(pos_arr[b_upper][:, 0]) midid_xupper = int(len(x_upper)//2) id_upper = np.where(b_upper)[0][x_upper] # x positions of nodes at the lower boundary of the inner mesh x_lower = np.argsort(pos_arr[b_lower][:, 0]) midid_xlower = int(len(x_lower)//2) id_lower = np.where(b_lower)[0][x_lower] # sides, now we close the sides under the inner mesh connecting the nodes # from the boundary with the corners of the lower half of the innner mesh n4 = interface_mesh.createNodeWithCheck([ibox_x[0], ibox_y[0], -5.0]) n5 = interface_mesh.createNodeWithCheck([ibox_x[0], ibox_y[1], -5.0]) n6 = interface_mesh.createNodeWithCheck([ibox_x[1], ibox_y[1], -5.0]) n7 = interface_mesh.createNodeWithCheck([ibox_x[1], ibox_y[0], -5.0]) im_nodes = interface_mesh.nodes() # min_x, min_y: connect to b_left, b_lower for n in range(len(id_left[:midid_yleft])): node_id0 = id_left[n] node_id1 = id_left[n + 1] interface_mesh.createTriangleFace( n4, im_nodes[node_id0], im_nodes[node_id1]) for n in range(len(id_lower[:midid_xlower])): node_id0 = id_lower[n] node_id1 = id_lower[n + 1] interface_mesh.createTriangleFace( n4, im_nodes[node_id0], im_nodes[node_id1]) # min_x, max_y: connect to b_left, b_upper for n in range(len(id_left[midid_yleft:-1])): node_id0 = id_left[n + midid_yleft] node_id1 = id_left[n + 1 + midid_yleft] interface_mesh.createTriangleFace( n5, im_nodes[node_id0], im_nodes[node_id1]) for n in range(len(id_upper[:midid_xupper])): node_id0 = id_upper[n] node_id1 = id_upper[n + 1] interface_mesh.createTriangleFace( n5, im_nodes[node_id0], im_nodes[node_id1]) # max_x, max_y: connect to b_right, b_upper for n in range(len(id_right[midid_yright:-1])): node_id0 = id_right[n + midid_yright] node_id1 = id_right[n + 1 + midid_yright] interface_mesh.createTriangleFace( n6, im_nodes[node_id0], im_nodes[node_id1]) for n in range(len(id_upper[midid_xupper:-1])): node_id0 = id_upper[n + midid_xupper] node_id1 = id_upper[n + 1 + midid_xupper] interface_mesh.createTriangleFace( n6, im_nodes[node_id0], im_nodes[node_id1]) # max_x, min_y: connect to b_right, b_lower for n in range(len(id_right[:midid_yright])): node_id0 = id_right[n] node_id1 = id_right[n + 1] interface_mesh.createTriangleFace( n7, im_nodes[node_id0], im_nodes[node_id1]) for n in range(len(id_lower[midid_xlower:-1])): node_id0 = id_lower[n + midid_xlower] node_id1 = id_lower[n + 1 + midid_xlower] interface_mesh.createTriangleFace( n7, im_nodes[node_id0], im_nodes[node_id1]) interface_mesh.createTriangleFace(n4, n5, im_nodes[id_left[midid_yleft]]) interface_mesh.createTriangleFace(n5, n6, im_nodes[id_upper[midid_xupper]]) interface_mesh.createTriangleFace(n6, n7, im_nodes[id_right[midid_yright]]) interface_mesh.createTriangleFace(n7, n4, im_nodes[id_lower[midid_xlower]]) log.log(16, 'Add volume constraint marker (inner box): {} using {:.3f} m³' .format([np.mean(ibox_x), np.mean(ibox_y), -2.5], inner_area_volume)) interface_mesh.addRegionMarker([np.mean(ibox_x), np.mean(ibox_y), -2.5], 1, area=inner_area_volume) # bottom interface_mesh.createQuadrangleFace(n4, n5, n6, n7) # for bound in interface_mesh.boundaries(): # bound.setMarker(1) # interface_mesh.exportBoundaryVTU('interface_mesh.vtu') # interface_mesh.exportVTK('interface_mesh.vtk') mid_poly = createPolyBoxWithHalfspace( minx, maxx, miny, maxy, minz, maxz, halfspace_at=0.0, without_halfspace=True, interface_marker=None) for bound in mid_poly.boundaries(): ids = [] for node in bound.nodes(): ids.append(interface_mesh.createNodeWithCheck(node.pos())) interface_mesh.createQuadrangleFace(*ids) # for bound in interface_mesh.boundaries(): # bound.setMarker(1) # interface_mesh.exportBoundaryVTU('mid_poly.vtu') # interface_mesh.exportVTK('mid_poly.vtk') nodes_mid_poly = mid_poly.nodes()[:4] nodes_mid = [] for node in nodes_mid_poly: nodes_mid.append(interface_mesh.createNodeWithCheck(node.pos())) # sides # print(nodes_mid[0].pos()) # minx, miny 0 # print(nodes_mid[1].pos()) # maxx, miny 3 # print(nodes_mid[2].pos()) # maxx, maxy 2 # print(nodes_mid[3].pos()) # minx, maxy 1 mid_nodes = interface_mesh.nodes() # min_x, min_y: connect to b_left, b_lower for n in range(len(id_left[:midid_yleft])): node_id0 = id_left[n] node_id1 = id_left[n + 1] interface_mesh.createTriangleFace( nodes_mid[0], mid_nodes[node_id0], mid_nodes[node_id1]) for n in range(len(id_lower[:midid_xlower])): node_id0 = id_lower[n] node_id1 = id_lower[n + 1] interface_mesh.createTriangleFace( nodes_mid[0], mid_nodes[node_id0], mid_nodes[node_id1]) # min_x, max_y: connect to b_left, b_upper for n in range(len(id_left[midid_yleft:-1])): node_id0 = id_left[n + midid_yleft] node_id1 = id_left[n + 1 + midid_yleft] interface_mesh.createTriangleFace( nodes_mid[3], mid_nodes[node_id0], mid_nodes[node_id1]) for n in range(len(id_upper[:midid_xupper])): node_id0 = id_upper[n] node_id1 = id_upper[n + 1] interface_mesh.createTriangleFace( nodes_mid[3], mid_nodes[node_id0], mid_nodes[node_id1]) # max_x, max_y: connect to b_right, b_upper for n in range(len(id_right[midid_yright:-1])): node_id0 = id_right[n + midid_yright] node_id1 = id_right[n + 1 + midid_yright] interface_mesh.createTriangleFace( nodes_mid[2], mid_nodes[node_id0], mid_nodes[node_id1]) for n in range(len(id_upper[midid_xupper:-1])): node_id0 = id_upper[n + midid_xupper] node_id1 = id_upper[n + 1 + midid_xupper] interface_mesh.createTriangleFace( nodes_mid[2], mid_nodes[node_id0], mid_nodes[node_id1]) # max_x, min_y: connect to b_right, b_lower for n in range(len(id_right[:midid_yright])): node_id0 = id_right[n] node_id1 = id_right[n + 1] interface_mesh.createTriangleFace( nodes_mid[1], mid_nodes[node_id0], mid_nodes[node_id1]) for n in range(len(id_lower[midid_xlower:-1])): node_id0 = id_lower[n + midid_xlower] node_id1 = id_lower[n + 1 + midid_xlower] interface_mesh.createTriangleFace( nodes_mid[1], mid_nodes[node_id0], mid_nodes[node_id1]) # add missing triangles connecting the lower corners with the middle node # of the surface mesh. interface_mesh.createTriangleFace(nodes_mid[0], nodes_mid[3], im_nodes[id_left[midid_yleft]]) interface_mesh.createTriangleFace(nodes_mid[3], nodes_mid[2], im_nodes[id_upper[midid_xupper]]) interface_mesh.createTriangleFace(nodes_mid[2], nodes_mid[1], im_nodes[id_right[midid_yright]]) interface_mesh.createTriangleFace(nodes_mid[1], nodes_mid[0], im_nodes[id_lower[midid_xlower]]) if mid_area_volume: log.log(16, 'Add volume constraint marker (middle box, ground): {} ' 'using {:.3f} m³' .format([np.mean(ibox_x), np.mean(ibox_y), -5.5], mid_area_volume)) interface_mesh.addRegionMarker([np.mean(ibox_x), np.mean(ibox_y), -5.5], 1, area=mid_area_volume) if air_refinement: log.log(16, 'Add volume constraint marker (middle box, air): {} ' 'using {:.3f} m³' .format([np.mean(ibox_x), np.mean(ibox_y), 5.5], mid_area_volume * 2)) interface_mesh.addRegionMarker([np.mean(ibox_x), np.mean(ibox_y), 5.5], 1, area=mid_area_volume * 2) # for bound in interface_mesh.boundaries(): # bound.setMarker(1) # interface_mesh.exportBoundaryVTU('mid.vtu') # interface_mesh.exportVTK('mid.vtk') # outer boundary to make sure interpolation is possible extrapolation # if mesh is rotated/shifted minx_outer = np.min(merged.pos.T[0]) - 3 * merged._maxdistance maxx_outer = np.max(merged.pos.T[0]) + 3 * merged._maxdistance miny_outer = np.min(merged.pos.T[1]) - 3 * merged._maxdistance maxy_outer = np.max(merged.pos.T[1]) + 3 * merged._maxdistance minz_outer = np.min(merged.pos.T[2]) - 3 * merged._maxdistance outer_poly = createPolyBoxWithHalfspace( min(3 * minx, minx_outer), max(3 * maxx, maxx_outer), min(3 * miny, miny_outer), max(3 * maxy, maxy_outer), min(3 * minz, minz_outer), 3 * maxz, halfspace_at=0.0, without_halfspace=True, interface_marker=None) nodes_outer_poly = outer_poly.nodes()[:4] nodes_outer = [] nodes_mid_outer = [] # merge outer poly in 3d mesh, boundariy per boundary for bound in outer_poly.boundaries(): ids = [] for node in bound.nodes(): ids.append(interface_mesh.createNodeWithCheck(node.pos())) interface_mesh.createQuadrangleFace(*ids) # get 4 corner nodes in the earth/air inteface to connect them for node in nodes_outer_poly: nodes_outer.append(interface_mesh.createNodeWithCheck(node.pos())) # gather 4 corner nodes of the midle mesh for node in nodes_mid: nodes_mid_outer.append(interface_mesh.createNodeWithCheck(node.pos())) # finally connect middle mesh with outer mesh in the x-y-plane at z=0 interface_mesh.createQuadrangleFace(nodes_outer[0], nodes_outer[1], nodes_mid_outer[1], nodes_mid_outer[0]) interface_mesh.createQuadrangleFace(nodes_outer[1], nodes_outer[2], nodes_mid_outer[2], nodes_mid_outer[1]) interface_mesh.createQuadrangleFace(nodes_outer[2], nodes_outer[3], nodes_mid_outer[3], nodes_mid_outer[2]) interface_mesh.createQuadrangleFace(nodes_outer[3], nodes_outer[0], nodes_mid_outer[0], nodes_mid_outer[3]) # add volume marker if outer_area_volume is not None: log.log(16, 'Add volume constraint marker (outer box, ground): {} ' 'using {:.3f} m³' .format([np.mean(ibox_x), np.mean(ibox_y), 2 * minz], outer_area_volume)) interface_mesh.addRegionMarker([np.mean(ibox_x), np.mean(ibox_y), 2 * minz], 3, area=outer_area_volume) # add volume marker for air if air_refinement and outer_area_volume is not None: log.log(16, 'Add volume constraint marker (outer box, air): {} ' 'using {:.3f} m³' .format([np.mean(ibox_x), np.mean(ibox_y), 2./3 * minz], outer_area_volume)) interface_mesh.addRegionMarker([np.mean(ibox_x), np.mean(ibox_y), 2./3 * minz], 3, area=outer_area_volume) # for bound in interface_mesh.boundaries(): # bound.setMarker(1) # interface_mesh.exportBoundaryVTU('outer.vtu') # interface_mesh.exportVTK('outer.vtk') # % meshing, export plc first plcname = mesh_path.with_suffix('.poly') pg.meshtools.exportPLC(interface_mesh, plcname.as_posix()) interface_mesh.exportVTK( mesh_path.with_name('{}.vtk'.format(mesh_path.stem)).as_posix()) # tetgen call tetgen151(mesh_base, quality=merged.tetgen_quality) # import mesh log.info(f'load "{plcname}.1"...') try: loopmesh = pg.Mesh(plcname.with_suffix('.1.vtk').as_posix()) except RuntimeError: # c++ for file not found in this case loopmesh = pg.meshtools.readTetgen( plcname.with_suffix('.1').as_posix()) # save mesh in pygimli syntax == .bms file log.info(loopmesh) loopmesh.save(mesh_path.as_posix()) # for kernel call (piping) log.info('loopmesh.save: {}'.format(mesh_path.as_posix())) # clean up residual files cleanUpTetgenFiles(mesh_path.as_posix()) return loopmesh
[docs]def copyPrimaryFields(rdir1, rdir2): """ For recalculation purpose it sometimes is uneccessary to calc the primary fields again. This function copies the primery fields from on custEM result dir (rdir1) to another (rdir2). names of the fields are not changed. Only the .h5 files are copied. """ import shutil # copy previously calculated primary fields pf_path = Path(f'{rdir1}/primary_fields') pf_path2 = Path(f'{rdir2}/primary_fields') log.info('copyPrimaryFields: Copy primary fields from "{}" to "{}"' .format(rdir1, rdir2)) found = 0 for ni, file in enumerate(pf_path.glob('*/*.h5')): found += 1 old_file = file new_file = pf_path2.joinpath(file.parents[0].name, file.name) new_file.parent.mkdir(exist_ok=True, parents=True) if new_file.exists(): log.debug('found: {}'.format(new_file.as_posix())) pass else: log.debug('write: {}'.format(new_file.as_posix())) shutil.copy(old_file.as_posix(), new_file.as_posix()) if found == 0: raise Exception('No primary fields found in rdir1 : "{}"' .format(rdir1))
[docs]def totalFieldCalculation(custem_config, num_cpu=16): success = local_apps_bash('totalFieldCalculation', custem_config, str(num_cpu)) log.info('success: {}'.format(success)) if success != 0: log.error('totalFieldCalculation aborted.') raise SystemExit()
[docs]def calcWithEmpymod(loop, use_bipole=False): from empymod import bipole import scipy loglvl = log.getEffectiveLevel() // 10 verb = max((min(4 - loglvl, 4)), 0) kwargs = {'verb': verb} str_factor = 1 if loop.config.ftype.lower() == 'h': kwargs['mrec'] = True str_factor *= -1 # definition elif loop.config.ftype.lower() == 'b': kwargs['mrec'] = True str_factor *= -1 # definition str_factor *= np.pi * 4e-7 elif loop.config.ftype.lower() == 'e': kwargs['mrec'] = False if use_bipole: if loop.type != 'line': raise Exception( 'use_bipole=True is only working for line sources.') kwargs['srcpts'] = len(loop.ds) outpos = loop.loopmesh.positions().array() field = np.zeros([3, len(outpos)], dtype=complex) pos_flipped = np.copy(outpos) pos_flipped[:, 2] *= -1 if not use_bipole: z_value = -loop.pos[0, 2] if np.isclose(z_value, 0): z_value += 0.01 src = [loop.pos[:, 0], loop.pos[:, 1], z_value, np.rad2deg(loop.phi), np.zeros(len(loop.pos))] else: direction = loop.pos[-1] - loop.pos[0] direction /= scipy.linalg.norm(direction) start = loop.pos[0] - (loop.ds[0]/2 * direction) end = loop.pos[-1] + (loop.ds[-1]/2 * direction) z_start = start[2] z_end = end[2] if np.isclose(z_start, 0): z_start += 0.01 if np.isclose(z_end, 0): z_end += 0.01 src = [start[0], end[0], start[1], end[1], z_start, z_end] depth = [0] depth.extend(np.cumsum(loop.config.d)) res = [1e12, *loop.config.rho] freq = loop.config.f for ri, rpos in enumerate(pos_flipped): recx = [*rpos, 0, 0] recy = [*rpos, 90, 0] recz = [*rpos, 0, -90] argsx = [src, recx, depth, res, freq] argsy = [src, recy, depth, res, freq] argsz = [src, recz, depth, res, freq] if use_bipole: field[0, ri] = bipole(*argsx, **kwargs) * np.sum(loop.ds) field[1, ri] = bipole(*argsy, **kwargs) * np.sum(loop.ds) field[2, ri] = bipole(*argsz, **kwargs) * np.sum(loop.ds) else: field[0, ri] = np.sum(bipole(*argsx, **kwargs) * loop.ds) field[1, ri] = np.sum(bipole(*argsy, **kwargs) * loop.ds) field[2, ri] = np.sum(bipole(*argsz, **kwargs) * loop.ds) field *= str_factor return field
# The End