"""
Part of comet/pyhed/hed
"""
# COMET - COupled Magnetic resonance Electrical resistivity Tomography
# Copyright (C) 2019 Nico Skibbe
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from __future__ import print_function
import time
import sys
import os
if sys.version_info[:2] < (3, 3):
import Queue as queue # # python 2.7+ version
else:
import queue # python 3+ version
import multiprocessing as mpc
import numpy as np
import pygimli as pg
from comet.pyhed.misc import vec
[docs]def SummationWorker(queueIn, queueSum, queueEnd, verbose):
""" MPI Worker used to sum up single fields. """
Htes = queueIn.get()
queueIn.task_done()
looping = True
while looping is True:
try:
Htes += queueIn.get(timeout=0.3)
queueIn.task_done()
except queue.Empty:
time.sleep(1)
if queueEnd.empty() is False and queueIn.empty() is True:
queueSum.put(Htes, block=False)
looping = queueEnd.get()
queueEnd.task_done()
if verbose is True:
print('leave SumWorker', flush=True)
if verbose is True:
print('SumWorker hat das Gebäude verlassen', flush=True)
return
[docs]def InterpolationWorker(num, pos_queue, out_queue, data,
srcmeshName, outmeshName, outtype, verbose):
""" MPI Worker used to interpolate fields to target source location. """
if verbose is True:
print('Initialise Worker No %d.' % (num), flush=True)
# since pickling of pygimli mesh instances isn't possible, its nessesary to
# import the two meshes once per Worker...
if outtype == 'mesh':
outmesh = pg.load(outmeshName)
elif outtype == 'vector':
outmesh = outmeshName
srcmesh = pg.load(srcmeshName)
while True: # be careful this loop never ends!
pos = pos_queue.get()
if verbose is True:
print('Worker %d start interpolating dipole with pos (%f, %f, %f)'
% (num, pos[0], pos[1], pos[2]), flush=True)
alpha = (np.arctan2((pos[1]), (pos[0])) - np.pi/2.)
src = pg.Mesh(srcmesh) # need copy for rotate and translate
# rotate mesh
src.rotate(pg.Pos(0, 0, alpha))
src.translate(pg.Pos(pos[0], pos[1], pos[2]))
# rotate data !
data_rotated = vec.rotate3(data, alpha, copy=True)
Hte = vec.interpolateField(src, outmesh, data_rotated, verbose=verbose)
out_queue.put(Hte)
pos_queue.task_done()
sys.stdout.flush()
if pos_queue.empty() is True:
# print('sys.exit of Worker %d' % (num), flush=True)
# sys.exit()
if verbose is True:
print('exit of Worker %d' % (num), flush=True)
return
[docs]def multiInterpolation(DipoleDataName,
SrcMeshName,
OutMesh,
DipolePos=None,
verbose=False):
""" Call function for multiprocessing interpoaltion of dipole fields. """
tick = time.time()
# DipoleDataName = 'dipole_field.npy'
dipoledata = np.load(DipoleDataName)
# SrcMeshName = 'mesh/dipole.bms'
# OutMeshName = 'mesh/world.bms'
if isinstance(OutMesh, str):
outmesh = pg.load(OutMesh)
outtype = 'mesh'
if verbose is True:
print('search for dipole indices...', end='', flush=True)
dipole_pos = outmesh.positions(outmesh.findNodesIdxByMarker(-99))
dipole_pos = vec.R3VtoNumpy(dipole_pos)
N = np.shape(dipole_pos)[0]
if verbose is True:
print(' found %d dipoles... ' % (N), end='', flush=True)
else:
outmesh = OutMesh
outtype = 'vector'
dipole_pos = DipolePos
N = np.shape(dipole_pos)[0]
if verbose is True:
print('%d dipoles given ...' % (N), end='', flush=True)
pos_queue = mpc.JoinableQueue()
out_queue = mpc.JoinableQueue()
Sum = mpc.Queue()
end = mpc.JoinableQueue()
number_of_threads = min(os.cpu_count(), N)
if verbose is True:
print(' start interpolating in %d parallel processes.' %
(number_of_threads))
processes = []
for i in range(number_of_threads):
worker = mpc.Process(target=InterpolationWorker,
args=(i, pos_queue, out_queue,
dipoledata,
SrcMeshName,
OutMesh,
outtype,
verbose,))
worker.daemon = True
processes.append(worker)
worker.start()
SumWorker = mpc.Process(target=SummationWorker,
args=(out_queue,
Sum,
end,
verbose,))
SumWorker.daemon = True
SumWorker.start()
for pos in dipole_pos:
if verbose is True:
print(pos)
pos_queue.put(pos)
pos_queue.join()
out_queue.join()
end.put(False)
for p in processes:
if p.is_alive():
p.terminate()
p.join()
Summe = Sum.get()
# print('Is SumWorker noch alive?', SumWorker.is_alive())
SumWorker.terminate()
verbose = True
if verbose is True:
print('overall time: %.4f seconds' % (time.time() - tick))
return Summe
# The End