"""
Part of comet/pyhed/misc
"""
# COMET - COupled Magnetic resonance Electrical resistivity Tomography
# Copyright (C) 2019 Nico Skibbe
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import time
import sys
import queue # python 3+ version
import multiprocessing as mpc
import numpy as np
import pygimli as pg
from . vec import (getRSparseValues, R3VtoNumpy)
def _MatrixWorker(num, index, out_queue,
mesh_name, out_pos, verbose):
time_loadmesh = time.time()
mesh = pg.load(mesh_name)
if verbose is True:
print('time for meshimport ("%s"): %2.2f sec' %
(mesh_name, time.time() - time_loadmesh), flush=True)
print(out_pos.shape)
print(len(pg.core.PosVector(out_pos)))
mat = mesh.interpolationMatrix(pg.core.PosVector(out_pos))
print('worker: {}: done'.format(num))
print(mat.nCols())
print(mat.nRows())
print(mat.nVals())
out_queue.put([getRSparseValues(mat, getInCRS=False),
index])
print('worker: {}: pushed'.format(num))
mat.clear()
if verbose is True:
print('exit of Worker %d' % (num))
sys.stdout.flush()
return
[docs]def InterpolationMatrix_para(mesh_name, out_coords, maxCPUCount=12,
in_node_count=None, verbose=True):
""" Multiprocessing over outcoords. """
if in_node_count is None:
in_node_count = pg.Mesh(mesh_name).nodeCount()
out_queue = mpc.Queue()
number_of_processes = np.min([mpc.cpu_count(), maxCPUCount])
if isinstance(out_coords, pg.Mesh):
out_coords = R3VtoNumpy(out_coords.positions())
elif isinstance(out_coords, pg.core.PosVector):
out_coords = R3VtoNumpy(out_coords)
print(out_coords.shape)
splitted = np.array_split(out_coords, number_of_processes, axis=0)
out_node_count = len(out_coords)
if verbose is True:
print(' start calculating matrices in %d parallel processes.' %
(number_of_processes), flush=True)
processes = []
index_split = 0
for i in range(number_of_processes):
print(splitted[i].shape, index_split)
worker = mpc.Process(target=_MatrixWorker,
args=(i,
index_split,
out_queue,
mesh_name,
splitted[i],
verbose,))
index_split += len(splitted[i])
worker.daemon = True
processes.append(worker)
worker.start()
tick_assemble = time.time()
number = 0
rows = []
cols = []
vals = []
while number < number_of_processes:
try:
tick22 = time.time()
mat, split_index = out_queue.get()
except queue.Empty:
time.sleep(2.0)
continue
if verbose:
print('Assembling {:02d}/{:02d}: '.format(number + 1,
number_of_processes),
end='')
tick22 = time.time()
rows.extend(np.array(mat[0]) + split_index)
cols.extend(mat[1])
vals.extend(mat[2])
number += 1
tack22 = time.time() - tick22
if verbose:
print('{:2.2f} sec'.format(tack22))
print('Building up matrix with {} entries...'
.format(len(vals)))
interpolation_matrix = pg.core.SparseMapMatrix(rows, cols, vals)
print(interpolation_matrix.nVals())
interpolation_matrix.setRows(out_node_count)
interpolation_matrix.setCols(in_node_count)
for p in processes:
if p.is_alive():
p.terminate()
p.join()
if verbose is True:
print('Calculation and Assembling of field_matrix: {:2.2f} seconds.'
.format(time.time() - tick_assemble))
return interpolation_matrix
# The End