Source code for comet.pyhed.loop.loop_para

"""
Part of comet/pyhed/loop
"""
# COMET - COupled Magnetic resonance Electrical resistivity Tomography
# Copyright (C) 2019  Nico Skibbe

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import time
import sys
import queue

import multiprocessing as mpc
import numpy as np
import pygimli as pg

# need explicit import, because import pyhed is not possible inside of pyhed
from comet.pyhed.misc import vec
from comet.pyhed.hed import makeField, calcField
from comet.pyhed import log
from comet import pyhed as ph


def _SummationWorker_perDipole(queueIn, queueSum, queueEnd):
    Htes = queueIn.get()
    queueIn.task_done()
    Schleifen = True
    while Schleifen is True:
        try:
            Htes += queueIn.get(timeout=0.3)
            queueIn.task_done()

        except queue.Empty:
            time.sleep(1)

        if queueEnd.empty() is False and queueIn.empty() is True:
            queueSum.put(Htes, block=False)
            Schleifen = queueEnd.get()
            queueEnd.task_done()
            log.log(13, 'leave SumWorker')
            sys.stdout.flush()

    log.debug('SumWorker has left the building')
    sys.stdout.flush()
    return


def _SummationWorker(queueIn, queueSum, queueEnd, field_shape, verbose):
    looping = True
    summed_field = np.ones(field_shape, dtype=np.complex128)
    while looping is True:
        try:
            start_index, Htes = queueIn.get(timeout=0.3)
            summed_field[0, start_index:start_index + len(Htes[0])] = Htes[0]
            summed_field[1, start_index:start_index + len(Htes[0])] = Htes[1]
            summed_field[2, start_index:start_index + len(Htes[0])] = Htes[2]
            queueIn.task_done()

        except queue.Empty:
            time.sleep(1)

        if queueEnd.empty() is False and queueIn.empty() is True:
            queueSum.put(summed_field, block=False)
            looping = queueEnd.get()
            queueEnd.task_done()
            log.log(13, 'SumWorker: Fields are ready. Cleaning up.')
            sys.stdout.flush()

    log.debug('SumWorker has left the building.')
    sys.stdout.flush()
    return


def _InterpolationWorker(num, in_queue, out_queue, data,
                         srcMeshName, outPos, verbose):
    time_loadmesh = time.time()
    srcMesh = pg.Mesh(srcMeshName)
    if num == 0:
        log.log(13, 'time for meshimport ("%s"): %2.2f sec' %
                (srcMeshName, time.time() - time_loadmesh))
        sys.stdout.flush()

    while True:
        (pos, alpha, length) = in_queue.get()

        out = vec.translate(outPos, -pos[0], -pos[1], z=-pos[2], copy=True)
        vec.rotate3(out, -alpha)
        datarot = vec.rotate3(data, alpha, copy=True)
        Hte = vec.interpolateField(srcMesh, out.T, datarot, verbose=False)
        Hte *= length
        out_queue.put(Hte)
        in_queue.task_done()
        sys.stdout.flush()
        if in_queue.empty() is True:
            log.log(13, f'exit of Worker {num}')
            sys.stdout.flush()
            return


[docs]def loopInterpolation(dipoledata, SrcMeshName, OutMesh, PosPhiDs, verbose=False, cell_center=False, num_cpu=12): if isinstance(dipoledata, str): dipoledata = np.load(dipoledata) pos = PosPhiDs[0] phi = PosPhiDs[1] ds = PosPhiDs[2] if not isinstance(OutMesh, np.ndarray): if cell_center is False: OutCoords = vec.R3VtoNumpy(OutMesh.positions()).T elif cell_center is True: if OutMesh.cellCount() == 0: raise Exception( 'No Cells found in mesh: {!s}. ' 'Maybe just coords and not a real mesh?'.format(OutMesh)) OutCoords = vec.R3VtoNumpy(OutMesh.cell_centers()).T else: OutCoords = OutMesh N = len(pos) log.log(16, f'Get {N} dipoles... ') sys.stdout.flush() pos_queue = mpc.JoinableQueue() out_queue = mpc.JoinableQueue() Sum = mpc.Queue() end = mpc.JoinableQueue() nop = np.min([mpc.cpu_count(), N, num_cpu]) log.info(f'Start interpolating in {nop} parallel processes.') sys.stdout.flush() processes = [] for i in range(nop): worker = mpc.Process(target=_InterpolationWorker, args=(i, pos_queue, out_queue, dipoledata, SrcMeshName, OutCoords, verbose,)) worker.daemon = True processes.append(worker) worker.start() SumWorker = mpc.Process(target=_SummationWorker_perDipole, args=(out_queue, Sum, end,)) SumWorker.daemon = True SumWorker.start() for i in range(N): pos_queue.put([pos[i], phi[i], ds[i]]) pos_queue.join() out_queue.join() end.put(False) for p in processes: if p.is_alive(): p.terminate() p.join() Summe = Sum.get() # final calculated field SumWorker.terminate() return Summe
[docs]def CalculationWorker_perDipole(num, in_queue, out_queue, rho, d, f, current, mode, ftype, outPos, drop_tol): # fc0, nc, nc0 = hankelfc(3) # nu = np.arange(0, nc, 1, dtype=np.int) + 1 # n = nc0 - nc + nu # q = 0.1 * np.log(10) # hlp = np.exp(-(n - 1) * q) if ftype == 'E': fieldt = 'electric' else: fieldt = 'magnetic' log.log(13, 'Worker {} calculates {} Field in {} mode.' .format(num, fieldt, mode)) sys.stdout.flush() while True: (pos, alpha, lenght) = in_queue.get() out = vec.translate(outPos, -pos[0], -pos[1], z=-pos[2], copy=True) vec.rotate3(out, -alpha) polar = vec.KtoP(out, drop_tol=drop_tol) # koordinate transformation # u = hlp[:, np.newaxis] / polar[0] F = calcField(polar, rho, d, f, current*lenght, ftype, mode) vec.rotate3(F, alpha) out_queue.put(F) in_queue.task_done() if in_queue.empty() is True: log.log(13, f'exit of Worker {num}') sys.stdout.flush() return
[docs]def CalculationWorker(num, index_start, pos_alpha_len, out_queue, end_queue, rho, d, f, current, mode, ftype, outPos, verbose, drop_tol, src_z, switch_hankel, log_level): from comet import pyhed as ph ph.log.setLevel(log_level) # ph.params.useKerryKeyHankel(switch_hankel) all_pos = pos_alpha_len[0] all_alpha = pos_alpha_len[1] all_len = pos_alpha_len[2] Field = np.zeros_like(outPos, dtype=np.complex128) num_d = len(all_pos) # number of dipoles (sources) if sys.platform == 'linux' and num == 0 and log.getEffectiveLevel() <= 20: iterable = ph.misc.progressBar(range(num_d), 'calc dipoles: ') else: iterable = range(num_d) for i in iterable: Field += makeField(outPos, rho, d, f=f, Ids=current*all_len[i], pos=all_pos[i], angle=all_alpha[i], mode=mode, inputType='V', ftype=ftype, drop_tol=drop_tol, src_z=src_z) if num == 0: ph.log.info('Waiting for other processes to finish.') out_queue.put((index_start, Field)) end_queue.get() end_queue.task_done() return
[docs]def loopCalculation(OutMesh, PosPhiDs, rho, d, f, current, mode, ftype, verbose=False, cell_center=False, num_cpu=12, max_node_count=None, **kwargs): log.debug('loopCalculation kwargs: {}'.format(kwargs)) if max_node_count is None: max_node_count = int(100000. / len(rho) / 2) pos = PosPhiDs[0] phi = PosPhiDs[1] ds = PosPhiDs[2] N = len(pos) log.log(16, f'Loop gets {N} dipoles') sys.stdout.flush() out_queue = mpc.JoinableQueue() Sum = mpc.Queue() end = mpc.JoinableQueue() end_worker = mpc.JoinableQueue() # ready to handle numpy array with coords, and meshes if not isinstance(OutMesh, pg.Mesh): OutCoords = OutMesh else: if cell_center is False: OutCoords = OutMesh.positions().array().T elif cell_center is True: if OutMesh.cellCount() == 0: raise Exception( 'No Cells found in mesh: {!s}. ' 'Maybe just coords and not a real mesh?'.format(OutMesh)) OutCoords = OutMesh.cell_centers().array().T nop = np.min([mpc.cpu_count(), num_cpu]) num_nodes = len(OutCoords[0]) nodes_per_proc = float(num_nodes)/float(nop) if nodes_per_proc > max_node_count: main_split = (nodes_per_proc // max_node_count) + 1 else: main_split = 1 OutCoords_main = np.array_split(OutCoords, main_split, axis=1) # complete_field = np.zeros_like(OutCoords, dtype=np.complex128) for main_i, main_arr in enumerate(OutCoords_main): if nop > 1: processes = [] start_index = 0 OutCoords_sub = np.array_split(main_arr, nop, axis=1) log.info(f'Start calculating in {nop} parallel processes.') if ftype == 'E': fieldt = 'electric' else: fieldt = 'magnetic' log.log(16, 'Calculating {} Field for {} pos in {} mode for {} layer.' .format(fieldt, np.shape(main_arr)[1], mode, len(rho))) sys.stdout.flush() for i, sub_arr in enumerate(OutCoords_sub): end_worker.put(True) worker = mpc.Process(target=CalculationWorker, args=(i, start_index, PosPhiDs, out_queue, end_worker, rho, d, f, current, mode, ftype, sub_arr, verbose, kwargs.get('drop_tol', 1e-2), kwargs.get('src_z', -0.01), True, # h.params.use_kk_hankel, log.getEffectiveLevel(), ),) worker.daemon = True processes.append(worker) worker.start() start_index += len(sub_arr[0]) # (3, n), want to get n SumWorker = mpc.Process(target=_SummationWorker, args=(out_queue, Sum, # queue end, np.shape(main_arr), verbose,)) SumWorker.daemon = True SumWorker.start() end_worker.join() # waiting for worker out_queue.join() # waiting for summation end.put(False) for p in processes: if p.is_alive(): p.terminate() p.join() Summe = Sum.get() if main_i == 0: complete_field = np.copy(Summe) else: complete_field = np.append(complete_field, Summe, axis=1) SumWorker.terminate() elif nop == 1: log.info('Calculate directly on main core.') sys.stdout.flush() Summe = makeField(main_arr, rho, d, f=f, Ids=current*ds[0], pos=pos[0], angle=phi[0], mode=mode, inputType='V', ftype=ftype, drop_tol=kwargs.get('drop_tol', 1e-2)) if N > 1: for i in range(N - 1): Summe += makeField(main_arr, rho, d, f=f, Ids=current*ds[i + 1], pos=pos[i + 1], angle=phi[i + 1], mode=mode, inputType='V', ftype=ftype, drop_tol=kwargs.get('drop_tol', 1e-2)) if main_i == 0: complete_field = np.copy(Summe) else: complete_field = np.append(complete_field, Summe, axis=1) return complete_field
[docs]def loopCalculation_perDipole(OutMesh, PosPhiDs, rho, d, f, current, mode, ftype, cell_center=False, num_cpu=12, **kwargs): pos = PosPhiDs[0] phi = PosPhiDs[1] ds = PosPhiDs[2] N = len(pos) log.log(16, f'Loop gets {N} dipoles.') sys.stdout.flush() pos_queue = mpc.JoinableQueue() out_queue = mpc.JoinableQueue() Sum = mpc.Queue() end = mpc.JoinableQueue() # ready to handle numpy array with coords, and meshes if not isinstance(OutMesh, pg.Mesh): OutCoords = OutMesh else: if cell_center is False: OutCoords = vec.R3VtoNumpy(OutMesh.positions()).T elif cell_center is True: OutCoords = vec.R3VtoNumpy(OutMesh.cell_centers()).T nop = np.min([mpc.cpu_count(), N, num_cpu]) if nop > 1: log.info(f'Start calculating in {nop} parallel processes.') sys.stdout.flush() processes = [] for i in range(nop): worker = mpc.Process(target=CalculationWorker_perDipole, args=(i, pos_queue, out_queue, rho, d, f, current, mode, ftype, OutCoords, kwargs.get('drop_tol', 1e-2))) worker.daemon = True processes.append(worker) worker.start() SumWorker = mpc.Process(target=_SummationWorker_perDipole, args=(out_queue, Sum, end,)) SumWorker.daemon = True SumWorker.start() for i in range(N): pos_queue.put([pos[i], phi[i], ds[i]]) pos_queue.join() out_queue.join() end.put(False) for p in processes: if p.is_alive(): p.terminate() p.join() Summe = Sum.get() SumWorker.terminate() elif nop == 1: log.info('Calculate directly on main core.') sys.stdout.flush() Summe = makeField(OutCoords, rho, d, f=f, Ids=current*ds[0], pos=pos[0], angle=phi[0], mode=mode, inputType='V', ftype=ftype, drop_tol=kwargs.get('drop_tol', 1e-2)) if N > 1: for i in range(N - 1): Summe += makeField(OutCoords, rho, d, f=f, Ids=current*ds[i + 1], pos=pos[i + 1], angle=phi[i + 1], mode=mode, inputType='V', ftype=ftype, drop_tol=kwargs.get('drop_tol', 1e-2)) return Summe
def _MatrixWorker(num, pos_phi_d, index, out_queue, dipoleMeshName, lMeshPos, verbose): time_loadmesh = time.time() dipoleMesh = pg.Mesh(dipoleMeshName) lNodeCount = len(lMeshPos) dNodeCount = dipoleMesh.nodeCount() if num == 0: log.log(13, 'time for meshimport ({}): {:.2f} sec' .format(dipoleMeshName, time.time() - time_loadmesh)) sys.stdout.flush() mat = pg.core.SparseMapMatrix(r=lNodeCount, c=dNodeCount) mat_sin = pg.core.SparseMapMatrix(r=lNodeCount, c=dNodeCount) mat_cos = pg.core.SparseMapMatrix(r=lNodeCount, c=dNodeCount) for i in range(len(pos_phi_d[0])): pos = pos_phi_d[0][i] alpha = pos_phi_d[1][i] length = pos_phi_d[2][i] actualPos = vec.translate(lMeshPos, -pos[0], -pos[1], z=-pos[2], copy=True) vec.rotate3(actualPos, -alpha) basic_mat = dipoleMesh.interpolationMatrix( pg.core.PosVector(actualPos)) * length mat += basic_mat mat_sin += basic_mat * np.sin(-alpha) mat_cos += basic_mat * np.cos(-alpha) basic_mat.clear() out_queue.put([vec.getRSparseValues(mat, getInCRS=False), vec.getRSparseValues(mat_sin, getInCRS=False), vec.getRSparseValues(mat_cos, getInCRS=False), index]) mat.clear() mat_sin.clear() mat_cos.clear() log.log(13, f'exit of Worker {num}') sys.stdout.flush() return
[docs]def calcFieldMatrix_para(dipoleMeshName, dipoleNodeCount, loop_mesh, PosPhiDs, verbose=False, num_cpu=12): N = len(PosPhiDs[0]) log.log(16, f'Get {N} dipoles... ') sys.stdout.flush() out_queue = mpc.Queue() nop = np.min([mpc.cpu_count(), N, num_cpu]) lMeshPos = vec.R3VtoNumpy(loop_mesh.positions()) splitted = np.array_split(lMeshPos, nop, axis=0) lnodeCount = loop_mesh.nodeCount() log.info(f'Start calculating matrices in {nop} parallel processes.') sys.stdout.flush() processes = [] index_split = 0 for i in range(nop): worker = mpc.Process(target=_MatrixWorker, args=(i, PosPhiDs, index_split, out_queue, dipoleMeshName, splitted[i], verbose,)) index_split += len(splitted[i]) worker.daemon = True processes.append(worker) worker.start() tick_assemble = time.time() number = 0 rows = [] rows_sin = [] rows_cos = [] cols = [] cols_sin = [] cols_cos = [] all_vals = [] all_sins = [] all_coss = [] while number < nop: try: tick22 = time.time() m1, m2, m3, split = out_queue.get() except queue.Empty: time.sleep(2.0) continue log.log(13, 'Assembling {:02d}/{:02d}: '.format(number + 1, nop)) sys.stdout.flush() tick22 = time.time() rows.extend(np.array(m1[0]) + split) cols.extend(m1[1]) all_vals.extend(m1[2]) rows_sin.extend(np.array(m2[0]) + split) cols_sin.extend(m2[1]) all_sins.extend(m2[2]) rows_cos.extend(np.array(m3[0]) + split) cols_cos.extend(m3[1]) all_coss.extend(m3[2]) number += 1 tack22 = time.time() - tick22 log.log(13, '{:2.2f} sec'.format(tack22)) sys.stdout.flush() log.debug('Building up matrix 1/3 ... ') sys.stdout.flush() fieldMatrix = pg.core.SparseMapMatrix(pg.core.IndexArray(np.array(rows)), pg.core.IndexArray(np.array(cols)), pg.Vector(all_vals)) log.debug('done.') sys.stdout.flush() fieldMatrix.setRows(lnodeCount) fieldMatrix.setCols(dipoleNodeCount) log.debug('Building up matrix 2/3 ... ') sys.stdout.flush() fieldMatrix_sin = pg.core.SparseMapMatrix( pg.core.IndexArray(np.array(rows_sin)), pg.core.IndexArray(np.array(cols_sin)), pg.Vector(all_sins)) log.debug('done.') sys.stdout.flush() fieldMatrix_sin.setRows(lnodeCount) fieldMatrix_sin.setCols(dipoleNodeCount) log.debug('Building up matrix 3/3 ... ') sys.stdout.flush() fieldMatrix_cos = pg.core.SparseMapMatrix( pg.core.IndexArray(np.array(rows_cos)), pg.core.IndexArray(np.array(cols_cos)), pg.Vector(all_coss)) log.debug('done.') sys.stdout.flush() fieldMatrix_cos.setRows(lnodeCount) fieldMatrix_cos.setCols(dipoleNodeCount) for p in processes: if p.is_alive(): p.terminate() p.join() log.info('Calculation and Assembling of fieldMatrix: {:.2f} seconds.' .format(time.time() - tick_assemble)) sys.stdout.flush() return fieldMatrix, fieldMatrix_sin, fieldMatrix_cos
# The End