Source code for comet.snmr.survey.survey

"""
Part of comet/snmr/survey

Enhanced sounding class for SNMR data sets and supporting variables.
Sounding class can hold any number of Measurement class instances each
representing single FIDs.
"""
# COMET - COupled Magnetic resonance Electrical resistivity Tomography
# Copyright (C) 2019  Nico Skibbe

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import os
import numpy as np
from comet.snmr import misc
from comet import pyhed as ph

from scipy.io import loadmat
from pathlib import Path
import pygimli as pg


[docs]class Survey(object): """ Survey class for containment and handling of SNMR datasets (FIDS). """ def __init__(self, earth=None, loops=None): # List with stored experiments (soundings/FIDs). self.fids = [] # loops (transmitters and receivers) self.loops = np.array([], dtype=object) self.loop_config = None self.custem_config = None # earth incl, decl, magnitude (larmor frequency) self.earth = None self.setEarth(earth=earth) self.loaded_meshes = {} def __repr__(self): placeholder = '' nNone = sum(self.loops == None) # elementwise comparison to None if nNone > 0: placeholder = '({} are not defined)'.format(nNone) return 'SNMR Survey: {} loops {}, {} soundings'.format( len(self.loops), placeholder, len(self.fids)) @property def pulses(self): ''' Pulse moment vectors gathered from soundings.''' return self._gather('pulses') @property def gates(self): ''' Time gates gathered from soundings.''' return self._gather('gates') @property def data(self): ''' Complex data cube (pulses * gates) from soundings.''' return self._gather('data_gated') @property def error(self): ''' Complex error cube (pulses * gates) from soundings.''' return self._gather('error_gated') @property def data_phases(self): ''' Single data phases of the FIDs.''' return self._gather('data_phase') @property def response(self): ''' Complex data cube (pulses * gates) from soundings.''' return self._gather('response') @property def tx_indices(self): ''' Indices of the used transitter of each sounding.''' return self._gather('tx_index') @property def rx_indices(self): ''' Indices of the used receiver of each sounding.''' return self._gather('rx_index') @property def used_loops(self): _all = np.append(self.tx_indices, self.rx_indices) ret = np.unique(_all) return ret
[docs] def addLoop(self, loop): """ Appends a given loop instance to the loops in survey and returns id """ if isinstance(loop, (tuple, list, np.ndarray)): idx = [] for lp in loop: idx.append(self.addLoop(lp)) else: self.loops = np.append(self.loops, loop) idx = len(self.loops) - 1 if self.loop_config is not None: self.loops[-1].config = self.loop_config else: if self.loops[-1] is not None: self.loops[-1].setFrequency(self.earth.larmor) return idx
[docs] def loadLoopMesh(self, savename, indices=None, dipolename=None): """ Loads mesh and distribute reference to given indices. """ # shortcut for automatic call if savename is None: return # all indices if indices is None: indices = np.arange(0, len(self.loops)) else: indices = np.atleast_1d(indices) # do for all indices for idx in indices: loop = self.loops[idx] if savename not in self.loaded_meshes: try: ph.log.info('load loopmesh "{}"...'.format(savename)) self.loaded_meshes[savename] = pg.Mesh(savename) except RuntimeError as err: ph.log.warning('Did not found loopmesh at "{}"' .format(savename)) raise err ph.log.log(16, f'set loopmesh "{savename}" to loop {loop!r}...') loop.setLoopMesh(self.loaded_meshes[savename], savename=savename) if dipolename is not None: if dipolename not in self.loaded_meshes: try: ph.log.info('load dipole mesh "{}"...' .format(dipolename)) self.loaded_meshes[dipolename] = pg.Mesh(dipolename) except RuntimeError as err: ph.log.warning('Did not found loopmesh at "{}"' .format(savename)) raise err loop.setDipoleMesh(self.loaded_meshes[dipolename], savename=dipolename)
[docs] def addSounding(self, fid): """ Appends a given sounding instance to the sounds in survey and returns id """ self.fids.append(fid) idx = len(self.fids) - 1 return idx
[docs] def createSounding(self, tx=0, rx=0, check_double=True): """ Creates a new sounding based on the given ids for tx and rx. Parameters ---------- tx: integer [ 0 ] Index of the transmitter loop in loops. rx: integer [ 0 ] Index of the receiver loop in loops. Same number than tx indicates a coincident measurement. check_double: boolean [ True ] If True, omits creating another instance of the same fid (tx/rx combination). Instead the index of the original fid is returned. If False new fid is created and its index is returned. Note: tx and rx indices can be setted regardless if there is an actual loop in loops or just a *None* placeholder. In other words you can create your soundings and loops in arbitrary order. """ found = np.logical_and(np.array(self.tx_indices) == tx, np.array(self.rx_indices) == rx) if check_double and np.any(found): idx = np.where(found)[0][0] else: fid = FID(tx=tx, rx=rx) idx = self.addSounding(fid) return idx
[docs] def createKernel(self, fid=0, dimension=1): """ Returns a initialized kernel instance for the chosen sounding. Parameters ---------- sound_index: integer Index of the sounding the kernelclass is calcualting the kernel for. In order to calculate the kernel, pulses, tx and rx are taken as references from the sounding. Note: createKernel does not set or change any values in survey nor in the corresponding sounding. However when calculating, the kernel class will override the frequency in the given loops (tx and rx) and set it to the larmor frequency calculated from the earth magnetic fields magnitude. Use the *setEarth* method before or after you generate the kernel instances, but obviously before calculation. """ from comet.snmr import kernel kern = kernel.Kernel(self, fid=fid, dimension=dimension) return kern
[docs] def setEarth(self, earth=None, incl=60., decl=2., mag=48000*1e-9, rad=False): """ Defines the Earth in terms of inclination, declination and mag. Parameters ---------- earth: comet.snmr.survey.Earth [ None ] Already initialized earth class will be setted. Or created through the other optional arguments. inclination: float [ 60. ] Inclination of the earth magnetic field in rad or degree. declination: float [ 2. ] Declination of the earth magnetic field in rad or degree. magnitude: float [48000 * 1e-9] Magnitude of the earth magnetic field in Tesla. rad: boolean [ False ] Input inclination and declination in rad? """ if earth is not None: # simple setter... self.earth = earth else: # ...or create. self.earth = Earth(incl=incl, decl=decl, mag=mag, rad=rad) if self.loop_config is not None: self.loop_config.f = self.earth.larmor for loop in self.loops: if loop is not None: loop.setFrequency(self.earth.larmor) ph.log.debug('survey.setEarth(): update loop frequencies') ph.log.debug('survey.setEarth():\n{}'.format(self.earth))
[docs] def setLoopConfig(self, config, update_loop_configs=True): """Loop config in terms of primary field resistivity and frequency. """ self.loop_config = config for loop in self.loops: if loop is not None: loop.config = config
[docs] def setCustemConfig(self, config, update_loop_configs=True): self.custem_config = config for loop in self.loops: if loop is not None: loop.secondary_config = self.custem_config
[docs] def setLoops(self, loops): toset = np.atleast_1d(loops) self.loops = toset
[docs] def set1DModel(self, thk=[], res=[1000.]): """Modifies loop config in terms of primary field resistivity. """ for loop in self.loops: loop.setModel(res, d=thk)
[docs] def setResponse(self, array): """ Set a response array from e.g. an inversion as data set for plotting. """ response = np.copy(array) resp_size = int(response.size) mult = 1 data_size = 0 for dat in self.data: data_size += np.prod(dat.shape) if np.isclose(resp_size, data_size): pass elif np.isclose(resp_size, 2 * data_size): mult = 2 else: raise Exception( 'Response shape mismatch. Got {}, expected a vector that can ' 'be reshaped to {}.'.format(np.shape(response), data_size)) last = 0 for fid in self.fids: # response = resp1.real, resp1.imag, resp2.real, ..., respN.imag # mult => 2 # or response = complex(resp1), complex(resp2), ..., complex(respN) # mult => 1 vec = response[last: last + fid.data_gated.size * mult] fid.setResponse(vec) last += fid.data_gated.size * mult
[docs] def loadMRSD(self, filename, remove_df=True, build_loops=False, x_offsets=None, segments=80, max_length=None, tx=None, rx=None, fids=None, debug=False): """ Parameters ---------- filename: string Path to .mrsd file to be imported. build_loops: boolean [ True ] If True, the saved config in the mrsd file is used to construct loops for transmitter and receiver. However, the information in the mrsd fiel is not complete. There are some defaults we assume in autogenerating the loops, especially when it comes to figure-of-eight loops. Feel free to replace the loops with custom created loops of the *pyhed* library. Or switch this off if you only want to see the data or define all the loops yourself. x_offsets: list or None [ None ] One information that is missing in mrsd files, is the relative position of the loops to each other. Here one can fill in this information giving a simple list of offsets in positive x direction (all loops (midpoints) are placed at y=0 and z=0). Expect one float per used loop by the data file or raises an error. Ignored if None and multiple loops are found (in this case no loops are build at all). Coincident measurements do not require this, x is set to 0 by default. segments: integer [ 80 ] Number of dipoles used to auto build the loops. Ignored if *build_loops* is False or not given any *x_offsets*. max_length: float [ None ] Maximum length of a dipole when auto generating the loops. Overrides segments. Ignored if *build_loops* is False or not given any *x_offsets*. """ try: self.loadMRSD_mat( filename, remove_df=remove_df, build_loops=build_loops, x_offsets=x_offsets, segments=segments, max_length=max_length, tx=tx, rx=rx, fids=fids, debug=debug) except NotImplementedError: self.loadMRSD_h5( filename, remove_df=remove_df, build_loops=build_loops, x_offsets=x_offsets, segments=segments, max_length=max_length, tx=tx, rx=rx, fids=fids, debug=debug)
[docs] def loadMRSD_mat(self, filename, remove_df=True, build_loops=False, x_offsets=None, segments=80, max_length=None, tx=None, rx=None, fids=None, debug=False, ): """ See loadMRSD instead. """ ph.log.debug('load MRSD_mat') ph.log.info('load MRSD: {}'.format(os.path.abspath(filename))) # open mrsd (matlab) file and access log of processing steps pl = loadmat(filename, struct_as_record=False, squeeze_me=True)['proclog'] # sig has 4 entries: [noise(not gmr)][FID][T1/T2][T1/T2] # as we are only using FID, num = 1: num = 1 rx0 = np.atleast_1d(pl.Q[0].rx) # 0: first pulse, for general info # number of loops in this class so far numloops = len(self.loops) # number of soudings in data file number_fids = len(rx0) if fids is None: fids = np.arange(number_fids) else: fids = np.atleast_1d(fids) # information about receiver (index, shape, size) rx_info = np.atleast_1d(pl.rxinfo) # auto generate loops from type and size stored in mrsd loop_cfg = np.zeros((len(fids) + 1, 3), dtype=int) loop_cfg[0, :] = [pl.txinfo.channel, pl.txinfo.looptype, pl.txinfo.loopsize] if tx is not None: loop_cfg[0, 0] = tx else: loop_cfg[0, 0] = numloops if rx is not None: rx = np.atleast_1d(rx) maxfid = max(fids) if maxfid >= len(rx_info): raise Exception('Sounding {} not available, found only {} ' 'soundings in total (max index = {})' .format(maxfid, len(rx_info), len(rx_info) - 1)) for i, fidi in enumerate(fids): rxi = rx_info[fidi] if rx is not None: # set rx channel from existing loops rxichannel = rx[i] else: rxichannel = rxi.channel - 1 + numloops # matlab to python loop_cfg[i + 1, :] = [rxichannel, rxi.looptype, rxi.loopsize] # find unique loop indices to append items in self.loops unique, rev_idx = np.unique(loop_cfg[:, 0], return_inverse=True) num_unique = len(unique) if build_loops: if x_offsets is None or len(x_offsets) != num_unique: build_loops = False pr = 0 if x_offsets is None else len(x_offsets) ph.log.warning('Warning: Wrong number of offsets ({}) for {} ' 'loops. Sets loops to None and continue.' .format(pr, num_unique)) # preserve place for loops if new indices are detected max_loop_index = max(unique) new = max_loop_index + 1 - len(self.loops) if new > 0: for index in range(new): self.addLoop(None) # new indices in site container if tx is None and rx is None: # shift due to existing loops, start at first new loop site_idx = np.arange(numloops, numloops + num_unique) else: # given indices used for existing loops, start at 0 site_idx = np.arange(0, len(self.loops)) if build_loops: # auto generate loop discretizations for rev in range(num_unique): self.loops[site_idx[rev]] =\ createLoopFromMRS( loop_cfg[rev, 1], loop_cfg[rev, 2], x_offsets[rev], segments=segments, max_length=max_length) for nrxi, nrx in enumerate(fids): # create empty FID class for the combination of tx and rx # translate local indices from mrsd into global indices in class ph.log.log(13, 'load sounding {}'.format(nrx)) fid = FID(tx=loop_cfg[0, 0], rx=loop_cfg[nrxi + 1, 0]) # loop turns fid.tx_turns = pl.txinfo.loopturns fid.rx_turns = rx_info[nrx].loopturns # set pulse moments fid.setPulses(np.array([q.q for q in pl.Q])) # times (not gated) raw_times = rx0[nrx].sig[num].t # ungated data, init array raw_data = np.zeros( (len(fid.pulses), len(raw_times)), dtype=type(np.atleast_1d(pl.Q[0].rx)[nrx].sig[num].V[0])) # ungated error, init array raw_error = np.zeros( (len(fid.pulses), len(raw_times)), dtype=type(np.atleast_1d(pl.Q[0].rx)[nrx].sig[num].E[0])) # frequency offset, init array fid.df = np.zeros_like(fid.pulses) # phases, init array phi = np.zeros_like(fid.pulses) # fill data, phases, frequency offsets and errors # one df and phi per pulse moment ! for pulse in range(raw_data.shape[0]): rx = np.atleast_1d(pl.Q[pulse].rx)[nrx] raw_data[pulse] = rx.sig[num].V raw_error[pulse] = rx.sig[num].E fid.df[pulse] = rx.sig[num].fit[2] phi[pulse] = rx.sig[num].fit[3] fid.setPhases(phi) # find certain times that shoudn't be imported. # function copied from MRSmatlab, tested, same results minmaxt = pl.event[(pl.event[:, 0] == 101.) & # index (pl.event[:, 1] == 1) & # (pl.event[:, 2] == 0) & # (pl.event[:, 3] == 1) & # (pl.event[:, 4] == 2), 5:7] # times: min, max try: mint_ind = np.argmin(np.abs(raw_times - minmaxt[0][0])) except IndexError: mint_ind = 0 try: maxt_ind = np.argmin(np.abs(raw_times - minmaxt[0][1])) except IndexError: maxt_ind = len(raw_times) - 1 # shortens the time, data and error vector/data_gated raw_times = np.array(raw_times[mint_ind:maxt_ind + 1]) with ph.misc.temporal_printoptions(): ph.log.debug('raw times: {} ({})'.format(raw_times, len(raw_times))) raw_data = raw_data[:, mint_ind:maxt_ind + 1] with ph.misc.temporal_printoptions(): ph.log.debug('raw data: {}'.format(np.shape(raw_data))) raw_error = raw_error[:, mint_ind:maxt_ind + 1] # all pulses have the same pulse duration fid.setPulseDuration(pl.Q[nrx].timing.tau_p1, deadtime_device=pl.Q[0].timing.tau_dead1) # sets ungated and unrotated data in the FID # together with the corresponding original times fid.setRawDataErrorAndTimes(raw_data, raw_error, raw_times) # apply some default gating # can be set manually again as many times as the user like fid.gating() self.addSounding(fid)
[docs] def loadMRSD_h5(self, filename, remove_df=True, build_loops=False, x_offsets=None, segments=80, max_length=None, tx=None, rx=None, fids=None, debug=False): """ See loadMRSD instead. """ import h5py ph.log.info('load MRSD: {}'.format(os.path.abspath(filename))) # open mrsd (matlab) file and access log of processing steps mat = h5py.File(filename, mode='r') pl = mat['proclog'] # sig has 4 entries: [noise(not gmr)][FID][T1/T2][T1/T2] # as we are only using FID, num = 1: num = 1 # 0: first pulse, for general info rx0 = np.atleast_1d(pl['Q']['rx'][0]) # number of loops in this class so far numloops = len(self.loops) # number of soudings in data file number_fids = len(rx0) if fids is None: fids = np.arange(number_fids) else: fids = np.atleast_1d(fids) # information about receiver (index, shape, size) rx_info = pl['rxinfo'] tx_info = pl['txinfo'] # auto generate loops from type and size stored in mrsd loop_cfg = np.zeros((len(fids) + 1, 3), dtype=int) loop_cfg[0, :] = [tx_info['channel'][0, 0], tx_info['looptype'][0, 0], tx_info['loopsize'][0, 0]] if tx is not None: loop_cfg[0, 0] = tx if rx is not None: rx = np.atleast_1d(rx) for i, rxi in enumerate(fids): # range(len(rx_info['channel'])): if rx is not None: # set rx channel from existing loops rxichannel = rx[i] else: rxichannel = rx_info['channel'][rxi, 0] loop_cfg[i + 1, :] = [rxichannel, rx_info['looptype'][rxi, 0], rx_info['loopsize'][rxi, 0]] # find unique loop indices to append items in self.loops unique, rev_idx = np.unique(loop_cfg[:, 0], return_inverse=True) num_unique = len(unique) print(loop_cfg, unique) if build_loops: if x_offsets is None or len(x_offsets) != num_unique: build_loops = False pr = 0 if x_offsets is None else len(x_offsets) ph.log.warning('Warning: Wrong number of offsets ({}) for {} ' 'loops. Sets loops to None and continue.' .format(pr, num_unique)) # preserve place for loops if new indices are detected max_loop_index = max(unique) new = max_loop_index + 1 - len(self.loops) if new > 0: for index in range(new): self.addLoop(None) # new indices in site container if tx is None and rx is None: # shift due to existing loops, start at first new loop site_idx = np.arange(numloops, numloops + num_unique) else: # given indices used for existing loops, start at 0 site_idx = np.arange(0, len(self.loops)) if build_loops: # auto generate loop discretizations for rev in range(num_unique): self.loops[site_idx[rev]] =\ createLoopFromMRS( loop_cfg[rev, 1], loop_cfg[rev, 2], x_offsets[rev], segments=segments, max_length=max_length) rxs = ph.misc.getAllValuesByReference(mat, pl['Q']['rx']) for nrxi, nrx in enumerate(fids): # create empty FID class for the combination of tx and rx # translate local indices from mrsd into global indices in class ph.log.log(13, 'load sounding {}'.format(nrx)) fid = FID(tx=loop_cfg[0, 0], rx=loop_cfg[nrxi + 1, 0]) # loop turns fid.tx_turns = int(tx_info['loopturns'][0, 0]) fid.rx_turns = int(rx_info['loopturns'][nrx, 0]) # set pulse moments pulses = ph.misc.getAllValuesByReference(mat, pl['Q']['q']) fid.setPulses(pulses) # times (not gated) raw_times = ph.misc.getAllValuesByReference( mat, rxs[0]['sig']['t'][num][nrx])[:, 0] volts = ph.misc.getAllValuesByReference( mat, rxs[0]['sig']['V'][num][nrx])[:, 0] errors = ph.misc.getAllValuesByReference( mat, rxs[0]['sig']['E'][num][nrx])[:, 0] # and people say hdf5 is so nice... # ungated data, init array if volts.dtype == np.dtype([('real', '<f8'), ('imag', '<f8')]): dtype0 = np.complex128 else: dtype0 = np.float64 if errors.dtype == np.dtype([('real', '<f8'), ('imag', '<f8')]): dtype1 = np.complex128 else: dtype1 = np.float64 raw_data = np.zeros( (len(fid.pulses), len(raw_times)), dtype=dtype0) # ungated error, init array raw_error = np.zeros( (len(fid.pulses), len(raw_times)), dtype=dtype1) # frequency offset, init array fid.df = np.zeros_like(fid.pulses) # phases, init array phi = np.zeros_like(fid.pulses) # fill data, phases, frequency offsets and errors # one df and phi per pulse moment ! for pulse in range(raw_data.shape[0]): # volts = ph.misc.getAllValuesByReference( # mat, rxs[pulse]['sig']['V'][num][0])[:, 0] volts = ph.misc.getAllValuesByReference( mat, rxs[pulse]['sig']['V'][num][nrx])[:, 0] if dtype0 == np.complex128: raw_data[pulse] = volts['real'] + 1j * volts['imag'] else: raw_data[pulse] = volts errors = ph.misc.getAllValuesByReference( mat, rxs[pulse]['sig']['E'][num][nrx])[:, 0] if dtype0 == np.complex128: raw_error[pulse] = errors['real'] + 1j * errors['imag'] else: raw_error[pulse] = errors fit = ph.misc.getAllValuesByReference( mat, rxs[pulse]['sig']['fit'][num][nrx])[:, 0] fid.df[pulse] = fit[2] phi[pulse] = fit[3] fid.setPhases(phi) # find certain times that shoudn't be imported. # function copied from MRSmatlab, tested, same results ev = np.array(pl['event']) minmaxt = ev[5:7, (ev[0] == 101.) & # index (ev[1] == 1) & # (ev[2] == 0) & # (ev[3] == 1) & # (ev[4] == 2)] # times: min, max try: mint_ind = np.argmin(np.abs(raw_times - minmaxt[0][0])) except IndexError: mint_ind = 0 try: maxt_ind = np.argmin(np.abs(raw_times - minmaxt[0][1])) except IndexError: maxt_ind = len(raw_times) - 1 # shortens the time, data and error vector/data_gated raw_times = np.array(raw_times[mint_ind:maxt_ind + 1]) raw_data = raw_data[:, mint_ind:maxt_ind + 1] raw_error = raw_error[:, mint_ind:maxt_ind + 1] # all pulses have the same pulse duration timing = ph.misc.getAllValuesByReference( mat, pl['Q']['timing'])[nrx] fid.setPulseDuration(timing['tau_p1'][0, 0], deadtime_device=timing['tau_dead1'][0, 0]) # sets ungated and unrotated data in the FID # together with the corresponding original times fid.setRawDataErrorAndTimes(raw_data, raw_error, raw_times, debug=debug) # apply some default gating # can be set manually again as many times as the user like fid.gating() self.addSounding(fid)
[docs] def loadMRSK(self, filename, tx=None, rx=None, fid=None, set_earth=True, distribute_loop_config=False, x_offsets=None, segments=80, max_length=None, deadtime_device=0.005, min_thk=0, verbose=True, set_df=False): try: kern = self._loadMRSK_mat( filename, tx=tx, rx=rx, fid=fid, set_earth=set_earth, distribute_loop_config=distribute_loop_config, x_offsets=x_offsets, segments=segments, max_length=max_length, deadtime_device=deadtime_device, min_thk=min_thk, set_df=set_df, verbose=verbose) except NotImplementedError: # new matlab files are h5 files kern = self._loadMRSK_h5( filename, tx=tx, rx=rx, fid=fid, set_earth=set_earth, distribute_loop_config=distribute_loop_config, x_offsets=x_offsets, segments=segments, max_length=max_length, deadtime_device=deadtime_device, min_thk=min_thk, set_df=set_df, verbose=verbose) return kern
def _loadMRSK_mat( self, filename, tx=None, rx=None, fid=None, set_earth=True, distribute_loop_config=False, x_offsets=None, segments=80, max_length=None, deadtime_device=0.005, min_thk=0, set_df=False, verbose=True): """ Load .mrsk file and returns kernel class instance with the same parameters. """ # load matlab struct ph.log.info('load MRSK: {}'.format(os.path.abspath(filename))) MAT = loadmat(filename, struct_as_record=False, squeeze_me=True) kdata = MAT['kdata'] measure = kdata.measure loop = kdata.loop if kdata.measure.pulsesequence not in (1, 'FID'): raise Exception('Cannot load other pulsesequences than FIDs.') if set_earth: self.setEarth(incl=float(kdata.earth.inkl), decl=float(kdata.earth.decl), mag=float(np.abs(kdata.earth.erdt))) # add sounding and get id if fid is None: if rx is None: if tx is None: tx = 0 rx = tx fid = FID(tx=tx, rx=rx) sound_id = self.addSounding(fid) else: sound_id = int(fid) fid = self.fids[fid] rx = fid.rx_index tx = fid.tx_index # add loop and get id if tx is None: if x_offsets is None: tx = self.addLoop(None) print('Warning: Please provide *loadMRSK* with tx and rx ' 'indices or an offsets for automatic loop creation.' ' Appended placeholder loop at index {} and continue.' .format(tx)) else: tx = self.addLoop(createLoopFromMRS( kdata.loop.shape, kdata.loop.size, np.atleast_1d(x_offsets)[0], segments=segments, max_length=max_length)) fid.tx_turns = int(loop.turns[0]) fid.rx_turns = int(loop.turns[1]) fid.setPulses(measure.pm_vec) fid.setPulseDuration( measure.taup1, deadtime_device=deadtime_device) if set_df: ph.log.debug('set frequency offset') fid.setFrequencyOffset(measure.df) else: ph.log.debug('did NOT set frequency offset') fid.temperature = kdata.earth.temp if kdata.earth.res == 1: # case 1/3: resistive rho = np.array([10000.]) thk = [] else: rho = 1./np.atleast_1d(kdata.earth.sm) if len(np.atleast_1d(kdata.earth.sm)) <= 2: # case 2/3: halfspace thk = np.atleast_1d(kdata.earth.zm) else: # case 3/3: layered halfspace: depths -> thicknesses thk = np.append(kdata.earth.zm[0], np.diff(kdata.earth.zm)) try: if tx is not None and self.loops[tx] is not None: self.loops[tx].setModel(rho, d=thk) except IndexError: pass try: if rx is not None and self.loops[rx] is not None: self.loops[rx].setModel(rho, d=thk) except IndexError: pass kern = self.createKernel(fid=sound_id, dimension=int(kdata.measure.dim)) kern.K = np.conjugate(kdata.K) zvec = np.append(np.zeros(1), -np.cumsum(kdata.model.Dz)) kern.setZVector(zvec, min_thk=min_thk) return kern def _loadMRSK_h5( self, filename, tx=None, rx=None, fid=None, set_earth=True, distribute_loop_config=False, x_offsets=None, segments=80, max_length=None, deadtime_device=0.005, min_thk=0, set_df=False, verbose=True): """ Load .mrsk file and returns kernel class instance with the same parameters. """ import h5py # load matlab struct ph.log.info('load file: {}'.format(os.path.abspath(filename))) MAT = h5py.File(filename, mode='r') kdata = MAT['kdata'] # Dimension D = int(kdata['measure']['dim'][()].squeeze()) # Earth earth = kdata['earth'] measure = kdata['measure'] loop = kdata['loop'] if measure['pulsesequence'][()].flat[0] != 1: raise Exception('Cannot load other pulsesequences than FIDs.') if set_earth: self.setEarth(incl=float(earth['inkl'][()].flat[0]), decl=float(earth['decl'][()].flat[0]), mag=float(np.abs(earth['erdt'][()].flat[0])), rad=False) # add sounding and get id rx = rx or tx # None -> tx if fid is None: fid = FID(tx=tx, rx=rx) sound_id = self.addSounding(fid) else: sound_id = int(fid) fid = self.fids[fid] rx = fid.rx_index tx = fid.tx_index # add loop and get id if tx is None: if x_offsets is None: tx = self.addLoop(None) print('Warning: Please provide *loadMRSK* with tx and rx ' 'indices or an offsets for automatic loop creation.' ' Appended placeholder loop at index {} and continue.' .format(tx)) else: tx = self.addLoop(createLoopFromMRS( loop['shape'][()].flat[0], loop['size'][()].flat[0], np.atleast_1d(x_offsets)[0], segments=segments, max_length=max_length)) fid.tx_turns = int(loop['turns'][()].flat[0]) fid.rx_turns = int(loop['turns'][()].flat[1]) fid.setPulses(measure['pm_vec'][()].squeeze()) fid.setPulseDuration( measure['taup1'][()].flat[0], deadtime_device=deadtime_device) if set_df: ph.log.debug('set frequency offset') fid.setFrequencyOffset(measure['df'][()].flat[0]) else: ph.log.debug('DID NOT set frequency offset') fid.temperature = earth['temp'][()].flat[0] if earth['res'][()].flat[0] == 1: # case 1/3: resistive rho = np.array([10000.]) thk = [] else: sm = np.atleast_1d(earth['sm'][()].squeeze()) zm = np.atleast_1d(earth['zm'][()].squeeze()) rho = 1./np.atleast_1d(sm) if len(np.atleast_1d(sm)) < 2: # case 2/3: halfspace thk = np.atleast_1d(zm) else: # case 3/3: layered halfspace: depths -> thicknesses thk = np.append(zm[0], np.diff(zm)) if self.loops[tx] is not None: self.loops[tx].setModel(rho, d=thk) if self.loops[rx] is not None: self.loops[rx].setModel(rho, d=thk) kern = self.createKernel(fid=sound_id, dimension=int(D)) kraw = kdata['K'][()].squeeze() # the minus sign is a convention thing kern.K = kraw['real'][()].squeeze().T + \ 1j * kraw['imag'][()].squeeze().T zvec = np.append(np.zeros(1), -np.cumsum(kdata['model']['Dz'][()])) kern.setZVector(zvec, min_thk=min_thk) return kern
[docs] def save(self, savename, save_loops=True, use_original_loop_names=False): savename = Path(savename).with_suffix('') ph.log.info('Survey.save, {} loops, {} fids, name: {}' .format(len(self.loops), len(self.fids), savename.with_suffix('.npz').as_posix())) # loops loop_names = [] if save_loops: orig_names =\ [None if loop is None else loop.name for loop in self.loops] use_orig = use_original_loop_names if None in self.loops or\ None in orig_names or\ len(np.unique(orig_names)) != len(self.loops): if use_original_loop_names: ph.log.warning('survey.save: Loop names not unique, ' 'create new names.') use_orig = False if use_orig: for lp, loop in enumerate(self.loops): path = savename.with_name( Path(orig_names[lp]).name).as_posix() loop_names.append(path) loop.save(path) else: for lp, loop in enumerate(self.loops): if loop is not None: temp_name = '{}_loop_{}'.format(savename.stem, lp) loop_names.append( savename.with_name(temp_name).as_posix()) loop.save(savename.with_name(temp_name).as_posix()) else: loop_names.append(None) # fids fid_names = [] for fd, fid in enumerate(self.fids): temp_name2 = '{}_fid_{}.npz'.format(savename.stem, fd) fid_names.append(temp_name2) fid.save(savename.with_name(temp_name2).as_posix()) # earth + remember names ph.log.debug(f'Survey.save: loop_names = {loop_names}') np.savez(savename.with_suffix('.npz').as_posix(), mag=self.earth.magnitude, incl=self.earth.inclination, decl=self.earth.declination, loop_names=loop_names, fid_names=fid_names)
[docs] def load(self, savename, load_meshes=True, load_loops=True): from comet.pyhed.IO import getItem savename = Path(savename) dir_abs = savename.parent.absolute() ph.log.info('Survey.load(): "{}"'.format(savename.as_posix())) with np.load(savename.with_suffix('.npz').as_posix(), allow_pickle=True) as npz: self.setEarth(mag=getItem(npz, 'mag'), decl=getItem(npz, 'decl'), incl=getItem(npz, 'incl'), rad=True) ph.log.log(16, self.earth) off_loops = len(self.loops) loop_names = np.atleast_1d(getItem(npz, 'loop_names')) ph.log.info('Survey.load(): found {} loops' .format(len(loop_names))) ph.log.debug(f'Survey.load(): loop names {loop_names}') for name in loop_names: if name is None or load_loops is False: # case 1/4: no name found # case 4/4: no loading of loops per flag temp = None elif isinstance(name, str): name = Path(name) pabs = name.parent.absolute() if pabs != dir_abs: # case 2/4: only file name or partial relative path # need to get path from survey temp = ph.loop.loadLoop( Path(dir_abs).joinpath(name.stem).as_posix(), load_meshes=False, overwrite_dir=True) else: # case 3/4: absolute path # with subfolders provided temp = ph.loop.loadLoop(name.as_posix(), load_meshes=False) # in any case add loop or None to list of self.loops self.addLoop(temp) # loading meshes through this enables caching if load_meshes and temp is not None: self.loadLoopMesh(temp.loop_mesh_name, -1, dipolename=temp.dipole_mesh_name) if len(self.loops) > 0 and self.loops[0] is not None: self.setCustemConfig(self.loops[0].secondary_config, update_loop_configs=False) fid_names = np.atleast_1d(getItem(npz, 'fid_names')) ph.log.info('Survey.load(): found {} fids'.format(len(fid_names))) for name2 in fid_names: temp2 = FID() temp2.load( savename.parent.joinpath(name2).as_posix()) # shift if loaded npz is added to non empty survey temp2.setTx(temp2.tx_index + off_loops) temp2.setRx(temp2.rx_index + off_loops) self.addSounding(temp2)
[docs] def createMRS(self, fid=0, kernel=None, mtype='smooth', dtype='complex', nlay=3, lam=1000, dimension=2, **kwargs): from comet.snmr.modelling import MRS mrs = MRS(survey=self, fid=fid, mtype=mtype, dtype=dtype, kernel=kernel, # is created if None dimension=dimension, **kwargs) return mrs
def _gather(self, attribute): """ Internal function. Gather variables from underlaying soundings for convenience. """ ret = [] for fid in self.fids: ret.append(getattr(fid, attribute)) return ret
[docs]class FID(object): """ Single SNMR experiment (sounding) using a simple Free Induction Decay (FID). Attributes to be setted directly: """ def __init__(self, tx=0, rx=0, pulses=None): # pulse moment vector [As] # default pulses from MRSmatlab (convenient for synthetic stuff) default_pulses =\ np.array((278, 320, 436, 540, 675, 830, 1049, 1250, 1460, 1760, 2106, 2458, 2925, 3391, 4004, 4677, 5473, 6273, 7155, 8148, 9281, 10515, 11926, 13556), np.float64) / 1000. self.setPulses(pulses or default_pulses) # times [s], used for gating (not needed if gates are set manually) self._times = None # time gates midpoints [s] self._gates = None self._gates_thk = None self._gating_last = 50 self._filter_last = [0., 2.] # temperature [K] self._temperature = None # init attribute for overview self.temperature = 281 # fills _temperature # also fills self.curie # ca. 8 °C, middle temperature in germany ( ~ 10 meter depth) # transmitter self.tx_turns = None self.tx_index = None self.setTx(tx, turns=1) # receiver self.rx_turns = None self.rx_index = None self.setRx(rx, turns=1) # general frequency offset [Hz] self.df = None self.setFrequencyOffset(0.0) # phase per pulse moment # Phase needed for rotated amplitude inversions, igored for complex self.phi = None # shape = pulse moments # phase of device (once), [-pi, pi] # Phase needed for complex inversions, igored for rotated amplitudes # (set to zero) self.data_phase = 0.0 # data self.data_raw = None # shape = pulse moments * times self.data_gated = None # shape = pulse moments * gates self.raw_rotated = False # rotated data vector flag, raw data self.rotated = False # rotated data vector flag, gates only # error self.error_raw = None # shape = pulse moments * times self.error_gated = None # shape = pulse moments * gates self.error_weights = None # response self.response = None # duration pulse 1 [s] self.taup1 = None # init atttribute for overview self.deadtime_device = None # -""- self.setPulseDuration(0.04) # default 40 ms + 5 ms device def __repr__(self): return 'FID(Tx: {}, Rx: {})'.format(self.tx_index, self.rx_index) @property def deadtime(self): """ Effective deadtime (device + half pulse) [s].""" return self.taup1 / 2. + self.deadtime_device @property def pulses(self): """ Pulse moment vector [As]. """ return self._pulses @property def amperes(self): """ Ampere vector [A]. """ return self._pulses / self.taup1 # internally pulses=As -> A = As/s @amperes.setter def amperes(self, vec): self.setPulses(vec * self.taup1) # A in -> set As = A * s @property def times(self): """ Time vector [s] of raw data (including deadtime). """ if self._times is None: return None else: return self._times + self.deadtime @property def gates(self): """ Time gate midpoint vector [s] (including deadtime). """ if self._gates is None: return None else: return self._gates + self.deadtime @property def temperature(self): """ Middle temperature [K]. Default = 281 K (8°C or 46.4°F). """ return self._temperature @temperature.setter def temperature(self, temperature): """ Expect temperature in Kelvin. """ self._temperature = temperature self._curie = misc.constants.calcCurieFactor(self._temperature) @property def curie(self): """ Curie factor for kernel calculation. Read only. Calculated automatically by setting temperature. """ return self._curie
[docs] def setTx(self, index, turns=None): """ Define index of transmitter loop and turns. """ self.tx_index = index if turns is None: if self.tx_turns is None: self.tx_turns = 1 else: self.tx_turns = turns
[docs] def setRx(self, index, turns=None): """ Define index of receiver loop and turns. """ self.rx_index = index if turns is None: if self.tx_turns is None: self.tx_turns = 1 else: self.rx_turns = turns
[docs] def getComplexData(self): # stich together real and imag part of data # = also raveled, but twice the length! if self.rotated: ph.log.warning('getComplexData: rotatedAmplitudes used!') ret = self.data_gated else: if not np.isclose(self.data_phase, 0): # changes for rot of complex data by phase: 24.06.2020 ph.log.log(13, 'getComplexData: {:.2f}'.format( self.data_phase)) rot_data = misc.rotData(self.data_gated, self.data_phase) ret = rot_data else: ph.log.log(13, 'getComplexData: no phase') ret = self.data_gated return ret
[docs] def getRotatedAmplitudes(self): """ Returns Data and Error as real component of the rotated Vecs.""" if not self.rotated: self.rotateAmplitudes() data = self.data_gated.real.ravel() # no abs here ! error = self.error_gated.real.ravel() return (data, error)
[docs] def setRawDataErrorAndTimes(self, data, error, times, rotated=False, phases=None, remove_df=True, omit_regating=False): """ Sets the raw (processed but ungated) data vector along with the time discretization and errorvector. Parameters ---------- data: np.ndarray Data vector of shape (number of pulses, times). Expect complex valued vector. error: np.ndarray Error vector of the same shape as the data vector. times: np.ndarray Simple time vector in seconds with shape matching the dimension 1 of the data and error vector, expect times without deadtime! rotated: boolean [ False ] Define whether the data are already rotated or not. There is no autodetect for that. phases: np.ndarray [ None ] Define phases as simple vector containing phases in rad. Expect one value per pulse. remove_df: boolean [ True ] Removes the frequency offset in the given data stored in the attribute **df** [Hz]. omit_regating: boolean [ False ] When setting the raw data, the gated data need to be recalculated. By default this is done via regating with the original settings for the gating. Sets ---- This functionality fills the following attributes: *data_raw*, *times*, *error_raw*, *raw_rotated* and optionally: *phi* (phases) """ if times[0] >= self.deadtime: with ph.misc.temporal_printoptions(threshold=5): ph.log.warning( 'First entry for given times ({}) is greater than ' 'deadtime of {:.4f} sec. Expect given vector to not ' 'include deadtime.' .format(np.array(times), self.deadtime)) if np.shape(data) != np.shape(error): raise Exception( 'Shape mismatch between data and error: {} != {}' .format(np.shape(data), np.shape(error))) if len(times) != np.shape(data)[1]: raise Exception( 'Shape mismatch between data/error and times: {} != {}' .format(len(times), np.shape(data)[1])) self._times = times self.data_raw = data # fix frequency offset if remove_df: # check and remove this ph.log.debug('todo: validate fix with df') self.data_raw *= np.exp(-1j * 2. * np.pi * self.df[:, np.newaxis] * self.times) self.error_raw = error self.raw_rotated = rotated if phases is not None: self.setPhases(phases) if not omit_regating: self._regating() # new raw data -> update gated data
[docs] def setGatedDataErrorAndGates(self, data, error, gates, rotated=False, phases=None, midpoints=True): """ Sets the processed and gated data vector along with the gates (time discretization) and error cube. Parameters ---------- data: np.ndarray Data vector of shape (number of pulses, number of gates). Expect complex valued vector. error: np.ndarray Error vector of the same shape as the data vector. gates: np.ndarray Simple time vector in seconds with shape matching the dimension 1 of the data and error vector. Expect gates without deadtime. rotated: boolean [ False ] Define whether the data are already rotated or not. thee is no autodetect for that. phases: np.ndarray [ None ] Define phases as simple vector containing phases in rad. Expect one value per pulse. midpoints: boolean [ True ] If True (default) the given times in the gates vector are interpreted as midpoint of gates. However if False the vector is interpreted as outer limits of the gates, so gate 1 would be defined between time 1 and time 2 and gate 2 between time 2 and 3 and so on. Sets ---- This functionality fills the following attributes: *data_gated*, *gates*, *error_gated*, *rotated* and optionally: *phi* (phases) """ if np.shape(data) != np.shape(error): raise Exception( 'Shape mismatch between data and error: {} != {}' .format(np.shape(data), np.shape(error))) if len(gates) != np.shape(data)[1]: raise Exception( 'Shape mismatch between data/error and gates: {} != {}' .format(len(gates), np.shape(data)[1])) self.setGates(gates, midpoints=midpoints) self.data_gated = data self.error_gated = error self.rotated = rotated if phases is not None: self.setPhases(phases)
[docs] def setPhases(self, phi): """ Sets variable phi. No check for length if vector is done. See setGatedDataErrorAndGates or setRawDataErrorAndTimes for more details. """ self.phi = phi
[docs] def setDataPhase(self, data_phase): """ Sets variable data_phase. Expect single float value for data phase in rad. """ ph.log.log(13, 'setDataPhase: {:.2f}'.format(data_phase)) self.data_phase = data_phase
[docs] def setPulseDuration(self, taup, deadtime_device=0.005): """ Sets pulse duration [s] and internal deadtime from the device. Parameters ---------- taup: float Pulse duration in seconds. deadtime_device: float [ 0.005 ] Internal deadtime of the measurement device in seconds. 0.005 seconds are default for synthetic studies. Sets ---- *taup1*, *deadtime_device*, *deadtime* (half pulse + deadtime_device) """ self.taup1 = taup self.deadtime_device = deadtime_device
[docs] def setFrequencyOffset(self, df): """ Sets frequency offset of tx pulse to larmor frequency. Expect one value per pulse or one single value (used for all pulses). None is treated as zero offset (internal initialization). """ if df is None: self.df = np.zeros(len(self.pulses)) if not isinstance(df, np.ndarray): self.df = np.ones(len(self.pulses)) * df else: self.df = df
[docs] def setPulses(self, pulses): """ Set pulse moment vector. Expect array with float in [As]. Sets ---- *pulses* """ self._pulses = np.atleast_1d(np.array(pulses, dtype=np.float))
[docs] def setGates(self, gates, midpoints=True): """ Define time gates. Parameters ---------- gates: np.ndarray Define gates midpoints. Expect array with float in [s]. See midpoints for definition of how the input array is interpreted. midpoints: boolean [ True ] If True (default) the given times in the gates vector are interpreted as midpoint of gates. However if False the vector is interpreted as outer limits of the gates, so gate 1 would be defined between time 1 and time 2 and gate 2 between timne 2 and 3 and so on. Sets ---- *gates* and *_gates_thk* if not the midpoints are given """ with ph.misc.temporal_printoptions(): ph.log.debug('setGates: {} ({})'.format(gates, len(gates))) if gates[0] >= self.deadtime: with ph.misc.temporal_printoptions(threshold=5): ph.log.warning( 'First entry for given gates ({}) is greater than ' 'deadtime of ({} sec). Expect given vector to not include' ' deadtime.'.format(np.array(gates), self.deadtime)) if midpoints is True: self._gates = gates else: if not np.isclose(gates[0], 0.0): ph.log.warning('Had to add 0 as first entry of given time ' 'vector in order to calculate the midpoints ' 'of the gates. If you want to set given times ' 'directly as midpoints use midpoints=True ' 'in the setGates call.') gates = np.append([0.], gates) self._gates_thk = np.diff(gates) self._gates = gates[1:] - (self._gates_thk/2)
[docs] def setResponse(self, array): """ Sets a respinse array with the same shape as the data e.g. from an inversion instance. For plotting only. """ if np.isclose(array.size, 2 * self.data_gated.size): real, imag = np.split(np.array(array).flatten(), 2) array = real + 1j * imag try: self.response = array.reshape(self.data_gated.shape) except ValueError as Ex: ph.log.error('setResponse(): Wrong shape of input vector.') raise Ex
[docs] def gating(self, num_gates=42, verbose=False): """ Explanation: ------------ (extracted from MRSMatlab, 2017) y=exp(x) For some interval x(a:b) the exact mean within exp(x(a:b)) yAverage = exp(mean(log(y(a:b)))) t(yAverage) = mean(t(a:b)) Problem: Logarithm is nice for exact average of exponential function. But signals are noise contaminated. 1. Logarithm of gaussian noise changes noise structure from gaussian to lorenzian. Averaging of lorenzian distributed noise is not zero. 2. Since noise can make signal negative a dc shift is added to make signals positive. This deminishes the accurancy of averaging in logspace. For large constant shift averaging in logspace becomes equivalent to average in linspace. However this is nice for noise structure. So we have a tradeoff. Finally, from some amount of intervals on, e.g. 20 within interval [0 1]/s averaging is sufficiently exact in any case. MMP 18/10/2011 """ if self.data_raw is None: raise Exception('No raw data for gating available.') # effective deadtime dt = self.deadtime assert not np.isclose(dt, 0), 'Gating: Deadtime cannot be zero.' # raw data signalBaseCorr = np.abs(10.0 * np.min(self.data_raw)) signal = self.data_raw + signalBaseCorr error = self.error_raw # _time = time vector - deadtime time = np.round(self._times, decimals=10) if np.isclose(time[0], 0): logtime = np.abs(np.logspace(np.log10(time[1] + dt), np.log10(time[-1] + time[1] + dt), num_gates) - time[1]) - dt else: logtime = np.abs(np.logspace(np.log10(time[1]), np.log10(time[-1] + time[1]), num_gates) - time[1]) indextime = np.ones_like(logtime, dtype=int) for i in range(len(logtime) - 1): indextime[i] = np.where(logtime[i] < time)[0][0] # ensures that timegates contain 1 data point and size is increasing indextime = np.cumsum(np.sort(np.diff(np.unique(indextime[:-1])))) indextime = np.hstack((0, indextime, len(time))) if ph.log.getEffectiveLevel() <= 20: with ph.misc.temporal_printoptions(): ph.log.info('gating: indices {}, for {} gates' .format(indextime, len(indextime) - 1)) gateV = np.zeros((len(signal), len(indextime) - 1), dtype=type(signal[0, 0])) gateT = [] # mean time of gate i gateL = [] # number of data points in gate i for i in range(0, len(indextime) - 1): gateV[:, i] = np.exp(np.mean(np.log( signal[:, indextime[i]:indextime[i + 1]]), axis=1)) hlp = time[indextime[i]:indextime[i + 1]] gateT.append(np.mean(hlp)) gateL.append(len(hlp)) gateV -= signalBaseCorr gateT = np.array(gateT, dtype=np.float64) with ph.misc.temporal_printoptions(): ph.log.info('gating: midpoints: {} ({})' .format(gateT, len(gateT))) # set time gate midpoints self.setGates(gateT) # set data cube self.error_weights = np.sqrt(np.array(gateL, dtype=float)) self.error_gated = np.tile(np.mean(error, axis=1), (len(indextime) - 1, 1)).T self.error_gated /= self.error_weights # set data cube self.data_gated = gateV self.filterGates(mint=self._filter_last[0], maxt=self._filter_last[1]) # set some flags at last # are these data rotated ? # move flag from raw data to gated data. self.rotated = self.raw_rotated # save input for automatical regating self._gating_last = num_gates
def _regating(self): """ Reapply gating and filter to get unrotated data again. """ self.gating(self._gating_last)
[docs] def setRotated(self, rotated, raw_data=False): """ Sets rotation of data. True = rotatedAmplitudes, False = complex. """ if self.rotated is rotated: pass else: if rotated: self.rotateAmplitudes(raw_data=raw_data) else: if self.raw_rotated: raise Exception( 'Cannot unrotate, raw data already rotated') else: self._regating()
[docs] def rotateAmplitudes(self, raw_data=False): """ One of the three main ways for NMR forward modelling is to use rotated amplitudes, instead of using the amplitudes of the complex data or the complex data itself. If the phase information of the noise free data is known (synthetic data) or fitted (e.g. monoexponential fit) the rotated Amplitudes (also complex, do not confuse) have the advantage of containing all the information in the real part (together with noise), where the imaginary part contians only noise and can therefore be discarded later. Can be used on gated or ungated data, however this call alters the raw_data! Parameters ---------- raw_data: boolean [ True ] Flag to decide if raw data or gated data are rotated. Default is raw data, however if no raw data are Returns ------- complex rotated raveled data. """ if self.raw_rotated or (self.rotated and not raw_data): raise Exception('Already rotated.') if raw_data: data = self.data_raw # times = self.times else: data = self.data_gated # times = self.gates if data is None: raise Exception('No data for calculation of rotated amplitudes.') absolute = np.absolute(data) phase = np.angle(data) if np.shape(data) != np.shape(self.phi): PHI = self.phi[:, np.newaxis] else: PHI = self.phi if not isinstance(self.df, np.ndarray): self.df = np.ones(len(self.phi)) * self.df # this fixes offresonance!, only do this once # removed 04.12.2018 and fixed df offset in import data! # wdft = np.ones_like(data, dtype=np.float64) * 2. * np.pi # wdft *= self.df[:, np.newaxis] # wdft *= times[np.newaxis, :] rotreal = absolute * np.cos(phase - PHI) # - wdft) rotimag = absolute * np.sin(phase - PHI) # - wdft) if raw_data: self.data_raw = rotreal + 1j * rotimag self.raw_rotated = True else: self.data_gated = rotreal + 1j * rotimag self.rotated = True # raw data may be still unrotated
[docs] def filterGates(self, mint=0.0, maxt=2.0): """ Dismiss not desired time gates. Parameters ---------- mint: float [0.0] Cut all data reqired before mint (in seconds). This is done using the gate midpoints including deadtime. maxt: float [2.0] Cut all data reqired after maxt (in seconds). This is done using the gate midpoints including deadtime. Append new .gating to restore old gates raw_data remain untouched) """ # sort out nonsense gate times good = (self.gates <= maxt) & (self.gates >= mint) ph.log.log(13, 'filterGates: min t, max t: ({:.2f}, {:.2f})' .format(mint, maxt)) self.setGates(self._gates[good]) self.error_weights = self.error_weights[good] self.data_gated = self.data_gated[:, good] self.error_gated = self.error_gated[:, good] self._filter_last = [mint, maxt]
[docs] def save(self, savename): """ Saves FID class instance under savename. Expect savename with ending .npz (numpy compressed binary data structure). """ np.savez(savename, data_raw=self.data_raw, error_raw=self.error_raw, data_gated=self.data_gated, error_gated=self.error_gated, error_weights=self.error_weights, times=self._times, # without deadtime phi=self.phi, gates=self._gates, # without deadtime gatesthk=self._gates_thk, pulses=self.pulses, rotated=self.rotated, raw_rotated=self.raw_rotated, txturns=self.tx_turns, rxturns=self.rx_turns, txidx=self.tx_index, rxidx=self.rx_index, lastgates=self._gating_last, lastfilter=self._filter_last, temp=self.temperature, taup1=self.taup1, deadtime_d=self.deadtime_device, df=self.df, data_phase=self.data_phase)
[docs] def load(self, savename, df_removed=True): """ Load previously saved FID class instance from savename (.npz) (numpy compressed binary data structure). Usually imported data are cleansed from frequency offsets (df) before saving. However there is no auto detection for that. In rare cases (if you know what youre doing) data are saved without removing df first. Then df_removed has to be set to False. Otherwise the raw data """ from comet.pyhed.IO import getItem with np.load(savename, allow_pickle=True) as npz: # tx, rx txi = getItem(npz, 'txidx') txt = getItem(npz, 'txturns') rxi = getItem(npz, 'rxidx') rxt = getItem(npz, 'rxturns') self.setTx(txi, turns=txt) self.setRx(rxi, turns=rxt) # pulses, pulse duration and deadtime self.setPulses(getItem(npz, 'pulses')) taup = getItem(npz, 'taup1') dt_dev = getItem(npz, 'deadtime_d') self.setPulseDuration(taup, deadtime_device=dt_dev) # phases phi = getItem(npz, 'phi') self.data_phase = getItem(npz, 'data_phase', 0.0) # raw data, error, and times raw_data = getItem(npz, 'data_raw', None) raw_error = getItem(npz, 'error_raw', None) times = getItem(npz, 'times', None) rotated_raw = getItem(npz, 'raw_rotated') df = getItem(npz, 'df') if df_removed: self.setFrequencyOffset(0) else: self.setFrequencyOffset(df) if raw_data is not None: self.setRawDataErrorAndTimes( raw_data, raw_error, times, rotated=rotated_raw, phases=phi) if df_removed: self.setFrequencyOffset(df) # gated data, error, and gates self._gating_last = getItem(npz, 'lastgates') ph.log.debug('survey.load: last gates: {}' .format(self._gating_last)) self._filter_last = getItem(npz, 'lastfilter') ph.log.debug('survey.load: last filter: {}' .format(self._filter_last)) self.error_weights = getItem(npz, 'error_weights', None) data_gated = getItem(npz, 'data_gated', None) error_gated = getItem(npz, 'error_gated', None) gates = getItem(npz, 'gates', None) rotated = getItem(npz, 'rotated') if data_gated is None: if raw_data is not None: self._regating() else: self.setGatedDataErrorAndGates( data_gated, error_gated, gates, rotated=rotated, phases=phi) self._gates_thk = getItem(npz, 'gatesthk') if self.error_weights is None and data_gated is not None: ph.log.warning('supposed to have error_weights. ' 'Regating fails. To be fixed.')
# self._regating()
[docs]class Earth(object): """ Parameters ---------- inclination: float [ 60. ] Inclination of the earth magnetic field in rad or degree. declination: float [ 2. ] Declination of the earth magnetic field in rad or degree. magnitude: float [48000 * 1e-9] Magnitude of the earth magnetic field in Tesla. rad: boolean [ False ] Input inclination and declination in rad? Example ------- >>> from comet.snmr.survey import Earth >>> e = Earth(inclination=45, declination=0, magnitude=4.8*1e-5) >>> print(e) """ def __init__(self, incl=60., # 60° decl=2., # 2° mag=48000*1e-9, rad=False): self._larmor = None self._magnitude = None if rad: self.inclination = incl else: self.inclination = np.deg2rad(incl) if rad: self.declination = decl else: self.declination = np.deg2rad(decl) self.magnitude = mag def __repr__(self): return 'earth, incl: {:.1f}°, decl: {:.1f}°, mag: {:.0f} nT'.format( np.rad2deg(self.inclination), np.rad2deg(self.declination), self.magnitude*1e9) def __str__(self): return ('### {1} ###{0}' 'Inclination: {2:.3f} rad ({3:.2f}°){0}' 'Declination: {4:.3f} rad ({5:.2f}°){0}' 'Magnitude: {6:.0f}e-9 Tesla ({7:.2f} Hz)' .format( '\n', self.__class__.__name__.upper(), self.inclination, np.rad2deg(self.inclination), self.declination, np.rad2deg(self.declination), self.magnitude*1e9, self.larmor))
[docs] def copy(self): return Earth(inclination=self.inclination, declination=self.declination, magnitude=self.magnitude)
@property def larmor(self): return self._larmor @larmor.setter def larmor(self, larmor): self._larmor = larmor self._magnitude = self._Larmor2Mag() @property def magnitude(self): return self._magnitude @magnitude.setter def magnitude(self, magnitude): self._magnitude = magnitude self._larmor = self._Mag2Larmor() def _Mag2Larmor(self): return misc.constants.gamma / (2 * np.pi) * self.magnitude def _Larmor2Mag(self): return self.larmor / misc.constants.gamma * 2 * np.pi @property def field(self): """ Static magnetic field vector from earth defined in survey.""" incl = self.inclination decl = self.declination # Changed to B0-field definition after hertrich and braun, Nov, 2020 # original formulation: however their z points upwards! # decl = self.declination # B0 = np.array([np.cos(incl) * np.cos(-decl), # np.cos(incl) * np.sin(-decl), # np.sin(incl)]) # different definition for z: z negative downwards, which means: # to remain a left-handed coordinate system which means the sign of # the declination has to change as well B0 = np.array([np.cos(incl) * np.cos(decl), np.cos(incl) * np.sin(decl), -np.sin(incl)]) return B0[:, np.newaxis]
[docs]def createLoopFromMRS(looptype, length, xoff, segments=80, max_length=None, turns=1): """ Returns a loop class object out of input found in a mrsd or mrsk file. Parameters ---------- looptype: integer Integer in [1, 2, 3, 4], in this range representing circular, square, circular eight, and square eight loop source types. Error for looptype < 1 and > 4. length: Length [m] of one side of the loop, or loop diameter for cicular type. xoff: float Offset [m] for loop midpoint in positive x direction. segments: integer [ 80 ] Number of segments used for discretization of the loop wire. max_length: integer [ None ] If given, replaces the segments with a number suited to ensure each dipole represents this distance [m] at maximum. """ if looptype == 1: # case 1: circular loop = ph.loop.buildCircle( length/2., P=(xoff, 0, 0), num_segs=segments, max_length=max_length) elif looptype == 2: # case 1: square loop = ph.loop.buildSquare( length, P=(xoff, 0, 0), num_segs=segments, max_length=max_length) elif looptype == 3: # case 3: circular eight loop = ph.loop.buildFig8Circle( length/2., num_segs=segments, P=(xoff, 0, 0), max_length=max_length) elif looptype == 4: # case 4: square eight p1 = (-length + xoff, -length/2., 0) p2 = (length + xoff, length/2., 0) loop = ph.loop.buildFig8( (p1, p2), num_segs=segments, max_length=max_length) else: # if case is not recognized: None (user has to specify loop) print('Warning: Mhh, found found no recipie' ' on how to deal with looptype: {} not in (1, 2, 3, 4).' .format(looptype)) print('Set corresponding loop to None, continue...') loop = None if loop is not None: loop.turns = turns return loop
if __name__ == '__main__': __spec__ = None s = Survey() kern = s.loadMRSK('D://borkum/borkum_data/cliwat2.mrsk', x_offsets=0) # kern.calculate() s.fids[0].save('test_fid.npz') s2 = FID() s2.load('test_fid.npz') # The End