Source code for comet.pyhed.misc.toolbox

"""
Part of comet/pyhed/misc
"""
# COMET - COupled Magnetic resonance Electrical resistivity Tomography
# Copyright (C) 2019  Nico Skibbe

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import sys
import numpy as np
import inspect
from contextlib import contextmanager


[docs]@contextmanager def temporal_printoptions(threshold=5, **kwargs): """ Temporal overrides the printoptions of numpy arrays. """ _printoptions = np.get_printoptions() np.set_printoptions(threshold=threshold, **kwargs) yield np.set_printoptions(**_printoptions)
[docs]@contextmanager def plt_ioff(): """ Temporal overrides the interactive mode of matplotlib. """ import matplotlib.pyplot as plt was_inter = plt.isinteractive() plt.ioff() yield if was_inter: plt.ion() else: plt.ioff()
[docs]@contextmanager def plt_ion(): """ Temporal overrides the interactive mode of matplotlib. """ import matplotlib.pyplot as plt was_inter = plt.isinteractive() plt.ion() yield if was_inter: plt.ion() else: plt.ioff()
[docs]class NamespaceError(Exception): """ Named Error for try except clauses.""" def __init__(self, value): self.value = value def __str__(self): return repr(self.value)
[docs]def insert(array1, array2, breaking_point_float=0, right=True): """ Utility function to insert points between two arrays. Depricated.""" if array2 == []: stitchedArray = array1[np.nonzero(array1 != breaking_point_float)] elif right is True and array2 != []: stitchedArray = np.append(np.append(array1[np.nonzero(array1 < breaking_point_float)], array2), array1[np.nonzero(array1 > breaking_point_float)]) elif right is False and array2 != []: stitchedArray = np.append(np.append(array1[np.nonzero(array1 > breaking_point_float)], array2), array1[np.nonzero(array1 < breaking_point_float)]) return stitchedArray
[docs]def refine(array, start='0', end='-1', insert=1, log=False, zerovalue=False, invert=False): """ Utility function to refine array. Depricated. """ if invert is True: array = array[::-1] def __work(array, insert): if array == []: return [] refined_list = [] for i in range(len(array)): refined_list.append(array[i]) if i + 1 < len(array): refined_list.extend(np.linspace(array[i], array[i + 1], 2 + insert)[1:-1]) return np.asarray(refined_list) array = np.asarray(array) if start == '0': start = array[0] if end == '-1': end = array[-1] to_refine = array[np.all([(array >= start), (array <= end)], axis=0)] if not zerovalue: zerovalue = min(abs(to_refine)) / 100. if log is False: refined_list = __work(to_refine, insert) elif log is True: refined_list_plus = [] refined_list_minus = [] to_refine_plus = np.log(to_refine[np.nonzero(to_refine > 0)]) to_refine_minus = np.log(-1 * to_refine[ np.nonzero(to_refine < 0)][::-1]) if 0 in to_refine or (to_refine_minus != [] and to_refine_minus != []): to_refine_minus = np.append(np.log(zerovalue), to_refine_minus) to_refine_plus = np.append(np.log(zerovalue), to_refine_plus) if len(to_refine_plus) == 1: if 0 in to_refine: to_refine_plus = [] else: if start <= 0 and end >= 0: to_refine_plus = np.append(np.log(zerovalue), to_refine_plus) if len(to_refine_minus) == 1: if 0 in to_refine: to_refine_minus = [] else: if start <= 0 and end >= 0: to_refine_minus = np.append(np.log(zerovalue), to_refine_minus) plus = __work(to_refine_plus, insert) print(plus) refined_list_plus = np.exp(plus[np.nonzero(plus != 0)]) minus = __work(to_refine_minus, insert) print(minus) refined_list_minus = -1 * np.exp(minus[np.nonzero(minus != 0)][::-1]) if 0 in to_refine: to_refine_plus = np.append(np.zeros(1), to_refine_plus) refined_list = np.append(refined_list_minus, refined_list_plus) print(refined_list) array_to_return = np.append(np.append(array[np.nonzero(array < start)], refined_list), array[np.nonzero(array > end)]) if invert is True: return array_to_return[::-1] return array_to_return
[docs]def setdebugging(Bool, local=True): """ Temporal function used to control debug mode. Do not use. """ # for debugging only if local is True: # target = caller namespace global debugging debugging = {'debugging': Bool} return debugging else: # target = module namespace where setdebugging is defined stack = inspect.stack()[0] if 'debugging' not in stack[0].f_globals: raise NamespaceError( 'cannot define global variable from outer namespace') debugging = stack[0].f_globals['debugging'] debugging['debugging'] = Bool # print(stack[1].split('src')[1], debugging) # print('debugging' in inspect.stack()[0][0].f_globals) return debugging
[docs]def printv(string, *args): ''' for maintenance and debugging ''' callerframerecord = inspect.stack()[1] # print(callerframerecord) frame = callerframerecord[0] # frame/namespace des callers caller_globals = frame.f_globals # globals des Callers debugging_value = caller_globals['debugging']['debugging'] # print('printv: %s' % (debugging_value)) if debugging_value is True: # print(inspect.stack()[:][1]) # if inspect.stack()[0][1] != inspect.stack()[1][1]: # origin = '\.: ' # else: origin = callerframerecord[1].split('\\')[-1] + ': ' info = inspect.getframeinfo(frame) # mit info.lineno bekommt man die zeilennummer, # in der die funktion ausgeführt wird if not args: print(origin + str(info.lineno) + ' ', string) else: if type(args[0]) != np.float64: try: print(origin + str(info.lineno), string + '\t', args[0].shape) except AttributeError: print(origin + str(info.lineno), string + '\t', str(args[0]) + '\t', str(type(args[0]))) else: print(origin + str(info.lineno), string + '\t', str(args[0]) + '\tnp.float') else: return
[docs]def project1DModel(thk, para, out): """ projects a simple synthetic layered model to a given discretisation. The discretisation "disOut" has to be a vector for the corners of the desired discretisation. Therefore the output will be a vector with len(outDis) - 1 elements. "parameter" must have one entry more than thickness. >>> thk = [2.375] >>> resistivity = [100, 5] >>> disOut = np.linspace(0, -4.5, 10) >>> model = project1DModel(thk, resistivity, disOut) >>> print(model) # 100 * 0.75 + 5 * 0.25 = 76.25 [ 100. 100. 100. 100. 76.25 5. 5. 5. 5. ] """ para = np.array(para, np.float) out = np.array(abs(out)) model = np.ones_like(out[1:], np.float) akkthk = np.cumsum(thk) if len(para) == 1: model *= para # short cut # simple homegeneous halfspace, return, done return model # input model geometry in layer depths, z positiv downwards inl = [0] inl.extend(akkthk) # now for each inl of the desired output geometry... # print('inl depths', inl) # print('values', para) for i, n_1 in enumerate(out[1:]): n_0 = out[i] # n_0 = out[i] # n_1 = out[i + 1] # current cell between n_0 and n_1 a = np.where(inl >= n_1)[0] # input inl under current cell b = np.where(inl <= n_0)[0] # input inl above current cell # input inl in between n_0 and n_1 d = np.where((inl < n_1) & (inl > n_0))[0] if len(a) == 0: # no inl under n_1, only halfspace if b[-1] != len(thk): tk = n_1 - n_0 hlp = n_0 model[i] = 0. # append para from each layer complete between n_0 and n_1 for d_i in d: ratio = (inl[d_i] - hlp) / tk model[i] += ratio * para[d_i - 1] hlp = inl[d_i] # append potion of last (infinite) layer ratio = (n_1 - inl[len(thk)]) / tk model[i] += ratio * para[-1] continue else: model[i:] = para[-1] # reached homogeneous halfspace, return, done return model c = a[0] - b[-1] # print('c', c, para[b[-1]]) if c == 1: # model[i] = para[b[-1]] else: # found out inl bigger than multiple entries in input thk # calc weight and sum up for n in range(c - 1): inner_barrier = inl[b[-1] + n + 1] if n == 0: model[i] = 0 weighting = (inner_barrier - n_0) / (n_1 - n_0) value = para[n + b[-1]] # print('w1', weighting, value) # w1 = weighting # w2 = 0.0 else: weighting = (inner_barrier - inl[b[-1] + n]) /\ (n_1 - n_0) value = para[n + b[-1]] # print('w2', weighting, value) # w2 += weighting model[i] += weighting * value weighting = (n_1 - inl[b[-1] + c - 1]) / (n_1 - n_0) # w3 = weighting value = para[c - 1 + b[-1]] # print('w3', weighting, value) # print(w1+w2+w3) model[i] += weighting * value # print(model[i]) return model
[docs]def convertCoordinates(gimli, dolfin): """ Find sorting between two coordinate arrays if same points. input: arr1, arr2: two coordinate lists of same shape (n, 3) which contains the same coordinates but in a diffrent order. output: arr1_arr2, arr2_arr1: index arrays which converts coordinates from input1 to input2 and from input2 to input1. """ def indexAndSorting(c_): posview = c_.ravel().view( np.dtype((np.void, c_.dtype.itemsize*c_.shape[1]))) _, unique_idx = np.unique(posview, return_index=True) return unique_idx, unique_idx.argsort() # check shape_pg = np.shape(gimli) shape_df = np.shape(dolfin) if shape_pg != shape_df: raise Exception('Dimension mismatch between gimli and dolfin mesh: ' '{} != {}'.format(shape_pg, shape_df)) # calc pg_idx, pg_sort = indexAndSorting(gimli) df_idx, df_sort = indexAndSorting(dolfin) return pg_idx[df_sort], df_idx[pg_sort]
[docs]def floatString(value, frmt='2.2f', replace='_'): """ Converts a Float to a string for filenames etc. """ return ('{:%s}' % (frmt)).format(value).replace(".", replace)
[docs]def setNearestMarkers_old(outmesh, inmesh, marker_air=0, marker_half=1, fill_air_ground=False): """ Set marker from one mesh to another. Returns list with ommitted marker. """ import time tick = time.time() for i, cell in enumerate(outmesh.cells()): cellc = cell.center() incell = inmesh.findCell(cellc, extensive=False) if incell: if cellc.z() >= 0: cell.setMarker(marker_air) else: cell.setMarker(incell.marker()) else: if fill_air_ground is True: if cellc.z() >= 0: cell.setMarker(marker_air) elif cellc.z() < 0: cell.setMarker(marker_half) # extensive test is neccessary to prevent fenics from busting tack = time.time() - tick not_set = [] for m in range(np.max(inmesh.cellMarkers()) + 1): cs = outmesh.findCellByMarker(m) if len(cs) == 0: not_set.append(m) # nFEM = 'FEM_mesh_markerException.vtk' # nPARA = 'para_mesh_3d_markerException.vtk' # try: # outmesh.exportVTK(nFEM) # inmesh.exportVTK(nPARA) # finally: # raise Exception( # 'Not all marker of 3D parameter mesh could be ' # 'transferred to the FEM mesh. Stopped at marker ' # '{}. Both meshs has been exported as VTK for ' # 'debugging purposes. ("{}", "{}")' # .format(m, os.path.abspath(nFEM), # os.path.abspath(nPARA))) print('setNearestMarkers', tack, time.time() - tick) return not_set
[docs]def setNearestMarkers(outmesh, inmesh, y_lim, marker_air=0, marker_half=1, fill_air_ground=False, air_interface=0.0): """ Set marker from 2d mesh + limits in y to 3d mesh. Returns list with ommitted marker or empty list. Optinally fills air and groundspace outside the 2d mesh with marker. """ from comet import log cellmarkers = np.array(outmesh.cellMarkers()) # original markers coords = outmesh.cellCenters().array() # (n, 3) cell_ids = np.arange(len(cellmarkers)) # marker id in outmesh air = coords[:, 2] >= air_interface ground = np.logical_not(air) inx = np.logical_and(coords[:, 0] >= inmesh.xmin(), coords[:, 0] <= inmesh.xmax()) # outx = np.logical_not(inx) iny = np.logical_and(coords[:, 1] >= np.min(y_lim), coords[:, 1] <= np.max(y_lim)) # outy = np.logical_not(iny) inz = np.logical_and(coords[:, 2] >= inmesh.ymin(), coords[:, 2] <= inmesh.ymax()) # outz = np.logical_not(inx) inside = np.logical_and(np.logical_and(inx, iny), inz) outside = np.logical_not(inside) # cell indices inside y range and inmesh dimensions -> marker if fill_air_ground: # air cellmarkers[air] = marker_air # ground outside of y range and 2d mesh cellmarkers[np.logical_and(ground, outside)] = marker_half anom_ids = np.logical_and(ground, inside) anom_cell_ids = cell_ids[anom_ids] # cells in ground inside y range and parameter mesh import time time_start = time.time() for cid, coord in enumerate(coords[np.logical_and(ground, inside)]): incell = inmesh.findCell([coord[0], coord[2]], extensive=False) if incell is None: log.critical('No cell found for coord: [{}, {}]'.format(coord[0], coord[2])) import matplotlib.pyplot as plt import pygimli as pg fig, ax = plt.subplots(1, 1) pg.show(inmesh, markers=True, ax=ax) plt.plot(coord[0], coord[2], 'x', label='not found ({:.2f}, {:.2f})'.format(coord[0], coord[2])) plt.show() raise Exception('Something is wrong with your 2D parameter mesh!') # translate local enumerate id to global cell id of outmesh cellmarkers[anom_cell_ids[cid]] = incell.marker() log.debug('setNearestMarkers() main loop: {:.4f} sec' .format(time.time() - time_start)) # set new marker outmesh.setCellMarkers(cellmarkers) # test if all marker are transferred and return omitted marker not_set = [] un = np.unique(cellmarkers) time_start = time.time() for m in np.unique(inmesh.cellMarkers()): if m not in un: not_set.append(m) log.debug('setNearestMarkers() gather not set: {:.4f} sec' .format(time.time() - time_start)) return not_set
[docs]def getAllValuesByReference(mat, refarray): """ Gets all values from input hdf5 data set found in given reference array of the same dataset. Example: -------- >>> import h5py >>> from comet import pyhed as ph >>> mat = h5py.File('input.mrsd') >>> # get pulse moments from mrsd file >>> pulse_mat = mat['proclog']['Q']['q'] >>> pulses = ph.misc.getAllValuesByReference(mat, pulse_mat) array([ 0.11261871, 0.15802349, 0.1729516 , 0.24440305, 0.27615926, ... 0.39153588, 0.45558559, 0.64535046, 0.77051771, 1.08620425, ... 1.32817318, 1.85991744, 2.32437798, 3.22896476, 4.11364457, ... 5.66420968, 7.33714431, 10.01091275, 13.18643762, 17.83750801]) """ import h5py if isinstance(refarray, h5py.Reference): outarr = mat[h5py.h5r.get_name(refarray, mat.id)] else: outarr = np.zeros(len(refarray), dtype=object) out = [] for idx in range(len(outarr)): out.append(mat[h5py.h5r.get_name(refarray[()][idx, 0], mat.id)]) if isinstance(out[0], h5py.Group): outarr = out else: outarr = np.ones(len(out), dtype=out[0].dtype) for it, item in enumerate(out): outarr[it] = item[0, 0] return outarr
[docs]def progressBar(it, prefix="", file=sys.stdout): """ Iterable progress bar. Usage exchange: >>> for i in range(12): with: >>> for i in progressBar(range(12), 'some describing string: '): """ # Thanks to "eusoubrasileiro" from stackoverflow # for sharing the basic idea of this implementation under # https://stackoverflow.com/questions/3160699/python-progress-bar # this is a modified version size = 80 size_counter = int(np.ceil(np.log10(len(it)))) size = size - len(prefix) size -= int(size_counter * 2 + 4) count = len(it) def show(j): x = int(size * j / count) string_ = '{}[{}{}] {%s}/{%s}\r' % (':>{}d'.format(size_counter), ':>{}d'.format(size_counter)) string = string_.format(prefix, "#" * x, "." * (size - x), j, count) file.write(string) file.flush() show(0) for i, item in enumerate(it): yield item show(i + 1) file.write("\n") file.flush()
# THE END