"""
Part of comet/snmr/survey
Enhanced sounding class for SNMR data sets and supporting variables.
Sounding class can hold any number of Measurement class instances each
representing single FIDs.
"""
# COMET - COupled Magnetic resonance Electrical resistivity Tomography
# Copyright (C) 2019 Nico Skibbe
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import os
import numpy as np
from comet.snmr import misc
from comet import pyhed as ph
from scipy.io import loadmat
from pathlib import Path
import pygimli as pg
[docs]class Survey(object):
""" Survey class for containment and handling of SNMR datasets (FIDS).
"""
def __init__(self, earth=None, loops=None):
# List with stored experiments (soundings/FIDs).
self.fids = []
# loops (transmitters and receivers)
self.loops = np.array([], dtype=object)
self.loop_config = None
self.custem_config = None
# earth incl, decl, magnitude (larmor frequency)
self.earth = None
self.setEarth(earth=earth)
self.loaded_meshes = {}
def __repr__(self):
placeholder = ''
nNone = sum(self.loops == None) # elementwise comparison to None
if nNone > 0:
placeholder = '({} are not defined)'.format(nNone)
return 'SNMR Survey: {} loops {}, {} soundings'.format(
len(self.loops), placeholder, len(self.fids))
@property
def pulses(self):
''' Pulse moment vectors gathered from soundings.'''
return self._gather('pulses')
@property
def gates(self):
''' Time gates gathered from soundings.'''
return self._gather('gates')
@property
def data(self):
''' Complex data cube (pulses * gates) from soundings.'''
return self._gather('data_gated')
@property
def error(self):
''' Complex error cube (pulses * gates) from soundings.'''
return self._gather('error_gated')
@property
def data_phases(self):
''' Single data phases of the FIDs.'''
return self._gather('data_phase')
@property
def response(self):
''' Complex data cube (pulses * gates) from soundings.'''
return self._gather('response')
@property
def tx_indices(self):
''' Indices of the used transitter of each sounding.'''
return self._gather('tx_index')
@property
def rx_indices(self):
''' Indices of the used receiver of each sounding.'''
return self._gather('rx_index')
@property
def used_loops(self):
_all = np.append(self.tx_indices, self.rx_indices)
ret = np.unique(_all)
return ret
[docs] def addLoop(self, loop):
""" Appends a given loop instance to the loops in survey and returns id
"""
if isinstance(loop, (tuple, list, np.ndarray)):
idx = []
for lp in loop:
idx.append(self.addLoop(lp))
else:
self.loops = np.append(self.loops, loop)
idx = len(self.loops) - 1
if self.loop_config is not None:
self.loops[-1].config = self.loop_config
else:
if self.loops[-1] is not None:
self.loops[-1].setFrequency(self.earth.larmor)
return idx
[docs] def loadLoopMesh(self, savename, indices=None, dipolename=None):
""" Loads mesh and distribute reference to given indices.
"""
# shortcut for automatic call
if savename is None:
return
# all indices
if indices is None:
indices = np.arange(0, len(self.loops))
else:
indices = np.atleast_1d(indices)
# do for all indices
for idx in indices:
loop = self.loops[idx]
if savename not in self.loaded_meshes:
try:
ph.log.info('load loopmesh "{}"...'.format(savename))
self.loaded_meshes[savename] = pg.Mesh(savename)
except RuntimeError as err:
ph.log.warning('Did not found loopmesh at "{}"'
.format(savename))
raise err
ph.log.log(16, f'set loopmesh "{savename}" to loop {loop!r}...')
loop.setLoopMesh(self.loaded_meshes[savename], savename=savename)
if dipolename is not None:
if dipolename not in self.loaded_meshes:
try:
ph.log.info('load dipole mesh "{}"...'
.format(dipolename))
self.loaded_meshes[dipolename] = pg.Mesh(dipolename)
except RuntimeError as err:
ph.log.warning('Did not found loopmesh at "{}"'
.format(savename))
raise err
loop.setDipoleMesh(self.loaded_meshes[dipolename],
savename=dipolename)
[docs] def addSounding(self, fid):
""" Appends a given sounding instance to the sounds in survey and
returns id
"""
self.fids.append(fid)
idx = len(self.fids) - 1
return idx
[docs] def createSounding(self, tx=0, rx=0, check_double=True):
""" Creates a new sounding based on the given ids for tx and rx.
Parameters
----------
tx: integer [ 0 ]
Index of the transmitter loop in loops.
rx: integer [ 0 ]
Index of the receiver loop in loops. Same number than tx indicates
a coincident measurement.
check_double: boolean [ True ]
If True, omits creating another instance of the same fid (tx/rx
combination). Instead the index of the original fid is returned.
If False new fid is created and its index is returned.
Note: tx and rx indices can be setted regardless if there is an actual
loop in loops or just a *None* placeholder. In other words you can
create your soundings and loops in arbitrary order.
"""
found = np.logical_and(np.array(self.tx_indices) == tx,
np.array(self.rx_indices) == rx)
if check_double and np.any(found):
idx = np.where(found)[0][0]
else:
fid = FID(tx=tx, rx=rx)
idx = self.addSounding(fid)
return idx
[docs] def createKernel(self, fid=0, dimension=1):
""" Returns a initialized kernel instance for the chosen sounding.
Parameters
----------
sound_index: integer
Index of the sounding the kernelclass is calcualting the kernel
for. In order to calculate the kernel, pulses, tx and rx are taken
as references from the sounding.
Note: createKernel does not set or change any values in survey nor in
the corresponding sounding. However when calculating, the kernel class
will override the frequency in the given loops (tx and rx) and set it
to the larmor frequency calculated from the earth magnetic fields
magnitude. Use the *setEarth* method before or after you generate the
kernel instances, but obviously before calculation.
"""
from comet.snmr import kernel
kern = kernel.Kernel(self, fid=fid, dimension=dimension)
return kern
[docs] def setEarth(self, earth=None, incl=60., decl=2.,
mag=48000*1e-9, rad=False):
""" Defines the Earth in terms of inclination, declination and mag.
Parameters
----------
earth: comet.snmr.survey.Earth [ None ]
Already initialized earth class will be setted. Or created through
the other optional arguments.
inclination: float [ 60. ]
Inclination of the earth magnetic field in rad or degree.
declination: float [ 2. ]
Declination of the earth magnetic field in rad or degree.
magnitude: float [48000 * 1e-9]
Magnitude of the earth magnetic field in Tesla.
rad: boolean [ False ]
Input inclination and declination in rad?
"""
if earth is not None:
# simple setter...
self.earth = earth
else:
# ...or create.
self.earth = Earth(incl=incl,
decl=decl,
mag=mag,
rad=rad)
if self.loop_config is not None:
self.loop_config.f = self.earth.larmor
for loop in self.loops:
if loop is not None:
loop.setFrequency(self.earth.larmor)
ph.log.debug('survey.setEarth(): update loop frequencies')
ph.log.debug('survey.setEarth():\n{}'.format(self.earth))
[docs] def setLoopConfig(self, config, update_loop_configs=True):
"""Loop config in terms of primary field resistivity and frequency. """
self.loop_config = config
for loop in self.loops:
if loop is not None:
loop.config = config
[docs] def setCustemConfig(self, config, update_loop_configs=True):
self.custem_config = config
for loop in self.loops:
if loop is not None:
loop.secondary_config = self.custem_config
[docs] def setLoops(self, loops):
toset = np.atleast_1d(loops)
self.loops = toset
[docs] def set1DModel(self, thk=[], res=[1000.]):
"""Modifies loop config in terms of primary field resistivity. """
for loop in self.loops:
loop.setModel(res, d=thk)
[docs] def setResponse(self, array):
""" Set a response array from e.g. an inversion as data set for
plotting.
"""
response = np.copy(array)
resp_size = int(response.size)
mult = 1
data_size = 0
for dat in self.data:
data_size += np.prod(dat.shape)
if np.isclose(resp_size, data_size):
pass
elif np.isclose(resp_size, 2 * data_size):
mult = 2
else:
raise Exception(
'Response shape mismatch. Got {}, expected a vector that can '
'be reshaped to {}.'.format(np.shape(response),
data_size))
last = 0
for fid in self.fids:
# response = resp1.real, resp1.imag, resp2.real, ..., respN.imag
# mult => 2
# or response = complex(resp1), complex(resp2), ..., complex(respN)
# mult => 1
vec = response[last: last + fid.data_gated.size * mult]
fid.setResponse(vec)
last += fid.data_gated.size * mult
[docs] def loadMRSD(self, filename, remove_df=True,
build_loops=False, x_offsets=None, segments=80,
max_length=None, tx=None, rx=None, fids=None, debug=False):
"""
Parameters
----------
filename: string
Path to .mrsd file to be imported.
build_loops: boolean [ True ]
If True, the saved config in the mrsd file is used to construct
loops for transmitter and receiver. However, the information
in the mrsd fiel is not complete. There are some defaults we
assume in autogenerating the loops, especially when it comes
to figure-of-eight loops. Feel free to replace the loops
with custom created loops of the *pyhed* library. Or switch this
off if you only want to see the data or define all the loops
yourself.
x_offsets: list or None [ None ]
One information that is missing in mrsd files, is the relative
position of the loops to each other. Here one can fill in this
information giving a simple list of offsets in positive x
direction (all loops (midpoints) are placed at y=0 and z=0).
Expect one float per used loop by the data file or raises an
error. Ignored if None and multiple loops are found
(in this case no loops are build at all).
Coincident measurements do not require this, x is set to 0 by
default.
segments: integer [ 80 ]
Number of dipoles used to auto build the loops.
Ignored if *build_loops* is False or not given any *x_offsets*.
max_length: float [ None ]
Maximum length of a dipole when auto generating the loops.
Overrides segments.
Ignored if *build_loops* is False or not given any *x_offsets*.
"""
try:
self.loadMRSD_mat(
filename, remove_df=remove_df, build_loops=build_loops,
x_offsets=x_offsets, segments=segments, max_length=max_length,
tx=tx, rx=rx, fids=fids, debug=debug)
except NotImplementedError:
self.loadMRSD_h5(
filename, remove_df=remove_df, build_loops=build_loops,
x_offsets=x_offsets, segments=segments, max_length=max_length,
tx=tx, rx=rx, fids=fids, debug=debug)
[docs] def loadMRSD_mat(self, filename, remove_df=True,
build_loops=False, x_offsets=None, segments=80,
max_length=None, tx=None, rx=None, fids=None, debug=False,
):
""" See loadMRSD instead. """
ph.log.debug('load MRSD_mat')
ph.log.info('load MRSD: {}'.format(os.path.abspath(filename)))
# open mrsd (matlab) file and access log of processing steps
pl = loadmat(filename, struct_as_record=False,
squeeze_me=True)['proclog']
# sig has 4 entries: [noise(not gmr)][FID][T1/T2][T1/T2]
# as we are only using FID, num = 1:
num = 1
rx0 = np.atleast_1d(pl.Q[0].rx) # 0: first pulse, for general info
# number of loops in this class so far
numloops = len(self.loops)
# number of soudings in data file
number_fids = len(rx0)
if fids is None:
fids = np.arange(number_fids)
else:
fids = np.atleast_1d(fids)
# information about receiver (index, shape, size)
rx_info = np.atleast_1d(pl.rxinfo)
# auto generate loops from type and size stored in mrsd
loop_cfg = np.zeros((len(fids) + 1, 3), dtype=int)
loop_cfg[0, :] = [pl.txinfo.channel, pl.txinfo.looptype,
pl.txinfo.loopsize]
if tx is not None:
loop_cfg[0, 0] = tx
else:
loop_cfg[0, 0] = numloops
if rx is not None:
rx = np.atleast_1d(rx)
maxfid = max(fids)
if maxfid >= len(rx_info):
raise Exception('Sounding {} not available, found only {} '
'soundings in total (max index = {})'
.format(maxfid, len(rx_info), len(rx_info) - 1))
for i, fidi in enumerate(fids):
rxi = rx_info[fidi]
if rx is not None:
# set rx channel from existing loops
rxichannel = rx[i]
else:
rxichannel = rxi.channel - 1 + numloops # matlab to python
loop_cfg[i + 1, :] = [rxichannel, rxi.looptype,
rxi.loopsize]
# find unique loop indices to append items in self.loops
unique, rev_idx = np.unique(loop_cfg[:, 0], return_inverse=True)
num_unique = len(unique)
if build_loops:
if x_offsets is None or len(x_offsets) != num_unique:
build_loops = False
pr = 0 if x_offsets is None else len(x_offsets)
ph.log.warning('Warning: Wrong number of offsets ({}) for {} '
'loops. Sets loops to None and continue.'
.format(pr, num_unique))
# preserve place for loops if new indices are detected
max_loop_index = max(unique)
new = max_loop_index + 1 - len(self.loops)
if new > 0:
for index in range(new):
self.addLoop(None)
# new indices in site container
if tx is None and rx is None:
# shift due to existing loops, start at first new loop
site_idx = np.arange(numloops, numloops + num_unique)
else:
# given indices used for existing loops, start at 0
site_idx = np.arange(0, len(self.loops))
if build_loops:
# auto generate loop discretizations
for rev in range(num_unique):
self.loops[site_idx[rev]] =\
createLoopFromMRS(
loop_cfg[rev, 1], loop_cfg[rev, 2],
x_offsets[rev],
segments=segments,
max_length=max_length)
for nrxi, nrx in enumerate(fids):
# create empty FID class for the combination of tx and rx
# translate local indices from mrsd into global indices in class
ph.log.log(13, 'load sounding {}'.format(nrx))
fid = FID(tx=loop_cfg[0, 0],
rx=loop_cfg[nrxi + 1, 0])
# loop turns
fid.tx_turns = pl.txinfo.loopturns
fid.rx_turns = rx_info[nrx].loopturns
# set pulse moments
fid.setPulses(np.array([q.q for q in pl.Q]))
# times (not gated)
raw_times = rx0[nrx].sig[num].t
# ungated data, init array
raw_data = np.zeros(
(len(fid.pulses), len(raw_times)),
dtype=type(np.atleast_1d(pl.Q[0].rx)[nrx].sig[num].V[0]))
# ungated error, init array
raw_error = np.zeros(
(len(fid.pulses), len(raw_times)),
dtype=type(np.atleast_1d(pl.Q[0].rx)[nrx].sig[num].E[0]))
# frequency offset, init array
fid.df = np.zeros_like(fid.pulses)
# phases, init array
phi = np.zeros_like(fid.pulses)
# fill data, phases, frequency offsets and errors
# one df and phi per pulse moment !
for pulse in range(raw_data.shape[0]):
rx = np.atleast_1d(pl.Q[pulse].rx)[nrx]
raw_data[pulse] = rx.sig[num].V
raw_error[pulse] = rx.sig[num].E
fid.df[pulse] = rx.sig[num].fit[2]
phi[pulse] = rx.sig[num].fit[3]
fid.setPhases(phi)
# find certain times that shoudn't be imported.
# function copied from MRSmatlab, tested, same results
minmaxt = pl.event[(pl.event[:, 0] == 101.) & # index
(pl.event[:, 1] == 1) & #
(pl.event[:, 2] == 0) & #
(pl.event[:, 3] == 1) & #
(pl.event[:, 4] == 2), 5:7] # times: min, max
try:
mint_ind = np.argmin(np.abs(raw_times - minmaxt[0][0]))
except IndexError:
mint_ind = 0
try:
maxt_ind = np.argmin(np.abs(raw_times - minmaxt[0][1]))
except IndexError:
maxt_ind = len(raw_times) - 1
# shortens the time, data and error vector/data_gated
raw_times = np.array(raw_times[mint_ind:maxt_ind + 1])
with ph.misc.temporal_printoptions():
ph.log.debug('raw times: {} ({})'.format(raw_times,
len(raw_times)))
raw_data = raw_data[:, mint_ind:maxt_ind + 1]
with ph.misc.temporal_printoptions():
ph.log.debug('raw data: {}'.format(np.shape(raw_data)))
raw_error = raw_error[:, mint_ind:maxt_ind + 1]
# all pulses have the same pulse duration
fid.setPulseDuration(pl.Q[nrx].timing.tau_p1,
deadtime_device=pl.Q[0].timing.tau_dead1)
# sets ungated and unrotated data in the FID
# together with the corresponding original times
fid.setRawDataErrorAndTimes(raw_data, raw_error, raw_times)
# apply some default gating
# can be set manually again as many times as the user like
fid.gating()
self.addSounding(fid)
[docs] def loadMRSD_h5(self, filename, remove_df=True,
build_loops=False, x_offsets=None, segments=80,
max_length=None, tx=None, rx=None, fids=None, debug=False):
""" See loadMRSD instead. """
import h5py
ph.log.info('load MRSD: {}'.format(os.path.abspath(filename)))
# open mrsd (matlab) file and access log of processing steps
mat = h5py.File(filename, mode='r')
pl = mat['proclog']
# sig has 4 entries: [noise(not gmr)][FID][T1/T2][T1/T2]
# as we are only using FID, num = 1:
num = 1
# 0: first pulse, for general info
rx0 = np.atleast_1d(pl['Q']['rx'][0])
# number of loops in this class so far
numloops = len(self.loops)
# number of soudings in data file
number_fids = len(rx0)
if fids is None:
fids = np.arange(number_fids)
else:
fids = np.atleast_1d(fids)
# information about receiver (index, shape, size)
rx_info = pl['rxinfo']
tx_info = pl['txinfo']
# auto generate loops from type and size stored in mrsd
loop_cfg = np.zeros((len(fids) + 1, 3), dtype=int)
loop_cfg[0, :] = [tx_info['channel'][0, 0], tx_info['looptype'][0, 0],
tx_info['loopsize'][0, 0]]
if tx is not None:
loop_cfg[0, 0] = tx
if rx is not None:
rx = np.atleast_1d(rx)
for i, rxi in enumerate(fids): # range(len(rx_info['channel'])):
if rx is not None:
# set rx channel from existing loops
rxichannel = rx[i]
else:
rxichannel = rx_info['channel'][rxi, 0]
loop_cfg[i + 1, :] = [rxichannel,
rx_info['looptype'][rxi, 0],
rx_info['loopsize'][rxi, 0]]
# find unique loop indices to append items in self.loops
unique, rev_idx = np.unique(loop_cfg[:, 0], return_inverse=True)
num_unique = len(unique)
print(loop_cfg, unique)
if build_loops:
if x_offsets is None or len(x_offsets) != num_unique:
build_loops = False
pr = 0 if x_offsets is None else len(x_offsets)
ph.log.warning('Warning: Wrong number of offsets ({}) for {} '
'loops. Sets loops to None and continue.'
.format(pr, num_unique))
# preserve place for loops if new indices are detected
max_loop_index = max(unique)
new = max_loop_index + 1 - len(self.loops)
if new > 0:
for index in range(new):
self.addLoop(None)
# new indices in site container
if tx is None and rx is None:
# shift due to existing loops, start at first new loop
site_idx = np.arange(numloops, numloops + num_unique)
else:
# given indices used for existing loops, start at 0
site_idx = np.arange(0, len(self.loops))
if build_loops:
# auto generate loop discretizations
for rev in range(num_unique):
self.loops[site_idx[rev]] =\
createLoopFromMRS(
loop_cfg[rev, 1], loop_cfg[rev, 2],
x_offsets[rev],
segments=segments,
max_length=max_length)
rxs = ph.misc.getAllValuesByReference(mat, pl['Q']['rx'])
for nrxi, nrx in enumerate(fids):
# create empty FID class for the combination of tx and rx
# translate local indices from mrsd into global indices in class
ph.log.log(13, 'load sounding {}'.format(nrx))
fid = FID(tx=loop_cfg[0, 0],
rx=loop_cfg[nrxi + 1, 0])
# loop turns
fid.tx_turns = int(tx_info['loopturns'][0, 0])
fid.rx_turns = int(rx_info['loopturns'][nrx, 0])
# set pulse moments
pulses = ph.misc.getAllValuesByReference(mat, pl['Q']['q'])
fid.setPulses(pulses)
# times (not gated)
raw_times = ph.misc.getAllValuesByReference(
mat, rxs[0]['sig']['t'][num][nrx])[:, 0]
volts = ph.misc.getAllValuesByReference(
mat, rxs[0]['sig']['V'][num][nrx])[:, 0]
errors = ph.misc.getAllValuesByReference(
mat, rxs[0]['sig']['E'][num][nrx])[:, 0]
# and people say hdf5 is so nice...
# ungated data, init array
if volts.dtype == np.dtype([('real', '<f8'), ('imag', '<f8')]):
dtype0 = np.complex128
else:
dtype0 = np.float64
if errors.dtype == np.dtype([('real', '<f8'), ('imag', '<f8')]):
dtype1 = np.complex128
else:
dtype1 = np.float64
raw_data = np.zeros(
(len(fid.pulses), len(raw_times)),
dtype=dtype0)
# ungated error, init array
raw_error = np.zeros(
(len(fid.pulses), len(raw_times)),
dtype=dtype1)
# frequency offset, init array
fid.df = np.zeros_like(fid.pulses)
# phases, init array
phi = np.zeros_like(fid.pulses)
# fill data, phases, frequency offsets and errors
# one df and phi per pulse moment !
for pulse in range(raw_data.shape[0]):
# volts = ph.misc.getAllValuesByReference(
# mat, rxs[pulse]['sig']['V'][num][0])[:, 0]
volts = ph.misc.getAllValuesByReference(
mat, rxs[pulse]['sig']['V'][num][nrx])[:, 0]
if dtype0 == np.complex128:
raw_data[pulse] = volts['real'] + 1j * volts['imag']
else:
raw_data[pulse] = volts
errors = ph.misc.getAllValuesByReference(
mat, rxs[pulse]['sig']['E'][num][nrx])[:, 0]
if dtype0 == np.complex128:
raw_error[pulse] = errors['real'] + 1j * errors['imag']
else:
raw_error[pulse] = errors
fit = ph.misc.getAllValuesByReference(
mat, rxs[pulse]['sig']['fit'][num][nrx])[:, 0]
fid.df[pulse] = fit[2]
phi[pulse] = fit[3]
fid.setPhases(phi)
# find certain times that shoudn't be imported.
# function copied from MRSmatlab, tested, same results
ev = np.array(pl['event'])
minmaxt = ev[5:7, (ev[0] == 101.) & # index
(ev[1] == 1) & #
(ev[2] == 0) & #
(ev[3] == 1) & #
(ev[4] == 2)] # times: min, max
try:
mint_ind = np.argmin(np.abs(raw_times - minmaxt[0][0]))
except IndexError:
mint_ind = 0
try:
maxt_ind = np.argmin(np.abs(raw_times - minmaxt[0][1]))
except IndexError:
maxt_ind = len(raw_times) - 1
# shortens the time, data and error vector/data_gated
raw_times = np.array(raw_times[mint_ind:maxt_ind + 1])
raw_data = raw_data[:, mint_ind:maxt_ind + 1]
raw_error = raw_error[:, mint_ind:maxt_ind + 1]
# all pulses have the same pulse duration
timing = ph.misc.getAllValuesByReference(
mat, pl['Q']['timing'])[nrx]
fid.setPulseDuration(timing['tau_p1'][0, 0],
deadtime_device=timing['tau_dead1'][0, 0])
# sets ungated and unrotated data in the FID
# together with the corresponding original times
fid.setRawDataErrorAndTimes(raw_data, raw_error, raw_times,
debug=debug)
# apply some default gating
# can be set manually again as many times as the user like
fid.gating()
self.addSounding(fid)
[docs] def loadMRSK(self, filename, tx=None, rx=None, fid=None, set_earth=True,
distribute_loop_config=False, x_offsets=None, segments=80,
max_length=None, deadtime_device=0.005, min_thk=0,
verbose=True, set_df=False):
try:
kern = self._loadMRSK_mat(
filename, tx=tx, rx=rx, fid=fid, set_earth=set_earth,
distribute_loop_config=distribute_loop_config,
x_offsets=x_offsets, segments=segments,
max_length=max_length, deadtime_device=deadtime_device,
min_thk=min_thk, set_df=set_df, verbose=verbose)
except NotImplementedError:
# new matlab files are h5 files
kern = self._loadMRSK_h5(
filename, tx=tx, rx=rx, fid=fid, set_earth=set_earth,
distribute_loop_config=distribute_loop_config,
x_offsets=x_offsets, segments=segments,
max_length=max_length, deadtime_device=deadtime_device,
min_thk=min_thk, set_df=set_df, verbose=verbose)
return kern
def _loadMRSK_mat(
self, filename, tx=None, rx=None, fid=None, set_earth=True,
distribute_loop_config=False, x_offsets=None, segments=80,
max_length=None, deadtime_device=0.005, min_thk=0, set_df=False,
verbose=True):
"""
Load .mrsk file and returns kernel class instance with the same
parameters.
"""
# load matlab struct
ph.log.info('load MRSK: {}'.format(os.path.abspath(filename)))
MAT = loadmat(filename, struct_as_record=False,
squeeze_me=True)
kdata = MAT['kdata']
measure = kdata.measure
loop = kdata.loop
if kdata.measure.pulsesequence not in (1, 'FID'):
raise Exception('Cannot load other pulsesequences than FIDs.')
if set_earth:
self.setEarth(incl=float(kdata.earth.inkl),
decl=float(kdata.earth.decl),
mag=float(np.abs(kdata.earth.erdt)))
# add sounding and get id
if fid is None:
if rx is None:
if tx is None:
tx = 0
rx = tx
fid = FID(tx=tx, rx=rx)
sound_id = self.addSounding(fid)
else:
sound_id = int(fid)
fid = self.fids[fid]
rx = fid.rx_index
tx = fid.tx_index
# add loop and get id
if tx is None:
if x_offsets is None:
tx = self.addLoop(None)
print('Warning: Please provide *loadMRSK* with tx and rx '
'indices or an offsets for automatic loop creation.'
' Appended placeholder loop at index {} and continue.'
.format(tx))
else:
tx = self.addLoop(createLoopFromMRS(
kdata.loop.shape, kdata.loop.size,
np.atleast_1d(x_offsets)[0],
segments=segments, max_length=max_length))
fid.tx_turns = int(loop.turns[0])
fid.rx_turns = int(loop.turns[1])
fid.setPulses(measure.pm_vec)
fid.setPulseDuration(
measure.taup1,
deadtime_device=deadtime_device)
if set_df:
ph.log.debug('set frequency offset')
fid.setFrequencyOffset(measure.df)
else:
ph.log.debug('did NOT set frequency offset')
fid.temperature = kdata.earth.temp
if kdata.earth.res == 1:
# case 1/3: resistive
rho = np.array([10000.])
thk = []
else:
rho = 1./np.atleast_1d(kdata.earth.sm)
if len(np.atleast_1d(kdata.earth.sm)) <= 2:
# case 2/3: halfspace
thk = np.atleast_1d(kdata.earth.zm)
else:
# case 3/3: layered halfspace: depths -> thicknesses
thk = np.append(kdata.earth.zm[0], np.diff(kdata.earth.zm))
try:
if tx is not None and self.loops[tx] is not None:
self.loops[tx].setModel(rho, d=thk)
except IndexError:
pass
try:
if rx is not None and self.loops[rx] is not None:
self.loops[rx].setModel(rho, d=thk)
except IndexError:
pass
kern = self.createKernel(fid=sound_id,
dimension=int(kdata.measure.dim))
kern.K = np.conjugate(kdata.K)
zvec = np.append(np.zeros(1), -np.cumsum(kdata.model.Dz))
kern.setZVector(zvec, min_thk=min_thk)
return kern
def _loadMRSK_h5(
self, filename, tx=None, rx=None, fid=None, set_earth=True,
distribute_loop_config=False, x_offsets=None, segments=80,
max_length=None, deadtime_device=0.005, min_thk=0, set_df=False,
verbose=True):
"""
Load .mrsk file and returns kernel class instance with the same
parameters.
"""
import h5py
# load matlab struct
ph.log.info('load file: {}'.format(os.path.abspath(filename)))
MAT = h5py.File(filename, mode='r')
kdata = MAT['kdata']
# Dimension
D = int(kdata['measure']['dim'][()].squeeze())
# Earth
earth = kdata['earth']
measure = kdata['measure']
loop = kdata['loop']
if measure['pulsesequence'][()].flat[0] != 1:
raise Exception('Cannot load other pulsesequences than FIDs.')
if set_earth:
self.setEarth(incl=float(earth['inkl'][()].flat[0]),
decl=float(earth['decl'][()].flat[0]),
mag=float(np.abs(earth['erdt'][()].flat[0])),
rad=False)
# add sounding and get id
rx = rx or tx # None -> tx
if fid is None:
fid = FID(tx=tx, rx=rx)
sound_id = self.addSounding(fid)
else:
sound_id = int(fid)
fid = self.fids[fid]
rx = fid.rx_index
tx = fid.tx_index
# add loop and get id
if tx is None:
if x_offsets is None:
tx = self.addLoop(None)
print('Warning: Please provide *loadMRSK* with tx and rx '
'indices or an offsets for automatic loop creation.'
' Appended placeholder loop at index {} and continue.'
.format(tx))
else:
tx = self.addLoop(createLoopFromMRS(
loop['shape'][()].flat[0], loop['size'][()].flat[0],
np.atleast_1d(x_offsets)[0],
segments=segments, max_length=max_length))
fid.tx_turns = int(loop['turns'][()].flat[0])
fid.rx_turns = int(loop['turns'][()].flat[1])
fid.setPulses(measure['pm_vec'][()].squeeze())
fid.setPulseDuration(
measure['taup1'][()].flat[0],
deadtime_device=deadtime_device)
if set_df:
ph.log.debug('set frequency offset')
fid.setFrequencyOffset(measure['df'][()].flat[0])
else:
ph.log.debug('DID NOT set frequency offset')
fid.temperature = earth['temp'][()].flat[0]
if earth['res'][()].flat[0] == 1:
# case 1/3: resistive
rho = np.array([10000.])
thk = []
else:
sm = np.atleast_1d(earth['sm'][()].squeeze())
zm = np.atleast_1d(earth['zm'][()].squeeze())
rho = 1./np.atleast_1d(sm)
if len(np.atleast_1d(sm)) < 2:
# case 2/3: halfspace
thk = np.atleast_1d(zm)
else:
# case 3/3: layered halfspace: depths -> thicknesses
thk = np.append(zm[0], np.diff(zm))
if self.loops[tx] is not None:
self.loops[tx].setModel(rho, d=thk)
if self.loops[rx] is not None:
self.loops[rx].setModel(rho, d=thk)
kern = self.createKernel(fid=sound_id,
dimension=int(D))
kraw = kdata['K'][()].squeeze()
# the minus sign is a convention thing
kern.K = kraw['real'][()].squeeze().T + \
1j * kraw['imag'][()].squeeze().T
zvec = np.append(np.zeros(1),
-np.cumsum(kdata['model']['Dz'][()]))
kern.setZVector(zvec, min_thk=min_thk)
return kern
[docs] def save(self, savename, save_loops=True, use_original_loop_names=False):
savename = Path(savename).with_suffix('')
ph.log.info('Survey.save, {} loops, {} fids, name: {}'
.format(len(self.loops), len(self.fids),
savename.with_suffix('.npz').as_posix()))
# loops
loop_names = []
if save_loops:
orig_names =\
[None if loop is None else loop.name for loop in self.loops]
use_orig = use_original_loop_names
if None in self.loops or\
None in orig_names or\
len(np.unique(orig_names)) != len(self.loops):
if use_original_loop_names:
ph.log.warning('survey.save: Loop names not unique, '
'create new names.')
use_orig = False
if use_orig:
for lp, loop in enumerate(self.loops):
path = savename.with_name(
Path(orig_names[lp]).name).as_posix()
loop_names.append(path)
loop.save(path)
else:
for lp, loop in enumerate(self.loops):
if loop is not None:
temp_name = '{}_loop_{}'.format(savename.stem, lp)
loop_names.append(
savename.with_name(temp_name).as_posix())
loop.save(savename.with_name(temp_name).as_posix())
else:
loop_names.append(None)
# fids
fid_names = []
for fd, fid in enumerate(self.fids):
temp_name2 = '{}_fid_{}.npz'.format(savename.stem, fd)
fid_names.append(temp_name2)
fid.save(savename.with_name(temp_name2).as_posix())
# earth + remember names
ph.log.debug(f'Survey.save: loop_names = {loop_names}')
np.savez(savename.with_suffix('.npz').as_posix(),
mag=self.earth.magnitude,
incl=self.earth.inclination,
decl=self.earth.declination,
loop_names=loop_names,
fid_names=fid_names)
[docs] def load(self, savename, load_meshes=True, load_loops=True):
from comet.pyhed.IO import getItem
savename = Path(savename)
dir_abs = savename.parent.absolute()
ph.log.info('Survey.load(): "{}"'.format(savename.as_posix()))
with np.load(savename.with_suffix('.npz').as_posix(),
allow_pickle=True) as npz:
self.setEarth(mag=getItem(npz, 'mag'),
decl=getItem(npz, 'decl'),
incl=getItem(npz, 'incl'),
rad=True)
ph.log.log(16, self.earth)
off_loops = len(self.loops)
loop_names = np.atleast_1d(getItem(npz, 'loop_names'))
ph.log.info('Survey.load(): found {} loops'
.format(len(loop_names)))
ph.log.debug(f'Survey.load(): loop names {loop_names}')
for name in loop_names:
if name is None or load_loops is False:
# case 1/4: no name found
# case 4/4: no loading of loops per flag
temp = None
elif isinstance(name, str):
name = Path(name)
pabs = name.parent.absolute()
if pabs != dir_abs:
# case 2/4: only file name or partial relative path
# need to get path from survey
temp = ph.loop.loadLoop(
Path(dir_abs).joinpath(name.stem).as_posix(),
load_meshes=False, overwrite_dir=True)
else:
# case 3/4: absolute path
# with subfolders provided
temp = ph.loop.loadLoop(name.as_posix(),
load_meshes=False)
# in any case add loop or None to list of self.loops
self.addLoop(temp)
# loading meshes through this enables caching
if load_meshes and temp is not None:
self.loadLoopMesh(temp.loop_mesh_name, -1,
dipolename=temp.dipole_mesh_name)
if len(self.loops) > 0 and self.loops[0] is not None:
self.setCustemConfig(self.loops[0].secondary_config,
update_loop_configs=False)
fid_names = np.atleast_1d(getItem(npz, 'fid_names'))
ph.log.info('Survey.load(): found {} fids'.format(len(fid_names)))
for name2 in fid_names:
temp2 = FID()
temp2.load(
savename.parent.joinpath(name2).as_posix())
# shift if loaded npz is added to non empty survey
temp2.setTx(temp2.tx_index + off_loops)
temp2.setRx(temp2.rx_index + off_loops)
self.addSounding(temp2)
[docs] def createMRS(self, fid=0, kernel=None, mtype='smooth', dtype='complex',
nlay=3, lam=1000, dimension=2, **kwargs):
from comet.snmr.modelling import MRS
mrs = MRS(survey=self,
fid=fid,
mtype=mtype,
dtype=dtype,
kernel=kernel, # is created if None
dimension=dimension,
**kwargs)
return mrs
def _gather(self, attribute):
""" Internal function.
Gather variables from underlaying soundings for convenience. """
ret = []
for fid in self.fids:
ret.append(getattr(fid, attribute))
return ret
[docs]class FID(object):
""" Single SNMR experiment (sounding) using a simple
Free Induction Decay (FID).
Attributes to be setted directly:
"""
def __init__(self, tx=0, rx=0, pulses=None):
# pulse moment vector [As]
# default pulses from MRSmatlab (convenient for synthetic stuff)
default_pulses =\
np.array((278, 320, 436, 540, 675, 830, 1049, 1250,
1460, 1760, 2106, 2458, 2925, 3391, 4004, 4677, 5473,
6273, 7155, 8148, 9281, 10515, 11926, 13556),
np.float64) / 1000.
self.setPulses(pulses or default_pulses)
# times [s], used for gating (not needed if gates are set manually)
self._times = None
# time gates midpoints [s]
self._gates = None
self._gates_thk = None
self._gating_last = 50
self._filter_last = [0., 2.]
# temperature [K]
self._temperature = None # init attribute for overview
self.temperature = 281 # fills _temperature
# also fills self.curie
# ca. 8 °C, middle temperature in germany ( ~ 10 meter depth)
# transmitter
self.tx_turns = None
self.tx_index = None
self.setTx(tx, turns=1)
# receiver
self.rx_turns = None
self.rx_index = None
self.setRx(rx, turns=1)
# general frequency offset [Hz]
self.df = None
self.setFrequencyOffset(0.0)
# phase per pulse moment
# Phase needed for rotated amplitude inversions, igored for complex
self.phi = None # shape = pulse moments
# phase of device (once), [-pi, pi]
# Phase needed for complex inversions, igored for rotated amplitudes
# (set to zero)
self.data_phase = 0.0
# data
self.data_raw = None # shape = pulse moments * times
self.data_gated = None # shape = pulse moments * gates
self.raw_rotated = False # rotated data vector flag, raw data
self.rotated = False # rotated data vector flag, gates only
# error
self.error_raw = None # shape = pulse moments * times
self.error_gated = None # shape = pulse moments * gates
self.error_weights = None
# response
self.response = None
# duration pulse 1 [s]
self.taup1 = None # init atttribute for overview
self.deadtime_device = None # -""-
self.setPulseDuration(0.04) # default 40 ms + 5 ms device
def __repr__(self):
return 'FID(Tx: {}, Rx: {})'.format(self.tx_index, self.rx_index)
@property
def deadtime(self):
""" Effective deadtime (device + half pulse) [s]."""
return self.taup1 / 2. + self.deadtime_device
@property
def pulses(self):
""" Pulse moment vector [As]. """
return self._pulses
@property
def amperes(self):
""" Ampere vector [A]. """
return self._pulses / self.taup1 # internally pulses=As -> A = As/s
@amperes.setter
def amperes(self, vec):
self.setPulses(vec * self.taup1) # A in -> set As = A * s
@property
def times(self):
""" Time vector [s] of raw data (including deadtime). """
if self._times is None:
return None
else:
return self._times + self.deadtime
@property
def gates(self):
""" Time gate midpoint vector [s] (including deadtime). """
if self._gates is None:
return None
else:
return self._gates + self.deadtime
@property
def temperature(self):
""" Middle temperature [K]. Default = 281 K (8°C or 46.4°F). """
return self._temperature
@temperature.setter
def temperature(self, temperature):
""" Expect temperature in Kelvin. """
self._temperature = temperature
self._curie = misc.constants.calcCurieFactor(self._temperature)
@property
def curie(self):
""" Curie factor for kernel calculation.
Read only. Calculated automatically by setting temperature.
"""
return self._curie
[docs] def setTx(self, index, turns=None):
""" Define index of transmitter loop and turns. """
self.tx_index = index
if turns is None:
if self.tx_turns is None:
self.tx_turns = 1
else:
self.tx_turns = turns
[docs] def setRx(self, index, turns=None):
""" Define index of receiver loop and turns. """
self.rx_index = index
if turns is None:
if self.tx_turns is None:
self.tx_turns = 1
else:
self.rx_turns = turns
[docs] def getComplexData(self):
# stich together real and imag part of data
# = also raveled, but twice the length!
if self.rotated:
ph.log.warning('getComplexData: rotatedAmplitudes used!')
ret = self.data_gated
else:
if not np.isclose(self.data_phase, 0):
# changes for rot of complex data by phase: 24.06.2020
ph.log.log(13, 'getComplexData: {:.2f}'.format(
self.data_phase))
rot_data = misc.rotData(self.data_gated,
self.data_phase)
ret = rot_data
else:
ph.log.log(13, 'getComplexData: no phase')
ret = self.data_gated
return ret
[docs] def getRotatedAmplitudes(self):
""" Returns Data and Error as real component of the rotated Vecs."""
if not self.rotated:
self.rotateAmplitudes()
data = self.data_gated.real.ravel() # no abs here !
error = self.error_gated.real.ravel()
return (data, error)
[docs] def setRawDataErrorAndTimes(self, data, error, times, rotated=False,
phases=None, remove_df=True,
omit_regating=False):
"""
Sets the raw (processed but ungated) data vector along with the time
discretization and errorvector.
Parameters
----------
data: np.ndarray
Data vector of shape (number of pulses, times). Expect complex
valued vector.
error: np.ndarray
Error vector of the same shape as the data vector.
times: np.ndarray
Simple time vector in seconds with shape matching the dimension 1
of the data and error vector, expect times without deadtime!
rotated: boolean [ False ]
Define whether the data are already rotated or not. There is no
autodetect for that.
phases: np.ndarray [ None ]
Define phases as simple vector containing phases in rad. Expect one
value per pulse.
remove_df: boolean [ True ]
Removes the frequency offset in the given data stored in the
attribute **df** [Hz].
omit_regating: boolean [ False ]
When setting the raw data, the gated data need to be recalculated.
By default this is done via regating with the original settings
for the gating.
Sets
----
This functionality fills the following attributes:
*data_raw*, *times*, *error_raw*, *raw_rotated*
and optionally:
*phi* (phases)
"""
if times[0] >= self.deadtime:
with ph.misc.temporal_printoptions(threshold=5):
ph.log.warning(
'First entry for given times ({}) is greater than '
'deadtime of {:.4f} sec. Expect given vector to not '
'include deadtime.'
.format(np.array(times), self.deadtime))
if np.shape(data) != np.shape(error):
raise Exception(
'Shape mismatch between data and error: {} != {}'
.format(np.shape(data), np.shape(error)))
if len(times) != np.shape(data)[1]:
raise Exception(
'Shape mismatch between data/error and times: {} != {}'
.format(len(times), np.shape(data)[1]))
self._times = times
self.data_raw = data
# fix frequency offset
if remove_df:
# check and remove this
ph.log.debug('todo: validate fix with df')
self.data_raw *= np.exp(-1j * 2. * np.pi * self.df[:, np.newaxis] *
self.times)
self.error_raw = error
self.raw_rotated = rotated
if phases is not None:
self.setPhases(phases)
if not omit_regating:
self._regating() # new raw data -> update gated data
[docs] def setGatedDataErrorAndGates(self, data, error, gates, rotated=False,
phases=None, midpoints=True):
"""
Sets the processed and gated data vector along with the gates (time
discretization) and error cube.
Parameters
----------
data: np.ndarray
Data vector of shape (number of pulses, number of gates). Expect
complex valued vector.
error: np.ndarray
Error vector of the same shape as the data vector.
gates: np.ndarray
Simple time vector in seconds with shape matching the dimension 1
of the data and error vector. Expect gates without deadtime.
rotated: boolean [ False ]
Define whether the data are already rotated or not. thee is no
autodetect for that.
phases: np.ndarray [ None ]
Define phases as simple vector containing phases in rad. Expect one
value per pulse.
midpoints: boolean [ True ]
If True (default) the given times in the gates vector are
interpreted as midpoint of gates. However if False the vector is
interpreted as outer limits of the gates, so gate 1 would be
defined between time 1 and time 2 and gate 2 between time 2 and 3
and so on.
Sets
----
This functionality fills the following attributes:
*data_gated*, *gates*, *error_gated*, *rotated*
and optionally:
*phi* (phases)
"""
if np.shape(data) != np.shape(error):
raise Exception(
'Shape mismatch between data and error: {} != {}'
.format(np.shape(data), np.shape(error)))
if len(gates) != np.shape(data)[1]:
raise Exception(
'Shape mismatch between data/error and gates: {} != {}'
.format(len(gates), np.shape(data)[1]))
self.setGates(gates, midpoints=midpoints)
self.data_gated = data
self.error_gated = error
self.rotated = rotated
if phases is not None:
self.setPhases(phases)
[docs] def setPhases(self, phi):
"""
Sets variable phi. No check for length if vector is done. See
setGatedDataErrorAndGates or setRawDataErrorAndTimes for more details.
"""
self.phi = phi
[docs] def setDataPhase(self, data_phase):
"""
Sets variable data_phase. Expect single float value for data phase in
rad.
"""
ph.log.log(13, 'setDataPhase: {:.2f}'.format(data_phase))
self.data_phase = data_phase
[docs] def setPulseDuration(self, taup, deadtime_device=0.005):
""" Sets pulse duration [s] and internal deadtime from the device.
Parameters
----------
taup: float
Pulse duration in seconds.
deadtime_device: float [ 0.005 ]
Internal deadtime of the measurement device in seconds.
0.005 seconds are default for synthetic studies.
Sets
----
*taup1*,
*deadtime_device*,
*deadtime* (half pulse + deadtime_device)
"""
self.taup1 = taup
self.deadtime_device = deadtime_device
[docs] def setFrequencyOffset(self, df):
""" Sets frequency offset of tx pulse to larmor frequency.
Expect one value per pulse or one single value (used for all pulses).
None is treated as zero offset (internal initialization).
"""
if df is None:
self.df = np.zeros(len(self.pulses))
if not isinstance(df, np.ndarray):
self.df = np.ones(len(self.pulses)) * df
else:
self.df = df
[docs] def setPulses(self, pulses):
""" Set pulse moment vector. Expect array with float in [As].
Sets
----
*pulses*
"""
self._pulses = np.atleast_1d(np.array(pulses, dtype=np.float))
[docs] def setGates(self, gates, midpoints=True):
""" Define time gates.
Parameters
----------
gates: np.ndarray
Define gates midpoints. Expect array with float in [s]. See
midpoints for definition of how the input array is interpreted.
midpoints: boolean [ True ]
If True (default) the given times in the gates vector are
interpreted as midpoint of gates. However if False the vector is
interpreted as outer limits of the gates, so gate 1 would be
defined between time 1 and time 2 and gate 2 between timne 2 and 3
and so on.
Sets
----
*gates* and *_gates_thk* if not the midpoints are given
"""
with ph.misc.temporal_printoptions():
ph.log.debug('setGates: {} ({})'.format(gates, len(gates)))
if gates[0] >= self.deadtime:
with ph.misc.temporal_printoptions(threshold=5):
ph.log.warning(
'First entry for given gates ({}) is greater than '
'deadtime of ({} sec). Expect given vector to not include'
' deadtime.'.format(np.array(gates), self.deadtime))
if midpoints is True:
self._gates = gates
else:
if not np.isclose(gates[0], 0.0):
ph.log.warning('Had to add 0 as first entry of given time '
'vector in order to calculate the midpoints '
'of the gates. If you want to set given times '
'directly as midpoints use midpoints=True '
'in the setGates call.')
gates = np.append([0.], gates)
self._gates_thk = np.diff(gates)
self._gates = gates[1:] - (self._gates_thk/2)
[docs] def setResponse(self, array):
""" Sets a respinse array with the same shape as the data e.g. from an
inversion instance. For plotting only.
"""
if np.isclose(array.size, 2 * self.data_gated.size):
real, imag = np.split(np.array(array).flatten(), 2)
array = real + 1j * imag
try:
self.response = array.reshape(self.data_gated.shape)
except ValueError as Ex:
ph.log.error('setResponse(): Wrong shape of input vector.')
raise Ex
[docs] def gating(self, num_gates=42, verbose=False):
"""
Explanation:
------------
(extracted from MRSMatlab, 2017)
y=exp(x)
For some interval x(a:b) the exact mean within exp(x(a:b))
yAverage = exp(mean(log(y(a:b))))
t(yAverage) = mean(t(a:b))
Problem: Logarithm is nice for exact average of exponential function.
But signals are noise contaminated. 1. Logarithm of gaussian noise
changes noise structure from gaussian to lorenzian. Averaging of
lorenzian distributed noise is not zero. 2. Since noise can make signal
negative a dc shift is added to make signals positive. This deminishes
the accurancy of averaging in logspace. For large constant shift
averaging in logspace becomes equivalent to average in linspace.
However this is nice for noise structure.
So we have a tradeoff.
Finally, from some amount of intervals on, e.g. 20 within interval
[0 1]/s averaging is sufficiently exact in any case.
MMP 18/10/2011
"""
if self.data_raw is None:
raise Exception('No raw data for gating available.')
# effective deadtime
dt = self.deadtime
assert not np.isclose(dt, 0), 'Gating: Deadtime cannot be zero.'
# raw data
signalBaseCorr = np.abs(10.0 * np.min(self.data_raw))
signal = self.data_raw + signalBaseCorr
error = self.error_raw
# _time = time vector - deadtime
time = np.round(self._times, decimals=10)
if np.isclose(time[0], 0):
logtime = np.abs(np.logspace(np.log10(time[1] + dt),
np.log10(time[-1] + time[1] + dt),
num_gates) - time[1]) - dt
else:
logtime = np.abs(np.logspace(np.log10(time[1]),
np.log10(time[-1] + time[1]),
num_gates) - time[1])
indextime = np.ones_like(logtime, dtype=int)
for i in range(len(logtime) - 1):
indextime[i] = np.where(logtime[i] < time)[0][0]
# ensures that timegates contain 1 data point and size is increasing
indextime = np.cumsum(np.sort(np.diff(np.unique(indextime[:-1]))))
indextime = np.hstack((0, indextime, len(time)))
if ph.log.getEffectiveLevel() <= 20:
with ph.misc.temporal_printoptions():
ph.log.info('gating: indices {}, for {} gates'
.format(indextime, len(indextime) - 1))
gateV = np.zeros((len(signal),
len(indextime) - 1), dtype=type(signal[0, 0]))
gateT = [] # mean time of gate i
gateL = [] # number of data points in gate i
for i in range(0, len(indextime) - 1):
gateV[:, i] = np.exp(np.mean(np.log(
signal[:, indextime[i]:indextime[i + 1]]),
axis=1))
hlp = time[indextime[i]:indextime[i + 1]]
gateT.append(np.mean(hlp))
gateL.append(len(hlp))
gateV -= signalBaseCorr
gateT = np.array(gateT, dtype=np.float64)
with ph.misc.temporal_printoptions():
ph.log.info('gating: midpoints: {} ({})'
.format(gateT, len(gateT)))
# set time gate midpoints
self.setGates(gateT)
# set data cube
self.error_weights = np.sqrt(np.array(gateL, dtype=float))
self.error_gated = np.tile(np.mean(error, axis=1),
(len(indextime) - 1, 1)).T
self.error_gated /= self.error_weights
# set data cube
self.data_gated = gateV
self.filterGates(mint=self._filter_last[0],
maxt=self._filter_last[1])
# set some flags at last
# are these data rotated ?
# move flag from raw data to gated data.
self.rotated = self.raw_rotated
# save input for automatical regating
self._gating_last = num_gates
def _regating(self):
""" Reapply gating and filter to get unrotated data again. """
self.gating(self._gating_last)
[docs] def setRotated(self, rotated, raw_data=False):
""" Sets rotation of data. True = rotatedAmplitudes,
False = complex.
"""
if self.rotated is rotated:
pass
else:
if rotated:
self.rotateAmplitudes(raw_data=raw_data)
else:
if self.raw_rotated:
raise Exception(
'Cannot unrotate, raw data already rotated')
else:
self._regating()
[docs] def rotateAmplitudes(self, raw_data=False):
"""
One of the three main ways for NMR forward modelling is to use
rotated amplitudes, instead of using the amplitudes of the complex
data or the complex data itself. If the phase information of the noise
free data is known (synthetic data) or fitted (e.g. monoexponential
fit) the rotated Amplitudes (also complex, do not confuse) have the
advantage of containing all the information in the real part (together
with noise), where the imaginary part contians only noise and can
therefore be discarded later.
Can be used on gated or ungated data, however this call alters the
raw_data!
Parameters
----------
raw_data: boolean [ True ]
Flag to decide if raw data or gated data are rotated.
Default is raw data, however if no raw data are
Returns
-------
complex rotated raveled data.
"""
if self.raw_rotated or (self.rotated and not raw_data):
raise Exception('Already rotated.')
if raw_data:
data = self.data_raw
# times = self.times
else:
data = self.data_gated
# times = self.gates
if data is None:
raise Exception('No data for calculation of rotated amplitudes.')
absolute = np.absolute(data)
phase = np.angle(data)
if np.shape(data) != np.shape(self.phi):
PHI = self.phi[:, np.newaxis]
else:
PHI = self.phi
if not isinstance(self.df, np.ndarray):
self.df = np.ones(len(self.phi)) * self.df
# this fixes offresonance!, only do this once
# removed 04.12.2018 and fixed df offset in import data!
# wdft = np.ones_like(data, dtype=np.float64) * 2. * np.pi
# wdft *= self.df[:, np.newaxis]
# wdft *= times[np.newaxis, :]
rotreal = absolute * np.cos(phase - PHI) # - wdft)
rotimag = absolute * np.sin(phase - PHI) # - wdft)
if raw_data:
self.data_raw = rotreal + 1j * rotimag
self.raw_rotated = True
else:
self.data_gated = rotreal + 1j * rotimag
self.rotated = True # raw data may be still unrotated
[docs] def filterGates(self, mint=0.0, maxt=2.0):
""" Dismiss not desired time gates.
Parameters
----------
mint: float [0.0]
Cut all data reqired before mint (in seconds). This is done using
the gate midpoints including deadtime.
maxt: float [2.0]
Cut all data reqired after maxt (in seconds). This is done using
the gate midpoints including deadtime.
Append new .gating to restore old gates
raw_data remain untouched)
"""
# sort out nonsense gate times
good = (self.gates <= maxt) & (self.gates >= mint)
ph.log.log(13, 'filterGates: min t, max t: ({:.2f}, {:.2f})'
.format(mint, maxt))
self.setGates(self._gates[good])
self.error_weights = self.error_weights[good]
self.data_gated = self.data_gated[:, good]
self.error_gated = self.error_gated[:, good]
self._filter_last = [mint, maxt]
[docs] def save(self, savename):
"""
Saves FID class instance under savename. Expect savename with ending
.npz (numpy compressed binary data structure).
"""
np.savez(savename,
data_raw=self.data_raw,
error_raw=self.error_raw,
data_gated=self.data_gated,
error_gated=self.error_gated,
error_weights=self.error_weights,
times=self._times, # without deadtime
phi=self.phi,
gates=self._gates, # without deadtime
gatesthk=self._gates_thk,
pulses=self.pulses,
rotated=self.rotated,
raw_rotated=self.raw_rotated,
txturns=self.tx_turns,
rxturns=self.rx_turns,
txidx=self.tx_index,
rxidx=self.rx_index,
lastgates=self._gating_last,
lastfilter=self._filter_last,
temp=self.temperature,
taup1=self.taup1,
deadtime_d=self.deadtime_device,
df=self.df,
data_phase=self.data_phase)
[docs] def load(self, savename, df_removed=True):
"""
Load previously saved FID class instance from savename (.npz)
(numpy compressed binary data structure).
Usually imported data are cleansed from frequency offsets (df) before
saving. However there is no auto detection for that. In rare cases (if
you know what youre doing) data are saved without removing df first.
Then df_removed has to be set to False. Otherwise the raw data
"""
from comet.pyhed.IO import getItem
with np.load(savename, allow_pickle=True) as npz:
# tx, rx
txi = getItem(npz, 'txidx')
txt = getItem(npz, 'txturns')
rxi = getItem(npz, 'rxidx')
rxt = getItem(npz, 'rxturns')
self.setTx(txi, turns=txt)
self.setRx(rxi, turns=rxt)
# pulses, pulse duration and deadtime
self.setPulses(getItem(npz, 'pulses'))
taup = getItem(npz, 'taup1')
dt_dev = getItem(npz, 'deadtime_d')
self.setPulseDuration(taup, deadtime_device=dt_dev)
# phases
phi = getItem(npz, 'phi')
self.data_phase = getItem(npz, 'data_phase', 0.0)
# raw data, error, and times
raw_data = getItem(npz, 'data_raw', None)
raw_error = getItem(npz, 'error_raw', None)
times = getItem(npz, 'times', None)
rotated_raw = getItem(npz, 'raw_rotated')
df = getItem(npz, 'df')
if df_removed:
self.setFrequencyOffset(0)
else:
self.setFrequencyOffset(df)
if raw_data is not None:
self.setRawDataErrorAndTimes(
raw_data, raw_error, times, rotated=rotated_raw,
phases=phi)
if df_removed:
self.setFrequencyOffset(df)
# gated data, error, and gates
self._gating_last = getItem(npz, 'lastgates')
ph.log.debug('survey.load: last gates: {}'
.format(self._gating_last))
self._filter_last = getItem(npz, 'lastfilter')
ph.log.debug('survey.load: last filter: {}'
.format(self._filter_last))
self.error_weights = getItem(npz, 'error_weights', None)
data_gated = getItem(npz, 'data_gated', None)
error_gated = getItem(npz, 'error_gated', None)
gates = getItem(npz, 'gates', None)
rotated = getItem(npz, 'rotated')
if data_gated is None:
if raw_data is not None:
self._regating()
else:
self.setGatedDataErrorAndGates(
data_gated, error_gated, gates, rotated=rotated,
phases=phi)
self._gates_thk = getItem(npz, 'gatesthk')
if self.error_weights is None and data_gated is not None:
ph.log.warning('supposed to have error_weights. '
'Regating fails. To be fixed.')
# self._regating()
[docs]class Earth(object):
"""
Parameters
----------
inclination: float [ 60. ]
Inclination of the earth magnetic field in rad or degree.
declination: float [ 2. ]
Declination of the earth magnetic field in rad or degree.
magnitude: float [48000 * 1e-9]
Magnitude of the earth magnetic field in Tesla.
rad: boolean [ False ]
Input inclination and declination in rad?
Example
-------
>>> from comet.snmr.survey import Earth
>>> e = Earth(inclination=45, declination=0, magnitude=4.8*1e-5)
>>> print(e)
"""
def __init__(self, incl=60., # 60°
decl=2., # 2°
mag=48000*1e-9,
rad=False):
self._larmor = None
self._magnitude = None
if rad:
self.inclination = incl
else:
self.inclination = np.deg2rad(incl)
if rad:
self.declination = decl
else:
self.declination = np.deg2rad(decl)
self.magnitude = mag
def __repr__(self):
return 'earth, incl: {:.1f}°, decl: {:.1f}°, mag: {:.0f} nT'.format(
np.rad2deg(self.inclination),
np.rad2deg(self.declination),
self.magnitude*1e9)
def __str__(self):
return ('### {1} ###{0}'
'Inclination: {2:.3f} rad ({3:.2f}°){0}'
'Declination: {4:.3f} rad ({5:.2f}°){0}'
'Magnitude: {6:.0f}e-9 Tesla ({7:.2f} Hz)'
.format(
'\n',
self.__class__.__name__.upper(),
self.inclination, np.rad2deg(self.inclination),
self.declination, np.rad2deg(self.declination),
self.magnitude*1e9, self.larmor))
[docs] def copy(self):
return Earth(inclination=self.inclination,
declination=self.declination,
magnitude=self.magnitude)
@property
def larmor(self):
return self._larmor
@larmor.setter
def larmor(self, larmor):
self._larmor = larmor
self._magnitude = self._Larmor2Mag()
@property
def magnitude(self):
return self._magnitude
@magnitude.setter
def magnitude(self, magnitude):
self._magnitude = magnitude
self._larmor = self._Mag2Larmor()
def _Mag2Larmor(self):
return misc.constants.gamma / (2 * np.pi) * self.magnitude
def _Larmor2Mag(self):
return self.larmor / misc.constants.gamma * 2 * np.pi
@property
def field(self):
""" Static magnetic field vector from earth defined in survey."""
incl = self.inclination
decl = self.declination
# Changed to B0-field definition after hertrich and braun, Nov, 2020
# original formulation: however their z points upwards!
# decl = self.declination
# B0 = np.array([np.cos(incl) * np.cos(-decl),
# np.cos(incl) * np.sin(-decl),
# np.sin(incl)])
# different definition for z: z negative downwards, which means:
# to remain a left-handed coordinate system which means the sign of
# the declination has to change as well
B0 = np.array([np.cos(incl) * np.cos(decl),
np.cos(incl) * np.sin(decl),
-np.sin(incl)])
return B0[:, np.newaxis]
[docs]def createLoopFromMRS(looptype, length, xoff, segments=80,
max_length=None, turns=1):
""" Returns a loop class object out of input found in a mrsd or mrsk file.
Parameters
----------
looptype: integer
Integer in [1, 2, 3, 4], in this range representing circular, square,
circular eight, and square eight loop source types. Error for looptype
< 1 and > 4.
length:
Length [m] of one side of the loop, or loop diameter for cicular type.
xoff: float
Offset [m] for loop midpoint in positive x direction.
segments: integer [ 80 ]
Number of segments used for discretization of the loop wire.
max_length: integer [ None ]
If given, replaces the segments with a number suited to ensure each
dipole represents this distance [m] at maximum.
"""
if looptype == 1:
# case 1: circular
loop = ph.loop.buildCircle(
length/2., P=(xoff, 0, 0), num_segs=segments,
max_length=max_length)
elif looptype == 2:
# case 1: square
loop = ph.loop.buildSquare(
length, P=(xoff, 0, 0), num_segs=segments,
max_length=max_length)
elif looptype == 3:
# case 3: circular eight
loop = ph.loop.buildFig8Circle(
length/2., num_segs=segments, P=(xoff, 0, 0),
max_length=max_length)
elif looptype == 4:
# case 4: square eight
p1 = (-length + xoff, -length/2., 0)
p2 = (length + xoff, length/2., 0)
loop = ph.loop.buildFig8(
(p1, p2), num_segs=segments, max_length=max_length)
else:
# if case is not recognized: None (user has to specify loop)
print('Warning: Mhh, found found no recipie'
' on how to deal with looptype: {} not in (1, 2, 3, 4).'
.format(looptype))
print('Set corresponding loop to None, continue...')
loop = None
if loop is not None:
loop.turns = turns
return loop
if __name__ == '__main__':
__spec__ = None
s = Survey()
kern = s.loadMRSK('D://borkum/borkum_data/cliwat2.mrsk', x_offsets=0)
# kern.calculate()
s.fids[0].save('test_fid.npz')
s2 = FID()
s2.load('test_fid.npz')
# The End