"""
Part of comet/pyhed/loop
"""
# COMET - COupled Magnetic resonance Electrical resistivity Tomography
# Copyright (C) 2019 Nico Skibbe
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import time
import sys
import queue
import multiprocessing as mpc
import numpy as np
import pygimli as pg
# need explicit import, because import pyhed is not possible inside of pyhed
from comet.pyhed.misc import vec
from comet.pyhed.hed import makeField, calcField
from comet.pyhed import log
from comet import pyhed as ph
def _SummationWorker_perDipole(queueIn, queueSum, queueEnd):
Htes = queueIn.get()
queueIn.task_done()
Schleifen = True
while Schleifen is True:
try:
Htes += queueIn.get(timeout=0.3)
queueIn.task_done()
except queue.Empty:
time.sleep(1)
if queueEnd.empty() is False and queueIn.empty() is True:
queueSum.put(Htes, block=False)
Schleifen = queueEnd.get()
queueEnd.task_done()
log.log(13, 'leave SumWorker')
sys.stdout.flush()
log.debug('SumWorker has left the building')
sys.stdout.flush()
return
def _SummationWorker(queueIn, queueSum, queueEnd, field_shape, verbose):
looping = True
summed_field = np.ones(field_shape, dtype=np.complex128)
while looping is True:
try:
start_index, Htes = queueIn.get(timeout=0.3)
summed_field[0, start_index:start_index + len(Htes[0])] = Htes[0]
summed_field[1, start_index:start_index + len(Htes[0])] = Htes[1]
summed_field[2, start_index:start_index + len(Htes[0])] = Htes[2]
queueIn.task_done()
except queue.Empty:
time.sleep(1)
if queueEnd.empty() is False and queueIn.empty() is True:
queueSum.put(summed_field, block=False)
looping = queueEnd.get()
queueEnd.task_done()
log.log(13, 'SumWorker: Fields are ready. Cleaning up.')
sys.stdout.flush()
log.debug('SumWorker has left the building.')
sys.stdout.flush()
return
def _InterpolationWorker(num, in_queue, out_queue, data,
srcMeshName, outPos, verbose):
time_loadmesh = time.time()
srcMesh = pg.Mesh(srcMeshName)
if num == 0:
log.log(13, 'time for meshimport ("%s"): %2.2f sec' %
(srcMeshName, time.time() - time_loadmesh))
sys.stdout.flush()
while True:
(pos, alpha, length) = in_queue.get()
out = vec.translate(outPos, -pos[0], -pos[1], z=-pos[2], copy=True)
vec.rotate3(out, -alpha)
datarot = vec.rotate3(data, alpha, copy=True)
Hte = vec.interpolateField(srcMesh, out.T, datarot, verbose=False)
Hte *= length
out_queue.put(Hte)
in_queue.task_done()
sys.stdout.flush()
if in_queue.empty() is True:
log.log(13, f'exit of Worker {num}')
sys.stdout.flush()
return
[docs]def loopInterpolation(dipoledata,
SrcMeshName,
OutMesh,
PosPhiDs,
verbose=False,
cell_center=False,
num_cpu=12):
if isinstance(dipoledata, str):
dipoledata = np.load(dipoledata)
pos = PosPhiDs[0]
phi = PosPhiDs[1]
ds = PosPhiDs[2]
if not isinstance(OutMesh, np.ndarray):
if cell_center is False:
OutCoords = vec.R3VtoNumpy(OutMesh.positions()).T
elif cell_center is True:
if OutMesh.cellCount() == 0:
raise Exception(
'No Cells found in mesh: {!s}. '
'Maybe just coords and not a real mesh?'.format(OutMesh))
OutCoords = vec.R3VtoNumpy(OutMesh.cell_centers()).T
else:
OutCoords = OutMesh
N = len(pos)
log.log(16, f'Get {N} dipoles... ')
sys.stdout.flush()
pos_queue = mpc.JoinableQueue()
out_queue = mpc.JoinableQueue()
Sum = mpc.Queue()
end = mpc.JoinableQueue()
nop = np.min([mpc.cpu_count(), N, num_cpu])
log.info(f'Start interpolating in {nop} parallel processes.')
sys.stdout.flush()
processes = []
for i in range(nop):
worker = mpc.Process(target=_InterpolationWorker,
args=(i, pos_queue, out_queue,
dipoledata,
SrcMeshName,
OutCoords,
verbose,))
worker.daemon = True
processes.append(worker)
worker.start()
SumWorker = mpc.Process(target=_SummationWorker_perDipole,
args=(out_queue,
Sum,
end,))
SumWorker.daemon = True
SumWorker.start()
for i in range(N):
pos_queue.put([pos[i], phi[i], ds[i]])
pos_queue.join()
out_queue.join()
end.put(False)
for p in processes:
if p.is_alive():
p.terminate()
p.join()
Summe = Sum.get() # final calculated field
SumWorker.terminate()
return Summe
[docs]def CalculationWorker_perDipole(num, in_queue, out_queue, rho, d, f, current,
mode, ftype, outPos, drop_tol):
# fc0, nc, nc0 = hankelfc(3)
# nu = np.arange(0, nc, 1, dtype=np.int) + 1
# n = nc0 - nc + nu
# q = 0.1 * np.log(10)
# hlp = np.exp(-(n - 1) * q)
if ftype == 'E':
fieldt = 'electric'
else:
fieldt = 'magnetic'
log.log(13, 'Worker {} calculates {} Field in {} mode.'
.format(num, fieldt, mode))
sys.stdout.flush()
while True:
(pos, alpha, lenght) = in_queue.get()
out = vec.translate(outPos, -pos[0], -pos[1], z=-pos[2], copy=True)
vec.rotate3(out, -alpha)
polar = vec.KtoP(out, drop_tol=drop_tol) # koordinate transformation
# u = hlp[:, np.newaxis] / polar[0]
F = calcField(polar, rho, d, f, current*lenght,
ftype, mode)
vec.rotate3(F, alpha)
out_queue.put(F)
in_queue.task_done()
if in_queue.empty() is True:
log.log(13, f'exit of Worker {num}')
sys.stdout.flush()
return
[docs]def CalculationWorker(num, index_start, pos_alpha_len, out_queue, end_queue,
rho, d, f, current, mode, ftype, outPos, verbose,
drop_tol, src_z, switch_hankel, log_level):
from comet import pyhed as ph
ph.log.setLevel(log_level)
# ph.params.useKerryKeyHankel(switch_hankel)
all_pos = pos_alpha_len[0]
all_alpha = pos_alpha_len[1]
all_len = pos_alpha_len[2]
Field = np.zeros_like(outPos, dtype=np.complex128)
num_d = len(all_pos) # number of dipoles (sources)
if sys.platform == 'linux' and num == 0 and log.getEffectiveLevel() <= 20:
iterable = ph.misc.progressBar(range(num_d), 'calc dipoles: ')
else:
iterable = range(num_d)
for i in iterable:
Field += makeField(outPos, rho, d, f=f,
Ids=current*all_len[i],
pos=all_pos[i],
angle=all_alpha[i], mode=mode,
inputType='V', ftype=ftype,
drop_tol=drop_tol,
src_z=src_z)
if num == 0:
ph.log.info('Waiting for other processes to finish.')
out_queue.put((index_start, Field))
end_queue.get()
end_queue.task_done()
return
[docs]def loopCalculation(OutMesh,
PosPhiDs,
rho, d, f, current, mode, ftype,
verbose=False,
cell_center=False,
num_cpu=12,
max_node_count=None,
**kwargs):
log.debug('loopCalculation kwargs: {}'.format(kwargs))
if max_node_count is None:
max_node_count = int(100000. / len(rho) / 2)
pos = PosPhiDs[0]
phi = PosPhiDs[1]
ds = PosPhiDs[2]
N = len(pos)
log.log(16, f'Loop gets {N} dipoles')
sys.stdout.flush()
out_queue = mpc.JoinableQueue()
Sum = mpc.Queue()
end = mpc.JoinableQueue()
end_worker = mpc.JoinableQueue()
# ready to handle numpy array with coords, and meshes
if not isinstance(OutMesh, pg.Mesh):
OutCoords = OutMesh
else:
if cell_center is False:
OutCoords = OutMesh.positions().array().T
elif cell_center is True:
if OutMesh.cellCount() == 0:
raise Exception(
'No Cells found in mesh: {!s}. '
'Maybe just coords and not a real mesh?'.format(OutMesh))
OutCoords = OutMesh.cell_centers().array().T
nop = np.min([mpc.cpu_count(), num_cpu])
num_nodes = len(OutCoords[0])
nodes_per_proc = float(num_nodes)/float(nop)
if nodes_per_proc > max_node_count:
main_split = (nodes_per_proc // max_node_count) + 1
else:
main_split = 1
OutCoords_main = np.array_split(OutCoords, main_split, axis=1)
# complete_field = np.zeros_like(OutCoords, dtype=np.complex128)
for main_i, main_arr in enumerate(OutCoords_main):
if nop > 1:
processes = []
start_index = 0
OutCoords_sub = np.array_split(main_arr, nop,
axis=1)
log.info(f'Start calculating in {nop} parallel processes.')
if ftype == 'E':
fieldt = 'electric'
else:
fieldt = 'magnetic'
log.log(16,
'Calculating {} Field for {} pos in {} mode for {} layer.'
.format(fieldt, np.shape(main_arr)[1], mode, len(rho)))
sys.stdout.flush()
for i, sub_arr in enumerate(OutCoords_sub):
end_worker.put(True)
worker = mpc.Process(target=CalculationWorker,
args=(i,
start_index,
PosPhiDs,
out_queue,
end_worker,
rho,
d,
f,
current,
mode,
ftype,
sub_arr,
verbose,
kwargs.get('drop_tol', 1e-2),
kwargs.get('src_z', -0.01),
True, # h.params.use_kk_hankel,
log.getEffectiveLevel(),
),)
worker.daemon = True
processes.append(worker)
worker.start()
start_index += len(sub_arr[0]) # (3, n), want to get n
SumWorker = mpc.Process(target=_SummationWorker,
args=(out_queue,
Sum, # queue
end,
np.shape(main_arr),
verbose,))
SumWorker.daemon = True
SumWorker.start()
end_worker.join() # waiting for worker
out_queue.join() # waiting for summation
end.put(False)
for p in processes:
if p.is_alive():
p.terminate()
p.join()
Summe = Sum.get()
if main_i == 0:
complete_field = np.copy(Summe)
else:
complete_field = np.append(complete_field, Summe, axis=1)
SumWorker.terminate()
elif nop == 1:
log.info('Calculate directly on main core.')
sys.stdout.flush()
Summe = makeField(main_arr, rho, d, f=f, Ids=current*ds[0],
pos=pos[0], angle=phi[0], mode=mode,
inputType='V', ftype=ftype,
drop_tol=kwargs.get('drop_tol', 1e-2))
if N > 1:
for i in range(N - 1):
Summe += makeField(main_arr, rho, d, f=f,
Ids=current*ds[i + 1],
pos=pos[i + 1],
angle=phi[i + 1], mode=mode,
inputType='V', ftype=ftype,
drop_tol=kwargs.get('drop_tol', 1e-2))
if main_i == 0:
complete_field = np.copy(Summe)
else:
complete_field = np.append(complete_field, Summe, axis=1)
return complete_field
[docs]def loopCalculation_perDipole(OutMesh,
PosPhiDs,
rho, d, f, current, mode, ftype,
cell_center=False,
num_cpu=12,
**kwargs):
pos = PosPhiDs[0]
phi = PosPhiDs[1]
ds = PosPhiDs[2]
N = len(pos)
log.log(16, f'Loop gets {N} dipoles.')
sys.stdout.flush()
pos_queue = mpc.JoinableQueue()
out_queue = mpc.JoinableQueue()
Sum = mpc.Queue()
end = mpc.JoinableQueue()
# ready to handle numpy array with coords, and meshes
if not isinstance(OutMesh, pg.Mesh):
OutCoords = OutMesh
else:
if cell_center is False:
OutCoords = vec.R3VtoNumpy(OutMesh.positions()).T
elif cell_center is True:
OutCoords = vec.R3VtoNumpy(OutMesh.cell_centers()).T
nop = np.min([mpc.cpu_count(), N, num_cpu])
if nop > 1:
log.info(f'Start calculating in {nop} parallel processes.')
sys.stdout.flush()
processes = []
for i in range(nop):
worker = mpc.Process(target=CalculationWorker_perDipole,
args=(i, pos_queue, out_queue,
rho,
d,
f,
current,
mode,
ftype,
OutCoords,
kwargs.get('drop_tol', 1e-2)))
worker.daemon = True
processes.append(worker)
worker.start()
SumWorker = mpc.Process(target=_SummationWorker_perDipole,
args=(out_queue,
Sum,
end,))
SumWorker.daemon = True
SumWorker.start()
for i in range(N):
pos_queue.put([pos[i], phi[i], ds[i]])
pos_queue.join()
out_queue.join()
end.put(False)
for p in processes:
if p.is_alive():
p.terminate()
p.join()
Summe = Sum.get()
SumWorker.terminate()
elif nop == 1:
log.info('Calculate directly on main core.')
sys.stdout.flush()
Summe = makeField(OutCoords, rho, d, f=f, Ids=current*ds[0],
pos=pos[0], angle=phi[0], mode=mode,
inputType='V', ftype=ftype,
drop_tol=kwargs.get('drop_tol', 1e-2))
if N > 1:
for i in range(N - 1):
Summe += makeField(OutCoords, rho, d, f=f,
Ids=current*ds[i + 1],
pos=pos[i + 1],
angle=phi[i + 1], mode=mode,
inputType='V', ftype=ftype,
drop_tol=kwargs.get('drop_tol', 1e-2))
return Summe
def _MatrixWorker(num, pos_phi_d, index, out_queue,
dipoleMeshName, lMeshPos, verbose):
time_loadmesh = time.time()
dipoleMesh = pg.Mesh(dipoleMeshName)
lNodeCount = len(lMeshPos)
dNodeCount = dipoleMesh.nodeCount()
if num == 0:
log.log(13, 'time for meshimport ({}): {:.2f} sec'
.format(dipoleMeshName, time.time() - time_loadmesh))
sys.stdout.flush()
mat = pg.core.SparseMapMatrix(r=lNodeCount, c=dNodeCount)
mat_sin = pg.core.SparseMapMatrix(r=lNodeCount, c=dNodeCount)
mat_cos = pg.core.SparseMapMatrix(r=lNodeCount, c=dNodeCount)
for i in range(len(pos_phi_d[0])):
pos = pos_phi_d[0][i]
alpha = pos_phi_d[1][i]
length = pos_phi_d[2][i]
actualPos = vec.translate(lMeshPos, -pos[0], -pos[1], z=-pos[2],
copy=True)
vec.rotate3(actualPos, -alpha)
basic_mat = dipoleMesh.interpolationMatrix(
pg.core.PosVector(actualPos)) * length
mat += basic_mat
mat_sin += basic_mat * np.sin(-alpha)
mat_cos += basic_mat * np.cos(-alpha)
basic_mat.clear()
out_queue.put([vec.getRSparseValues(mat, getInCRS=False),
vec.getRSparseValues(mat_sin, getInCRS=False),
vec.getRSparseValues(mat_cos, getInCRS=False),
index])
mat.clear()
mat_sin.clear()
mat_cos.clear()
log.log(13, f'exit of Worker {num}')
sys.stdout.flush()
return
[docs]def calcFieldMatrix_para(dipoleMeshName,
dipoleNodeCount,
loop_mesh,
PosPhiDs,
verbose=False,
num_cpu=12):
N = len(PosPhiDs[0])
log.log(16, f'Get {N} dipoles... ')
sys.stdout.flush()
out_queue = mpc.Queue()
nop = np.min([mpc.cpu_count(), N, num_cpu])
lMeshPos = vec.R3VtoNumpy(loop_mesh.positions())
splitted = np.array_split(lMeshPos, nop, axis=0)
lnodeCount = loop_mesh.nodeCount()
log.info(f'Start calculating matrices in {nop} parallel processes.')
sys.stdout.flush()
processes = []
index_split = 0
for i in range(nop):
worker = mpc.Process(target=_MatrixWorker,
args=(i,
PosPhiDs,
index_split,
out_queue,
dipoleMeshName,
splitted[i],
verbose,))
index_split += len(splitted[i])
worker.daemon = True
processes.append(worker)
worker.start()
tick_assemble = time.time()
number = 0
rows = []
rows_sin = []
rows_cos = []
cols = []
cols_sin = []
cols_cos = []
all_vals = []
all_sins = []
all_coss = []
while number < nop:
try:
tick22 = time.time()
m1, m2, m3, split = out_queue.get()
except queue.Empty:
time.sleep(2.0)
continue
log.log(13, 'Assembling {:02d}/{:02d}: '.format(number + 1, nop))
sys.stdout.flush()
tick22 = time.time()
rows.extend(np.array(m1[0]) + split)
cols.extend(m1[1])
all_vals.extend(m1[2])
rows_sin.extend(np.array(m2[0]) + split)
cols_sin.extend(m2[1])
all_sins.extend(m2[2])
rows_cos.extend(np.array(m3[0]) + split)
cols_cos.extend(m3[1])
all_coss.extend(m3[2])
number += 1
tack22 = time.time() - tick22
log.log(13, '{:2.2f} sec'.format(tack22))
sys.stdout.flush()
log.debug('Building up matrix 1/3 ... ')
sys.stdout.flush()
fieldMatrix = pg.core.SparseMapMatrix(pg.core.IndexArray(np.array(rows)),
pg.core.IndexArray(np.array(cols)),
pg.Vector(all_vals))
log.debug('done.')
sys.stdout.flush()
fieldMatrix.setRows(lnodeCount)
fieldMatrix.setCols(dipoleNodeCount)
log.debug('Building up matrix 2/3 ... ')
sys.stdout.flush()
fieldMatrix_sin = pg.core.SparseMapMatrix(
pg.core.IndexArray(np.array(rows_sin)),
pg.core.IndexArray(np.array(cols_sin)),
pg.Vector(all_sins))
log.debug('done.')
sys.stdout.flush()
fieldMatrix_sin.setRows(lnodeCount)
fieldMatrix_sin.setCols(dipoleNodeCount)
log.debug('Building up matrix 3/3 ... ')
sys.stdout.flush()
fieldMatrix_cos = pg.core.SparseMapMatrix(
pg.core.IndexArray(np.array(rows_cos)),
pg.core.IndexArray(np.array(cols_cos)),
pg.Vector(all_coss))
log.debug('done.')
sys.stdout.flush()
fieldMatrix_cos.setRows(lnodeCount)
fieldMatrix_cos.setCols(dipoleNodeCount)
for p in processes:
if p.is_alive():
p.terminate()
p.join()
log.info('Calculation and Assembling of fieldMatrix: {:.2f} seconds.'
.format(time.time() - tick_assemble))
sys.stdout.flush()
return fieldMatrix, fieldMatrix_sin, fieldMatrix_cos
# The End