# Source code for comet.pyhed.misc.para_lib

"""
Part of comet/pyhed/misc
"""
# COMET - COupled Magnetic resonance Electrical resistivity Tomography
# Copyright (C) 2019  Nico Skibbe

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import time
import sys

import queue  # python 3+ version

import multiprocessing as mpc
import numpy as np
import pygimli as pg
from . vec import (getRSparseValues, R3VtoNumpy)


def _MatrixWorker(num, index, out_queue,
                  mesh_name, out_pos, verbose):
    """Process target: interpolation matrix for one chunk of positions.

    Loads the mesh from *mesh_name* with ``pg.load``, asks it for the
    interpolation matrix onto the positions in *out_pos* and puts the list
    ``[getRSparseValues(matrix, getInCRS=False), index]`` on *out_queue*.

    Parameters
    ----------
    num : int
        Number of this worker, only used in the printed messages.
    index : int
        Passed back unchanged as second entry of the queued list, so the
        receiver can tell which chunk the values belong to.
    out_queue : queue-like
        Object with a ``put`` method that receives the result.
    mesh_name : str
        Mesh file handed to ``pg.load``.
    out_pos : array
        Positions of this chunk; must provide ``shape`` and be convertible
        to ``pg.core.PosVector``.
    verbose : bool
        The timing and exit messages are printed only if this is exactly
        ``True``. The remaining diagnostic prints are unconditional.
    """
    chatty = verbose is True

    started = time.time()
    mesh = pg.load(mesh_name)
    if chatty:
        elapsed = time.time() - started
        print('time for meshimport ("%s"): %2.2f sec' % (mesh_name, elapsed),
              flush=True)

    # Diagnostics: chunk shape and the length of the converted vector.
    print(out_pos.shape)
    targets = pg.core.PosVector(out_pos)
    print(len(targets))

    interp = mesh.interpolationMatrix(targets)
    print('worker: {}: done'.format(num))
    for count in (interp.nCols(), interp.nRows(), interp.nVals()):
        print(count)

    payload = [getRSparseValues(interp, getInCRS=False), index]
    out_queue.put(payload)
    print('worker: {}: pushed'.format(num))

    # The sparse values are already queued; drop the matrix content.
    interp.clear()
    if chatty:
        print('exit of Worker %d' % (num))
    sys.stdout.flush()


def InterpolationMatrix_para(mesh_name, out_coords, maxCPUCount=12,
                             in_node_count=None, verbose=True):
    """Build an interpolation matrix with several worker processes.

    The output positions are split along axis 0 into one chunk per process.
    Every chunk is handed to a ``_MatrixWorker`` process, which sends the
    sparse entries (rows, cols, vals) of its partial matrix back through a
    queue together with the row offset of the chunk. The row indices are
    shifted by that offset and all entries are assembled into a single
    ``pg.core.SparseMapMatrix``.

    Parameters
    ----------
    mesh_name : str
        Mesh file name. Passed on to every worker and, if *in_node_count*
        is None, opened here with ``pg.Mesh(mesh_name)``.
    out_coords : pg.Mesh, pg.core.PosVector or array
        Output positions. A mesh contributes its ``positions()``; meshes and
        position vectors are converted with ``R3VtoNumpy``. Anything else is
        used as is and has to provide ``shape`` and ``len``.
    maxCPUCount : int, optional
        Upper limit for the number of worker processes. The number actually
        used is the minimum of this value, ``multiprocessing.cpu_count()``
        and the number of output positions, but at least 1.
    in_node_count : int, optional
        Number of columns of the result. Defaults to the node count of the
        mesh stored in *mesh_name*.
    verbose : bool, optional
        Print progress and timing messages. Some diagnostic prints are
        issued regardless of this flag.

    Returns
    -------
    pg.core.SparseMapMatrix
        Matrix with ``len(out_coords)`` rows and *in_node_count* columns.

    Raises
    ------
    RuntimeError
        If all workers have exited before every chunk has been received
        (e.g. a worker crashed). Previously this case blocked forever.
    """
    if in_node_count is None:
        in_node_count = pg.Mesh(mesh_name).nodeCount()

    if isinstance(out_coords, pg.Mesh):
        out_coords = R3VtoNumpy(out_coords.positions())
    elif isinstance(out_coords, pg.core.PosVector):
        out_coords = R3VtoNumpy(out_coords)
    print(out_coords.shape)

    out_node_count = len(out_coords)
    # Plain int (not a numpy scalar); never more workers than positions and
    # never fewer than one, so np.array_split always gets a valid count.
    number_of_processes = int(max(1, min(mpc.cpu_count(), maxCPUCount,
                                         out_node_count)))
    splitted = np.array_split(out_coords, number_of_processes, axis=0)

    if verbose is True:
        print(' start calculating matrices in %d parallel processes.' %
              (number_of_processes), flush=True)

    out_queue = mpc.Queue()
    processes = []
    rows = []
    cols = []
    vals = []

    try:
        index_split = 0
        for i, chunk in enumerate(splitted):
            print(chunk.shape, index_split)
            worker = mpc.Process(target=_MatrixWorker,
                                 args=(i, index_split, out_queue,
                                       mesh_name, chunk, verbose,))
            worker.daemon = True
            worker.start()
            # Only started processes are tracked, so cleanup can join them.
            processes.append(worker)
            index_split += len(chunk)

        tick_assemble = time.time()
        number = 0
        while number < number_of_processes:
            try:
                # The timeout makes the Empty branch reachable, so dead
                # workers are noticed instead of blocking forever.
                mat, split_index = out_queue.get(timeout=2.0)
            except queue.Empty:
                if any(p.is_alive() for p in processes):
                    continue
                # No worker is left. Try once more to pick up a result that
                # arrived between the timeout and the liveness check.
                try:
                    mat, split_index = out_queue.get(timeout=2.0)
                except queue.Empty:
                    raise RuntimeError(
                        'All worker processes exited, but only {} of {} '
                        'partial matrices were received.'
                        .format(number, number_of_processes)) from None

            if verbose:
                print('Assembling {:02d}/{:02d}: '.format(
                    number + 1, number_of_processes), end='')

            tick22 = time.time()
            # Row indices are local to the chunk: shift to global rows.
            rows.extend(np.array(mat[0]) + split_index)
            cols.extend(mat[1])
            vals.extend(mat[2])
            number += 1
            tack22 = time.time() - tick22

            if verbose:
                print('{:2.2f} sec'.format(tack22))

        print('Building up matrix with {} entries...'
              .format(len(vals)))
        interpolation_matrix = pg.core.SparseMapMatrix(rows, cols, vals)
        print(interpolation_matrix.nVals())
        interpolation_matrix.setRows(out_node_count)
        interpolation_matrix.setCols(in_node_count)
    finally:
        # Always reap the workers, also when assembling failed.
        for p in processes:
            if p.is_alive():
                p.terminate()
            p.join()

    if verbose is True:
        print('Calculation and Assembling of field_matrix: {:2.2f} seconds.'
              .format(time.time() - tick_assemble))
    return interpolation_matrix
# The End