Source code for comet.pyhed.config

"""
Part of comet/pyhed
"""
# COMET - COupled Magnetic resonance Electrical resistivity Tomography
# Copyright (C) 2019  Nico Skibbe

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import os
import numpy as np
from comet.pyhed.IO import createCustEMDirectories
from comet.pyhed.misc import temporal_printoptions
from comet.pyhed import log
from pathlib import Path


[docs]class config(): """ Basic 1d configuration file. Contents the values for layer parameters rho and d, the mode (te or tm), the fieldtype that will be calculated and the current of the loop. Held by instances of the loop class. """ def __init__(self, name=None, rho=[1000.], d=[], f=2000., mode='te', ftype='B', current=1., forceNew=False): self.rho = rho self.d = d self.f = f self.mode = mode self.ftype = ftype self.current = current if name is not None: if forceNew is False: self.load(name) elif forceNew is True: # creates a new and save in one step self.save(name) def __str__(self): rho = np.atleast_1d(self.rho) d = np.atleast_1d(self.d) if len(rho) == 1: case = 'homogeneous halfspace' else: case = 'layered halfspace' if self.ftype.upper() == 'E': fieldtype = 'electric' elif self.ftype.upper() in ('H', 'B'): fieldtype = 'magnetic, %s' % (self.ftype) return ('### LOOP CONFIGURATION ###{0}' + 'case: {1}{0}' + 'resistivity distribution: {2} [Ohm*m] ({3}){0}' + 'layer thickness: {4} [m] ({5}){0}' + 'frequency: {6:.2f} [Hz]{0}' + 'current: {7} [A]{0}' + 'field type: {8}{0}' + 'field mode: {9}' ).format('\n', case, rho, len(rho), d, len(d), self.f, self.current, fieldtype, self.mode) def __repr__(self): if self.ftype == 'E': field_type = 'electric' elif self.ftype == 'B' or self.ftype == 'H': field_type = 'magnetic, %s' % (self.ftype) if len(self.rho) == 1: ret = 'config: homogeneous halfspace, {} Ohm*m @ {} Hz, {}'.format( np.atleast_1d(self.rho)[0], self.f, field_type) else: ret = 'config: layered earth, {} layers @ {} Hz, {}'.format( len(self.rho), self.f, field_type) return ret
[docs] def save(self, name): """ Saves config in ASCII file format. """ Path(name).parent.mkdir(exist_ok=True, parents=True) with open(name, 'w') as config: config.write('frequency: %f\n' % (self.f)) config.write('field type: %s\n' % (self.ftype)) config.write('mode: %s\n' % (self.mode)) config.write('current: %f\n' % (self.current)) config.write('# rdmap %d\n' % (len(self.rho))) for i in range(len(self.d)): config.write('%f %f\n' % (self.rho[i], self.d[i])) config.write('%f %f' % (self.rho[-1], -1))
[docs] def load(self, name): """ Load config from ASCII file format. """ with open(name, 'r') as config: self.f = float(config.readline().split(': ')[1].split('\n')[0]) self.ftype = config.readline().split(': ')[1].split('\n')[0] self.mode = config.readline().split(': ')[1].split('\n')[0] self.current = \ float(config.readline().split(': ')[1].split('\n')[0]) n = int(config.readline().split('# rdmap ')[1].split('\n')[0]) self.rho = [] self.d = [] for i in range(n - 1): line = config.readline().split(' ') self.rho.append(np.float(line[0])) self.d.append(np.float(line[1].split('\n')[0])) self.rho.append(np.float(config.readline().split(' ')[0])) log.debug(self)
# c = config() # c # >>> ### LOOP CONFIGURATION ### # >>> case: homogeneous halfspace # >>> resistivity distribution: [ 1.] [Ohm*m] (1) # >>> layer thickness: [] [m] (0) # >>> frequency: 2000.0 [Hz] # >>> current: 1.0 [A] # >>> field type: magnetic, B # >>> field mode: te
[docs]class SecondaryConfig: def __init__(self, name=None, mod_name=None, mesh_name=None, m_dir='.', r_dir='.', pf_name='__default__prim_fields', p2=False, approach='E_s', pf_EH_flag='E', sigma_ground=[0.001], procs_per_proc=1, frequency=2000): """ Basic 2d configuration file for pyhed. Held by instances of the loop class if used for secondary field computations out of pyhed. name: string [ None ] Filename of the configuration file. mod_name: string [ None ] Filename of the correcponding mod instance used for secondary field computation using custEM. mesh_name: string [ None ] Filename of the FEM mesh. m_dir: string [ '.' ] Path to the mesh directory where custEM is searching for meshes. r_dir: string [ '.' ] Path to the result directory where custEM results will be found. pf_name: string [ '__default__prim_fields' ] Filename of the primary fields used to exchange primary fields between pyhed and custEM processes. aproach: string [ 'E_s' ] Approach used by custEM for field calculation. Default E secondary, which uses either H or E rpimary fields for calculation of electric fields which are later converted to magnetic fields. This is the most stable way to calculate accurate magnetic fields in anomaly bodys. As for valid strings please consult the documentation of custEM. pf_EH_flag: string [ 'E' ] Flag to determine if primary E or H fields are used for the approach above. sigma_ground: list with floats [ [0.001] ] Primary resistivity structure. Default halfspace with 1000 Ohmm = 0.001 Siemens. Expect Conductivity not resistivity values. procs_per_proc: integer [1] Number of subprocesses a single mpiprocess can spawn to calculate the primary magnetic field if using custem for this purpose. frequency: float [2000] Frequency at which the magnetic field is calculated. Only used in the total field calculation routine, otherwise check out the config-class. Note: ----- sigma_anom corresponds to the sigma values of the anomalies, not the delta of the sigma values to the background! """ # init mod instance self.mesh_name = mesh_name self.mod_name = mod_name self.m_dir = m_dir # os.path.abspath(m_dir) self.r_dir = r_dir # os.path.abspath(r_dir) self.polynominal_order = 2 if p2 else 1 self.enable_overwrite = True # update model parameter self.sigma_raw = [] self.sigma_paramesh = [] self.sigma_anom = [] self.layer_markers = [] self.sigma_ground = np.atleast_1d(sigma_ground) self.sigma_air = 1e-8 self.frequency = frequency # init primary fields self.pf_type = 'custom' self.pf_name = pf_name self.procs_per_proc = procs_per_proc # approach self.approach = approach # E_s more stable for anomalies self.pf_EH_flag = pf_EH_flag # only stable if using E !! if name is not None: self.load(name) log.debug('create custom directories') createCustEMDirectories(m_dir=self.m_dir, r_dir=self.r_dir) log.debug(self) def __str__(self): with temporal_printoptions(edgeitems=2, threshold=5): tp = ('### SEC MOD CONFIG ###{0}' + 'mesh_name : {1}{0}' + 'mod_name : {2}{0}' + 'm_dir (mesh directory) : "{3}"{0}' + 'r_dir (result directory): "{4}"{0}' + 'polynominal_order : {5}{0}' + 'enable_overwrite : {6}{0}' + 'sigma_anom : {7} ({8}){0}' + 'layer_markers : {9} ({10}){0}' + 'sigma_ground : {11} ({12}){0}' + 'sigma_air : {13}{0}' + 'pf_type (primary field) : {14}{0}' + 'pf_name (primary field) : {15}{0}' + 'approach : {16}{0}' + 'used primary field : {17}{0}' + 'sigma_raw : {18} ({19}){0}' + 'sigma_paramesh : {20} ({21}){0}' + 'procs_per_proc : {22}{0}' + 'frequency : {23:.3f}' ).format( '\n', # 0, ending self.mesh_name, # 1 self.mod_name, # 2 self.m_dir, # 3 self.r_dir, # 4 self.polynominal_order, # 5 self.enable_overwrite, # 6 np.atleast_1d(self.sigma_anom), # 7 len(np.atleast_1d(self.sigma_anom)), # 8 np.atleast_1d(self.layer_markers), # 9 len(np.atleast_1d(self.layer_markers)), # 10 np.atleast_1d(self.sigma_ground), # 11 len(np.atleast_1d(self.sigma_ground)), # 12 self.sigma_air, # 13 self.pf_type, # 14 self.pf_name, # 15 self.approach, # 16 self.pf_EH_flag, # 17 np.atleast_1d(self.sigma_raw), # 18 len(np.atleast_1d(self.sigma_raw)), # 19 np.atleast_1d(self.sigma_paramesh), # 20 len(np.atleast_1d(self.sigma_paramesh)), # 21 self.procs_per_proc, # 22 self.frequency, # 23 ) return tp def __repr__(self): string = 'sec mod config: {}-field for {} anomalies'.format( self.pf_type, len(np.atleast_1d(self.sigma_anom))) return string def _setVariable(self, key, *value): """ Internally called by *load* to set and chek class variables. Do not call manually. """ keys_str = ['mesh_name', 'mod_name', 'm_dir', 'r_dir', 'pf_type', 'pf_name', 'enable_overwrite', 'approach', 'pf_EH_flag'] keys_int = ['polynominal_order', 'procs_per_proc'] keys_float = ['sigma_air', 'frequency'] keys_list_float = ['sigma_ground'] len_vals = len(value) if len_vals == 1: value = value[0] if key in keys_str: if isinstance(value, tuple): # hack if space in dirnames hlp = value[0] for i in range(1, len(value)): hlp += ' {}'.format(value[i]) print(hlp) value = hlp if value.lower() == 'none': setattr(self, key, None) elif value.lower() == 'false': setattr(self, key, False) elif value.lower() == 'true': setattr(self, key, True) else: setattr(self, key, value) # print('set string variable: {}, {}'.format(key, value)) elif key in keys_int: setattr(self, key, int(value)) # print('set int variable: {}, {}'.format(key, value)) elif key in keys_float: setattr(self, key, float(value)) # print('set float variable: {}, {}'.format(key, value)) elif key in keys_list_float: if len_vals == 1: setattr(self, key, [float(value.rstrip('],').lstrip('[,'))]) # print('set list variable: {}, {}'.format( # key, [float(value.rstrip('],').lstrip('[,'))])) else: values = [] for val in value: strp = val.rstrip('],').lstrip('[,') if strp != '': values.append(float(strp)) setattr(self, key, list(values)) # print('set list variable: {}, {}'.format(key, values)) else: raise KeyError(key)
[docs] def setAnomalies(self, sigma_anom, layer_markers=None): """ Set anomalie vector. Parameters ---------- sigma_anom: np.ndarray Array with sigma values for each cell marked as anomaly. layer_markers: np.ndarray [ None ] Array containing the cell marker for each anomaly value (cell) to calculate the sigma anomalies with respect to the 1d background model. None indicates a homogenous background and all marker are set to 1 (0 is airspace). """ if layer_markers is None: layer_markers = np.ones_like(sigma_anom, dtype=int) if len(sigma_anom) != len(layer_markers): raise Exception('Number of sigma anomalies ({}) is not equal ' 'to corresponding layer markers ({})' .format(len(sigma_anom), len(layer_markers))) self.sigma_anom = np.array(sigma_anom).tolist() self.layer_markers = np.array(layer_markers).tolist()
[docs] def save(self, filename): """ Save secondary config in ASCII file format. Parameters ---------- Filename: string Filename for saving. Sub directories are created on the fly if code execution has the proper rights. """ # provide some temporal on the fly formatting for ASCII strings when it # comes to ndarrays: with temporal_printoptions(threshold=len(self.sigma_ground) + 1): header = ('mesh_name = {1}{0}' + 'mod_name = {2}{0}' + 'm_dir = {3}{0}' + 'r_dir = {4}{0}' + 'polynominal_order = {5:d}{0}' + 'enable_overwrite = {6}{0}' + 'sigma_air = {7:.18e}{0}' + 'pf_type = {8}{0}' + 'pf_name = {9}{0}' 'approach = {10}{0}' + 'pf_EH_flag = {11}{0}' + 'sigma_ground = {12}{0}' + 'frequency = {13}{0}' + 'procs_per_proc = {14}{0}' + 'END OF HEADER{0}' + 'sigma_anom layer_markers' ).format( '\n', # 0, ending # oddly enough os.linesep doesn't seem to work... self.mesh_name, # 1 self.mod_name, # 2 self.m_dir, # 3 self.r_dir, # 4 self.polynominal_order, # 5 self.enable_overwrite, # 6 self.sigma_air, # 7 self.pf_type, # 8 self.pf_name, # 9 self.approach, # 10 self.pf_EH_flag, # 11 self.sigma_ground, # 12 self.frequency, # 13 self.procs_per_proc # 14 ) Path(filename).parent.mkdir(exist_ok=True, parents=True) content = np.column_stack( [np.atleast_1d(self.sigma_anom), np.atleast_1d(self.layer_markers)]).tolist() np.savetxt(filename, content, header=header, fmt='%.18e %d') log.info('Saved secondary config as "{}".'.format( os.path.abspath(filename)))
[docs] def load(self, filename): """ Load config from file. Parameters ---------- filename: sting Relative or absolute path to file. Returns: -------- None """ with open(filename, 'r') as infile: log.info('Load secondary config from "{}".' .format(os.path.abspath(filename))) in_header = True while in_header: line = infile.readline().lstrip('#').lstrip().rstrip() if line[:13] == 'END OF HEADER': in_header = False break if line == '': continue else: sep = line.rstrip().split('#')[0].split() self._setVariable(sep[0], *sep[2:]) arrays = np.genfromtxt(filename, dtype=[('sigma_anom', np.float64), ('layer_markers', np.int)]) self.setAnomalies(arrays['sigma_anom'], layer_markers=arrays['layer_markers']) # TODO: import of sigma_raw and sigma_raw_prolonged only from .json # Do I need them? self.sigma_raw = [] self.sigma_paramesh = []
# The End