"""
Part of comet/pyhed/misc
"""
# COMET - COupled Magnetic resonance Electrical resistivity Tomography
# Copyright (C) 2019 Nico Skibbe
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import sys
import numpy as np
import inspect
from contextlib import contextmanager
[docs]@contextmanager
def temporal_printoptions(threshold=5, **kwargs):
""" Temporal overrides the printoptions of numpy arrays. """
_printoptions = np.get_printoptions()
np.set_printoptions(threshold=threshold, **kwargs)
yield
np.set_printoptions(**_printoptions)
[docs]@contextmanager
def plt_ioff():
""" Temporal overrides the interactive mode of matplotlib. """
import matplotlib.pyplot as plt
was_inter = plt.isinteractive()
plt.ioff()
yield
if was_inter:
plt.ion()
else:
plt.ioff()
[docs]@contextmanager
def plt_ion():
""" Temporal overrides the interactive mode of matplotlib. """
import matplotlib.pyplot as plt
was_inter = plt.isinteractive()
plt.ion()
yield
if was_inter:
plt.ion()
else:
plt.ioff()
[docs]class NamespaceError(Exception):
""" Named Error for try except clauses."""
def __init__(self, value):
self.value = value
def __str__(self):
return repr(self.value)
[docs]def insert(array1, array2, breaking_point_float=0, right=True):
""" Utility function to insert points between two arrays. Depricated."""
if array2 == []:
stitchedArray = array1[np.nonzero(array1 != breaking_point_float)]
elif right is True and array2 != []:
stitchedArray = np.append(np.append(array1[np.nonzero(array1 <
breaking_point_float)], array2),
array1[np.nonzero(array1 >
breaking_point_float)])
elif right is False and array2 != []:
stitchedArray = np.append(np.append(array1[np.nonzero(array1 >
breaking_point_float)], array2),
array1[np.nonzero(array1 <
breaking_point_float)])
return stitchedArray
[docs]def refine(array, start='0', end='-1', insert=1,
log=False,
zerovalue=False,
invert=False):
""" Utility function to refine array. Depricated. """
if invert is True:
array = array[::-1]
def __work(array, insert):
if array == []:
return []
refined_list = []
for i in range(len(array)):
refined_list.append(array[i])
if i + 1 < len(array):
refined_list.extend(np.linspace(array[i],
array[i + 1],
2 + insert)[1:-1])
return np.asarray(refined_list)
array = np.asarray(array)
if start == '0':
start = array[0]
if end == '-1':
end = array[-1]
to_refine = array[np.all([(array >= start), (array <= end)], axis=0)]
if not zerovalue:
zerovalue = min(abs(to_refine)) / 100.
if log is False:
refined_list = __work(to_refine, insert)
elif log is True:
refined_list_plus = []
refined_list_minus = []
to_refine_plus = np.log(to_refine[np.nonzero(to_refine > 0)])
to_refine_minus = np.log(-1 * to_refine[
np.nonzero(to_refine < 0)][::-1])
if 0 in to_refine or (to_refine_minus != [] and to_refine_minus != []):
to_refine_minus = np.append(np.log(zerovalue), to_refine_minus)
to_refine_plus = np.append(np.log(zerovalue), to_refine_plus)
if len(to_refine_plus) == 1:
if 0 in to_refine:
to_refine_plus = []
else:
if start <= 0 and end >= 0:
to_refine_plus = np.append(np.log(zerovalue),
to_refine_plus)
if len(to_refine_minus) == 1:
if 0 in to_refine:
to_refine_minus = []
else:
if start <= 0 and end >= 0:
to_refine_minus = np.append(np.log(zerovalue),
to_refine_minus)
plus = __work(to_refine_plus, insert)
print(plus)
refined_list_plus = np.exp(plus[np.nonzero(plus != 0)])
minus = __work(to_refine_minus, insert)
print(minus)
refined_list_minus = -1 * np.exp(minus[np.nonzero(minus != 0)][::-1])
if 0 in to_refine:
to_refine_plus = np.append(np.zeros(1), to_refine_plus)
refined_list = np.append(refined_list_minus, refined_list_plus)
print(refined_list)
array_to_return = np.append(np.append(array[np.nonzero(array < start)],
refined_list),
array[np.nonzero(array > end)])
if invert is True:
return array_to_return[::-1]
return array_to_return
[docs]def setdebugging(Bool, local=True):
""" Temporal function used to control debug mode. Do not use. """
# for debugging only
if local is True:
# target = caller namespace
global debugging
debugging = {'debugging': Bool}
return debugging
else:
# target = module namespace where setdebugging is defined
stack = inspect.stack()[0]
if 'debugging' not in stack[0].f_globals:
raise NamespaceError(
'cannot define global variable from outer namespace')
debugging = stack[0].f_globals['debugging']
debugging['debugging'] = Bool
# print(stack[1].split('src')[1], debugging)
# print('debugging' in inspect.stack()[0][0].f_globals)
return debugging
[docs]def printv(string, *args):
'''
for maintenance and debugging
'''
callerframerecord = inspect.stack()[1]
# print(callerframerecord)
frame = callerframerecord[0] # frame/namespace des callers
caller_globals = frame.f_globals # globals des Callers
debugging_value = caller_globals['debugging']['debugging']
# print('printv: %s' % (debugging_value))
if debugging_value is True:
# print(inspect.stack()[:][1])
# if inspect.stack()[0][1] != inspect.stack()[1][1]:
# origin = '\.: '
# else:
origin = callerframerecord[1].split('\\')[-1] + ': '
info = inspect.getframeinfo(frame)
# mit info.lineno bekommt man die zeilennummer,
# in der die funktion ausgeführt wird
if not args:
print(origin + str(info.lineno) + ' ', string)
else:
if type(args[0]) != np.float64:
try:
print(origin + str(info.lineno), string + '\t',
args[0].shape)
except AttributeError:
print(origin + str(info.lineno), string + '\t',
str(args[0]) + '\t', str(type(args[0])))
else:
print(origin + str(info.lineno), string + '\t',
str(args[0]) + '\tnp.float')
else:
return
[docs]def project1DModel(thk, para, out):
"""
projects a simple synthetic layered model to a given discretisation.
The discretisation "disOut" has to be a vector for the corners of the
desired discretisation. Therefore the output will be a vector with
len(outDis) - 1 elements. "parameter" must have one entry more than
thickness.
>>> thk = [2.375]
>>> resistivity = [100, 5]
>>> disOut = np.linspace(0, -4.5, 10)
>>> model = project1DModel(thk, resistivity, disOut)
>>> print(model) # 100 * 0.75 + 5 * 0.25 = 76.25
[ 100. 100. 100. 100. 76.25 5. 5. 5. 5. ]
"""
para = np.array(para, np.float)
out = np.array(abs(out))
model = np.ones_like(out[1:], np.float)
akkthk = np.cumsum(thk)
if len(para) == 1:
model *= para
# short cut
# simple homegeneous halfspace, return, done
return model
# input model geometry in layer depths, z positiv downwards
inl = [0]
inl.extend(akkthk)
# now for each inl of the desired output geometry...
# print('inl depths', inl)
# print('values', para)
for i, n_1 in enumerate(out[1:]):
n_0 = out[i]
# n_0 = out[i]
# n_1 = out[i + 1]
# current cell between n_0 and n_1
a = np.where(inl >= n_1)[0] # input inl under current cell
b = np.where(inl <= n_0)[0] # input inl above current cell
# input inl in between n_0 and n_1
d = np.where((inl < n_1) & (inl > n_0))[0]
if len(a) == 0: # no inl under n_1, only halfspace
if b[-1] != len(thk):
tk = n_1 - n_0
hlp = n_0
model[i] = 0.
# append para from each layer complete between n_0 and n_1
for d_i in d:
ratio = (inl[d_i] - hlp) / tk
model[i] += ratio * para[d_i - 1]
hlp = inl[d_i]
# append potion of last (infinite) layer
ratio = (n_1 - inl[len(thk)]) / tk
model[i] += ratio * para[-1]
continue
else:
model[i:] = para[-1]
# reached homogeneous halfspace, return, done
return model
c = a[0] - b[-1]
# print('c', c, para[b[-1]])
if c == 1: #
model[i] = para[b[-1]]
else:
# found out inl bigger than multiple entries in input thk
# calc weight and sum up
for n in range(c - 1):
inner_barrier = inl[b[-1] + n + 1]
if n == 0:
model[i] = 0
weighting = (inner_barrier - n_0) / (n_1 - n_0)
value = para[n + b[-1]]
# print('w1', weighting, value)
# w1 = weighting
# w2 = 0.0
else:
weighting = (inner_barrier - inl[b[-1] + n]) /\
(n_1 - n_0)
value = para[n + b[-1]]
# print('w2', weighting, value)
# w2 += weighting
model[i] += weighting * value
weighting = (n_1 - inl[b[-1] + c - 1]) / (n_1 - n_0)
# w3 = weighting
value = para[c - 1 + b[-1]]
# print('w3', weighting, value)
# print(w1+w2+w3)
model[i] += weighting * value
# print(model[i])
return model
[docs]def convertCoordinates(gimli, dolfin):
""" Find sorting between two coordinate arrays if same points.
input: arr1, arr2: two coordinate lists of same shape (n, 3)
which contains the same coordinates but in a diffrent order.
output: arr1_arr2, arr2_arr1: index arrays which converts coordinates
from input1 to input2 and from input2 to input1.
"""
def indexAndSorting(c_):
posview = c_.ravel().view(
np.dtype((np.void, c_.dtype.itemsize*c_.shape[1])))
_, unique_idx = np.unique(posview, return_index=True)
return unique_idx, unique_idx.argsort()
# check
shape_pg = np.shape(gimli)
shape_df = np.shape(dolfin)
if shape_pg != shape_df:
raise Exception('Dimension mismatch between gimli and dolfin mesh: '
'{} != {}'.format(shape_pg, shape_df))
# calc
pg_idx, pg_sort = indexAndSorting(gimli)
df_idx, df_sort = indexAndSorting(dolfin)
return pg_idx[df_sort], df_idx[pg_sort]
[docs]def floatString(value, frmt='2.2f', replace='_'):
""" Converts a Float to a string for filenames etc. """
return ('{:%s}' % (frmt)).format(value).replace(".", replace)
[docs]def setNearestMarkers_old(outmesh, inmesh, marker_air=0, marker_half=1,
fill_air_ground=False):
""" Set marker from one mesh to another.
Returns list with ommitted marker.
"""
import time
tick = time.time()
for i, cell in enumerate(outmesh.cells()):
cellc = cell.center()
incell = inmesh.findCell(cellc, extensive=False)
if incell:
if cellc.z() >= 0:
cell.setMarker(marker_air)
else:
cell.setMarker(incell.marker())
else:
if fill_air_ground is True:
if cellc.z() >= 0:
cell.setMarker(marker_air)
elif cellc.z() < 0:
cell.setMarker(marker_half)
# extensive test is neccessary to prevent fenics from busting
tack = time.time() - tick
not_set = []
for m in range(np.max(inmesh.cellMarkers()) + 1):
cs = outmesh.findCellByMarker(m)
if len(cs) == 0:
not_set.append(m)
# nFEM = 'FEM_mesh_markerException.vtk'
# nPARA = 'para_mesh_3d_markerException.vtk'
# try:
# outmesh.exportVTK(nFEM)
# inmesh.exportVTK(nPARA)
# finally:
# raise Exception(
# 'Not all marker of 3D parameter mesh could be '
# 'transferred to the FEM mesh. Stopped at marker '
# '{}. Both meshs has been exported as VTK for '
# 'debugging purposes. ("{}", "{}")'
# .format(m, os.path.abspath(nFEM),
# os.path.abspath(nPARA)))
print('setNearestMarkers', tack, time.time() - tick)
return not_set
[docs]def setNearestMarkers(outmesh, inmesh, y_lim, marker_air=0, marker_half=1,
fill_air_ground=False, air_interface=0.0):
""" Set marker from 2d mesh + limits in y to 3d mesh.
Returns list with ommitted marker or empty list.
Optinally fills air and groundspace outside the 2d mesh with marker.
"""
from comet import log
cellmarkers = np.array(outmesh.cellMarkers()) # original markers
coords = outmesh.cellCenters().array() # (n, 3)
cell_ids = np.arange(len(cellmarkers)) # marker id in outmesh
air = coords[:, 2] >= air_interface
ground = np.logical_not(air)
inx = np.logical_and(coords[:, 0] >= inmesh.xmin(),
coords[:, 0] <= inmesh.xmax())
# outx = np.logical_not(inx)
iny = np.logical_and(coords[:, 1] >= np.min(y_lim),
coords[:, 1] <= np.max(y_lim))
# outy = np.logical_not(iny)
inz = np.logical_and(coords[:, 2] >= inmesh.ymin(),
coords[:, 2] <= inmesh.ymax())
# outz = np.logical_not(inx)
inside = np.logical_and(np.logical_and(inx, iny), inz)
outside = np.logical_not(inside)
# cell indices inside y range and inmesh dimensions -> marker
if fill_air_ground:
# air
cellmarkers[air] = marker_air
# ground outside of y range and 2d mesh
cellmarkers[np.logical_and(ground, outside)] = marker_half
anom_ids = np.logical_and(ground, inside)
anom_cell_ids = cell_ids[anom_ids]
# cells in ground inside y range and parameter mesh
import time
time_start = time.time()
for cid, coord in enumerate(coords[np.logical_and(ground, inside)]):
incell = inmesh.findCell([coord[0], coord[2]], extensive=False)
if incell is None:
log.critical('No cell found for coord: [{}, {}]'.format(coord[0],
coord[2]))
import matplotlib.pyplot as plt
import pygimli as pg
fig, ax = plt.subplots(1, 1)
pg.show(inmesh, markers=True, ax=ax)
plt.plot(coord[0], coord[2], 'x',
label='not found ({:.2f}, {:.2f})'.format(coord[0],
coord[2]))
plt.show()
raise Exception('Something is wrong with your 2D parameter mesh!')
# translate local enumerate id to global cell id of outmesh
cellmarkers[anom_cell_ids[cid]] = incell.marker()
log.debug('setNearestMarkers() main loop: {:.4f} sec'
.format(time.time() - time_start))
# set new marker
outmesh.setCellMarkers(cellmarkers)
# test if all marker are transferred and return omitted marker
not_set = []
un = np.unique(cellmarkers)
time_start = time.time()
for m in np.unique(inmesh.cellMarkers()):
if m not in un:
not_set.append(m)
log.debug('setNearestMarkers() gather not set: {:.4f} sec'
.format(time.time() - time_start))
return not_set
[docs]def getAllValuesByReference(mat, refarray):
""" Gets all values from input hdf5 data set found in given reference array
of the same dataset.
Example:
--------
>>> import h5py
>>> from comet import pyhed as ph
>>> mat = h5py.File('input.mrsd')
>>> # get pulse moments from mrsd file
>>> pulse_mat = mat['proclog']['Q']['q']
>>> pulses = ph.misc.getAllValuesByReference(mat, pulse_mat)
array([ 0.11261871, 0.15802349, 0.1729516 , 0.24440305, 0.27615926,
... 0.39153588, 0.45558559, 0.64535046, 0.77051771, 1.08620425,
... 1.32817318, 1.85991744, 2.32437798, 3.22896476, 4.11364457,
... 5.66420968, 7.33714431, 10.01091275, 13.18643762, 17.83750801])
"""
import h5py
if isinstance(refarray, h5py.Reference):
outarr = mat[h5py.h5r.get_name(refarray, mat.id)]
else:
outarr = np.zeros(len(refarray), dtype=object)
out = []
for idx in range(len(outarr)):
out.append(mat[h5py.h5r.get_name(refarray[()][idx, 0], mat.id)])
if isinstance(out[0], h5py.Group):
outarr = out
else:
outarr = np.ones(len(out), dtype=out[0].dtype)
for it, item in enumerate(out):
outarr[it] = item[0, 0]
return outarr
[docs]def progressBar(it, prefix="", file=sys.stdout):
""" Iterable progress bar.
Usage
exchange:
>>> for i in range(12):
with:
>>> for i in progressBar(range(12), 'some describing string: '):
"""
# Thanks to "eusoubrasileiro" from stackoverflow
# for sharing the basic idea of this implementation under
# https://stackoverflow.com/questions/3160699/python-progress-bar
# this is a modified version
size = 80
size_counter = int(np.ceil(np.log10(len(it))))
size = size - len(prefix)
size -= int(size_counter * 2 + 4)
count = len(it)
def show(j):
x = int(size * j / count)
string_ = '{}[{}{}] {%s}/{%s}\r' % (':>{}d'.format(size_counter),
':>{}d'.format(size_counter))
string = string_.format(prefix, "#" * x, "." * (size - x), j, count)
file.write(string)
file.flush()
show(0)
for i, item in enumerate(it):
yield item
show(i + 1)
file.write("\n")
file.flush()
# THE END