Source code for comet.pyhed.hed.hed_para

"""
Part of comet/pyhed/hed
"""
# COMET - COupled Magnetic resonance Electrical resistivity Tomography
# Copyright (C) 2019  Nico Skibbe

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

from __future__ import print_function
import time
import sys
import os

if sys.version_info[:2] < (3, 3):
    import Queue as queue  # # python 2.7+ version
else:
    import queue  # python 3+ version

import multiprocessing as mpc
import numpy as np
import pygimli as pg

from comet.pyhed.misc import vec


[docs]def SummationWorker(queueIn, queueSum, queueEnd, verbose): """ MPI Worker used to sum up single fields. """ Htes = queueIn.get() queueIn.task_done() looping = True while looping is True: try: Htes += queueIn.get(timeout=0.3) queueIn.task_done() except queue.Empty: time.sleep(1) if queueEnd.empty() is False and queueIn.empty() is True: queueSum.put(Htes, block=False) looping = queueEnd.get() queueEnd.task_done() if verbose is True: print('leave SumWorker', flush=True) if verbose is True: print('SumWorker hat das Gebäude verlassen', flush=True) return
[docs]def InterpolationWorker(num, pos_queue, out_queue, data, srcmeshName, outmeshName, outtype, verbose): """ MPI Worker used to interpolate fields to target source location. """ if verbose is True: print('Initialise Worker No %d.' % (num), flush=True) # since pickling of pygimli mesh instances isn't possible, its nessesary to # import the two meshes once per Worker... if outtype == 'mesh': outmesh = pg.load(outmeshName) elif outtype == 'vector': outmesh = outmeshName srcmesh = pg.load(srcmeshName) while True: # be careful this loop never ends! pos = pos_queue.get() if verbose is True: print('Worker %d start interpolating dipole with pos (%f, %f, %f)' % (num, pos[0], pos[1], pos[2]), flush=True) alpha = (np.arctan2((pos[1]), (pos[0])) - np.pi/2.) src = pg.Mesh(srcmesh) # need copy for rotate and translate # rotate mesh src.rotate(pg.Pos(0, 0, alpha)) src.translate(pg.Pos(pos[0], pos[1], pos[2])) # rotate data ! data_rotated = vec.rotate3(data, alpha, copy=True) Hte = vec.interpolateField(src, outmesh, data_rotated, verbose=verbose) out_queue.put(Hte) pos_queue.task_done() sys.stdout.flush() if pos_queue.empty() is True: # print('sys.exit of Worker %d' % (num), flush=True) # sys.exit() if verbose is True: print('exit of Worker %d' % (num), flush=True) return
[docs]def multiInterpolation(DipoleDataName, SrcMeshName, OutMesh, DipolePos=None, verbose=False): """ Call function for multiprocessing interpoaltion of dipole fields. """ tick = time.time() # DipoleDataName = 'dipole_field.npy' dipoledata = np.load(DipoleDataName) # SrcMeshName = 'mesh/dipole.bms' # OutMeshName = 'mesh/world.bms' if isinstance(OutMesh, str): outmesh = pg.load(OutMesh) outtype = 'mesh' if verbose is True: print('search for dipole indices...', end='', flush=True) dipole_pos = outmesh.positions(outmesh.findNodesIdxByMarker(-99)) dipole_pos = vec.R3VtoNumpy(dipole_pos) N = np.shape(dipole_pos)[0] if verbose is True: print(' found %d dipoles... ' % (N), end='', flush=True) else: outmesh = OutMesh outtype = 'vector' dipole_pos = DipolePos N = np.shape(dipole_pos)[0] if verbose is True: print('%d dipoles given ...' % (N), end='', flush=True) pos_queue = mpc.JoinableQueue() out_queue = mpc.JoinableQueue() Sum = mpc.Queue() end = mpc.JoinableQueue() number_of_threads = min(os.cpu_count(), N) if verbose is True: print(' start interpolating in %d parallel processes.' % (number_of_threads)) processes = [] for i in range(number_of_threads): worker = mpc.Process(target=InterpolationWorker, args=(i, pos_queue, out_queue, dipoledata, SrcMeshName, OutMesh, outtype, verbose,)) worker.daemon = True processes.append(worker) worker.start() SumWorker = mpc.Process(target=SummationWorker, args=(out_queue, Sum, end, verbose,)) SumWorker.daemon = True SumWorker.start() for pos in dipole_pos: if verbose is True: print(pos) pos_queue.put(pos) pos_queue.join() out_queue.join() end.put(False) for p in processes: if p.is_alive(): p.terminate() p.join() Summe = Sum.get() # print('Is SumWorker noch alive?', SumWorker.is_alive()) SumWorker.terminate() verbose = True if verbose is True: print('overall time: %.4f seconds' % (time.time() - tick)) return Summe
# The End