
I have a somewhat expensive function for transforming a large amount of data. Running it sequentially takes a lot of time, so I tried parallelizing it, but the result is all wrong. I want to change the elements of a numpy array using a parallelized function.

I've read "Python map function, passing by reference/value?", but that solution is not applicable to the parallel version.

I've only been experimenting with Python for about a month or so, so maybe I'm asking something silly.

This is a simple example of what I'm trying to do.

import numpy as np 
import multiprocessing

globalData = np.array([1, 2, 3, 4, 5, 6, 7, 8])

def add(i): 
    global globalData
    globalData[i] += 1


pool = multiprocessing.Pool(8)
globalData = pool.map(add, range(8))
pool.close()
pool.join()
print("Global data:", globalData)

I expected the output to be [2, 3, 4, 5, 6, 7, 8, 9], as it would be had I executed

for i in range(8):
    add(i)

but instead I get

[1, 2, 3, 4, 5, 6, 7, 8]

Thank you for any assistance.

Edit: This was my original problem, a not-so-minimal working example.

sample_size = 100

pca_sample = np.random.rand(sample_size, sample_size)

def knl(x, y):
    #Just as an example
    return np.sin(x + y)

K_matrix = np.zeros((sample_size, sample_size))
for i in range(sample_size):
    for j in range(sample_size):
        K_matrix[i][j] = knl(pca_sample[i], pca_sample[j])

K_cent_matrix = np.zeros((sample_size, sample_size))

def K_centered(K_cent_matrix, i, j):
    term1 = K_matrix[i][j]
    term2 = 0.
    term3 = 0.
    term4 = 0.
    for k in range(sample_size):
        term2 += K_matrix[k][j]
    for k in range(sample_size):
        term3 += K_matrix[i][k]
    for k1 in range(sample_size):
        for k2 in range(sample_size):
            term4 += K_matrix[k1][k2]        
    term1 /= sample_size
    term2 /= sample_size
    term3 /= (sample_size * sample_size)
    K_cent_matrix[i][j] = term1 - term2 - term3 + term4 
    print(f"K_cent_matrix[{i:d}][{j:d}] = {K_cent_matrix[i][j]:f}")

pool = multiprocessing.Pool(8)
pool.starmap(K_centered, [(K_cent_matrix,i,j) for i, j in zip(range(sample_size), range(sample_size))])
pool.close()
pool.join()
lfmc
    I believe multiprocessing spawns new python processes which don't share memory. Not sharing memory means you don't have globals spanning across workers, everyone mutates their own. – Andras Deak -- Слава Україні Aug 17 '19 at 05:43
  • Before you start thinking about multiprocessing (and all the headaches that come with it) you should vectorize your code. This will give you much bigger benefits! For example, your first for loop could become `term2 = np.sum(K_matrix[:sample_size, j])` – Samufi Aug 17 '19 at 06:30

3 Answers


The issue is that globalData is not in shared memory. When this array is processed in parallel, a copy is created for each worker process and the original array remains unchanged. If you want to work on the same array in parallel, you have to deal with shared memory, which is doable but not trivial either. See here and here.
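
For illustration, a minimal sketch of that shared-memory route (not the code from the question) could look as follows. It assumes the fork start method (the default on Linux), so the workers inherit the shared buffer; RawArray provides no locking, which is acceptable here because each index is written by exactly one task:

import numpy as np
import multiprocessing
from multiprocessing import sharedctypes

# Back the array with shared memory; 'q' corresponds to a 64-bit signed integer.
base = sharedctypes.RawArray('q', [1, 2, 3, 4, 5, 6, 7, 8])
globalData = np.frombuffer(base, dtype=np.int64)

def add(i):
    # Every worker writes into the same shared buffer.
    globalData[i] += 1

if __name__ == "__main__":
    with multiprocessing.Pool(8) as pool:
        pool.map(add, range(8))
    print("Global data:", globalData)  # Global data: [2 3 4 5 6 7 8 9]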

From my own experience, I would advise you to return a copy of the results and "recreate" the result array rather than changing it in place. Of course, this may not be possible if you are dealing with extreme amounts of data. Otherwise, the gain in simplicity will outweigh the (small) gain in efficiency you would get from in-place modification. Applied to your problem, that could look as follows:

import numpy as np 
import multiprocessing

globalData = np.array([1, 2, 3, 4, 5, 6, 7, 8])

def add(i): 
    return globalData[i] + 1

def exe():
    global globalData
    with multiprocessing.Pool(8) as pool:
        globalData = np.array(list(pool.map(add, range(8))))

    print("Global data:", globalData)

exe()

The result is

Global data: [2 3 4 5 6 7 8 9]

as desired.

The code will run much faster if you use the chunksize argument, which reduces the communication overhead between your processes.
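
For example, the map call in exe above could take an explicit chunksize (the value 2 here is only illustrative):

with multiprocessing.Pool(8) as pool:
    # Larger chunks mean fewer, larger messages between the processes.
    globalData = np.array(list(pool.map(add, range(8), chunksize=2)))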

Note that the with statement saves you the work of joining your processes and stopping them after execution. This does not work in top-level code, though, which is why I put it into a function exe.
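
Alternatively, the pool can live at top level behind the usual __main__ guard; this is a sketch of the same example with the guard as the only change (the guard is needed anyway on platforms that use the spawn start method, e.g. Windows):

if __name__ == "__main__":
    with multiprocessing.Pool(8) as pool:
        globalData = np.array(list(pool.map(add, range(8))))
    print("Global data:", globalData)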

I have derived a helper class to make it easier to deal with shared arrays or large arrays without "really" sharing them.

With the code I provide at the end of my answer saved as "concurrent_futures_ext.py" in your working directory, you can write your code as

import numpy as np 
from concurrent_futures_ext import ProcessPoolExecutor

globalData = np.array([1, 2, 3, 4, 5, 6, 7, 8])

def add(globalData, i): 
    globalData[i] += 1

def exe():
    global globalData
    shared_np_arrs = [globalData] # list of global arrays
    with ProcessPoolExecutor(8, shared_np_arrs=shared_np_arrs) as pool:
        any(pool.map(add, range(8)))
        globalData = pool.get_shared_arrays()[0] # retrieving the list of global arrays
    print("Global data:", globalData)

exe()

Only one copy of your data is necessary to put the array in shared memory.

Regarding your not-so-minimal working example: huge optimizations are possible if you vectorize your code, i.e. use numpy functions instead of your for loops. Going through all the possible optimizations is beyond the scope of your question and my answer, but vectorization alone will give you code that is orders of magnitude faster (much (!) better than what you can achieve with parallelization).
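
As a rough illustration of what such a vectorization could look like, here is a minimal sketch that reproduces the arithmetic of the loop-based K_centered from the question exactly as written (including its scaling factors); K_matrix is assumed to be the (sample_size, sample_size) array defined there:

import numpy as np

n = K_matrix.shape[0]
col_means = K_matrix.sum(axis=0) / n        # replaces the term2 loop, for every j at once
row_terms = K_matrix.sum(axis=1) / (n * n)  # replaces the term3 loop, for every i at once
total = K_matrix.sum()                      # replaces the term4 double loop
K_cent_matrix = K_matrix / n - col_means[np.newaxis, :] - row_terms[:, np.newaxis] + total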

Here comes the code of the helper class:

from concurrent.futures import ProcessPoolExecutor as conc_ProcessPoolExecutor
from concurrent.futures.process import _ExceptionWithTraceback, _get_chunks, _ResultItem
from functools import partial
import multiprocessing
import itertools
import os
import numpy as np
from multiprocessing import sharedctypes
CPU_COUNT = os.cpu_count() 


def get_cpu_chunk_counts(task_length, chunk_number=5, min_chunk_size=1):
    cpu_count = max(min(CPU_COUNT, 
                        task_length // (chunk_number*min_chunk_size)), 1)
    chunk_size = max(min_chunk_size, task_length // (cpu_count*chunk_number))
    return cpu_count, chunk_size

def _process_worker(call_queue, result_queue, const_args=[], shared_arrays=[]):
    """Evaluates calls from call_queue and places the results in result_queue.

    This worker is run in a separate process.

    Args:
        call_queue: A multiprocessing.Queue of _CallItems that will be read and
            evaluated by the worker.
        result_queue: A multiprocessing.Queue of _ResultItems that will be
            written to by the worker.
        const_args: Constant extra arguments that are passed to every call.
        shared_arrays: List of (ctypes array, dtype, shape) tuples that are
            re-wrapped as numpy arrays inside the worker.
    """

    shared_arrays_np = [np.ctypeslib.as_array(arr).view(dtype).reshape(shape) 
                        for arr, dtype, shape in shared_arrays]


    while True:
        call_item = call_queue.get(block=True)
        if call_item is None:
            result_queue.put(os.getpid())
            return
        try:
            r = call_item.fn(*call_item.args, const_args=const_args,
                             shared_arrays=shared_arrays_np,
                             **call_item.kwargs)
        except BaseException as e:
            exc = _ExceptionWithTraceback(e, e.__traceback__) 
            result_queue.put(_ResultItem(call_item.work_id, exception=exc))
        else:
            result_queue.put(_ResultItem(call_item.work_id,
                                         result=r))


def _process_chunk(fn, chunk, const_args, shared_arrays):
    """ Processes a chunk of an iterable passed to map.

    Runs the function passed to map() on a chunk of the
    iterable passed to map.

    This function is run in a separate process.

    """
    return [fn(*const_args, *shared_arrays, *args) for args in chunk]



class ProcessPoolExecutor(conc_ProcessPoolExecutor):
    '''
    A ProcessPoolExecutor that passes constant arguments and shared numpy
    arrays to its worker processes.
    '''

    def __init__(self, max_workers=None, const_args=[], shared_np_arrs=[]):
        '''
        Constructor
        '''
        super().__init__(max_workers)
        self._const_args = const_args
        shared_arrays_ctype = []
        shared_arrays_np = []

        # TODO do not create copy of shared array, if it already has a suitable 
        # data structure
        for arr in shared_np_arrs:
            dtype = arr.dtype
            arrShared = np.empty(arr.size*dtype.itemsize, np.int8)
            arrShared = np.ctypeslib.as_ctypes(arrShared)
            ctypes_arr = sharedctypes.RawArray(arrShared._type_, arrShared)
            shared_arrays_ctype.append((ctypes_arr, arr.dtype, arr.shape))
            view = np.ctypeslib.as_array(ctypes_arr).view(arr.dtype).reshape(
                                                                    arr.shape)
            view[:] = arr
            shared_arrays_np.append(view)
        self._shared_arrays_np = shared_arrays_np
        self._shared_arrays = shared_arrays_ctype

    def _adjust_process_count(self):
        for _ in range(len(self._processes), self._max_workers):
            p = multiprocessing.Process(
                    target=_process_worker,
                    args=(self._call_queue,
                          self._result_queue,
                          self._const_args,
                          self._shared_arrays))
            p.start()
            self._processes[p.pid] = p    

    def map(self, fn, *iterables, timeout=None, chunksize=None, 
            tasklength=None, chunknumber=5, min_chunksize=1):
        """Returns an iterator equivalent to map(fn, iter).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
            chunksize: If greater than one, the iterables will be chopped into
                chunks of size chunksize and submitted to the process pool.
                If set to one, the items in the list will be sent one at a time.
            tasklength: Length of the iterable. If provided, the cpu count
                and the chunksize will be adjusted appropriately, if they are
                not given explicitly.
        Returns:
            An iterator equivalent to: map(func, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If fn(*args) raises for any values.
        """
        tmp_max_workers = self._max_workers
        if tasklength and tasklength > 0:
            cpu_count, chunksize_tmp = get_cpu_chunk_counts(tasklength, 
                                                            chunknumber,
                                                            min_chunksize)
            if not chunksize:
                chunksize = chunksize_tmp
            self._max_workers = cpu_count

        if not chunksize:
            chunksize = 1

        if chunksize < 1:
            raise ValueError("chunksize must be >= 1.")

        results = super(conc_ProcessPoolExecutor, self).map(partial(_process_chunk, fn),
                              _get_chunks(*iterables, chunksize=chunksize),
                              timeout=timeout)

        self._max_workers = tmp_max_workers 

        return itertools.chain.from_iterable(results)


    def get_shared_arrays(self):
        return self._shared_arrays_np
Samufi

@Samufi is right: globalData is not in shared memory. You can try:

1) By sharing the memory:

import numpy as np 
import multiprocessing
from multiprocessing import Array


globalData = Array('i', np.array([1, 2, 3, 4, 5, 6, 7, 8]))

def add(i): 
    globalData[i] += 1


pool = multiprocessing.Pool(8)
pool.map(add, range(8))
print("Global data:", list(globalData))

# output: Global data: [2, 3, 4, 5, 6, 7, 8, 9]

2) By having your target function return the processed item:

import numpy as np 
import multiprocessing

globalData = np.array([1, 2, 3, 4, 5, 6, 7, 8])

def add(value_i): 
    return value_i + 1


pool = multiprocessing.Pool(8)
globalData = pool.map(add, globalData)
pool.close()
pool.join()
print("Global data:", globalData)

# output: Global data: [2, 3, 4, 5, 6, 7, 8, 9]

Looking at your code, it seems that you want to update the diagonal of a numpy array; you can try:

sample_size = 100

pca_sample = np.random.rand(sample_size, sample_size)

def knl(x, y):
    #Just as an example
    return np.sin(x + y)

K_matrix = np.zeros((sample_size, sample_size))
for i in range(sample_size):
    for j in range(sample_size):
#         print(pca_sample[i])
        K_matrix[i][j] = knl(pca_sample[i][j], pca_sample[i][j])

K_cent_matrix = np.zeros((sample_size, sample_size))

def K_centered(i, j):
    term1 = K_matrix[i][j]
    term2 = 0.
    term3 = 0.
    term4 = 0.
    for k in range(sample_size):
        term2 += K_matrix[k][j]
    for k in range(sample_size):
        term3 += K_matrix[i][k]
    for k1 in range(sample_size):
        for k2 in range(sample_size):
            term4 += K_matrix[k1][k2]        
    term1 /= sample_size
    term2 /= sample_size
    term3 /= (sample_size * sample_size)
    diag_update = term1 - term2 - term3 + term4 
    print(f"K_cent_matrix[{i:d}][{j:d}] = {diag_update:f}")
    return diag_update 


pool = multiprocessing.Pool(8)
K_cent_matrix[np.diag_indices_from(K_cent_matrix)]  = pool.starmap(K_centered, [(i, i) for i in range(sample_size)])
print(K_cent_matrix)

output:

[[7078.12324165    0.            0.         ...    0.
     0.            0.        ]
 [   0.         7078.0812738     0.         ...    0.
     0.            0.        ]
 [   0.            0.         7078.08619283 ...    0.
     0.            0.        ]
 ...
 [   0.            0.            0.         ... 7078.15205274
     0.            0.        ]
 [   0.            0.            0.         ...    0.
  7078.13850884    0.        ]
 [   0.            0.            0.         ...    0.
     0.         7078.1374349 ]]
kederrac

There is no need to further "optimize" numpy with multiprocessing for this: numpy operations already run in optimized compiled code, and many of them (e.g. those backed by BLAS) make use of parallelism where possible.

That said, that's not true for +1, but adding 1 is a very fast vectorized operation regardless. Thanks to broadcasting, it is applied to all elements at once:

globalData = np.array([1, 2, 3, 4, 5, 6, 7, 8])
newData = globalData + 1 # [2, 3, 4, 5, 6, 7, 8, 9]
Jan Christoph Terasa