0

I know there are a lot of topics around similar problems (like How do I make processes able to write in an array of the main program?, Multiprocessing - Shared Array or Multiprocessing a loop of a function that writes to an array in python), but I just don't get it... so sorry for asking again.

I need to do some stuff with a huge array and want to speed up things by splitting it into blocks and running my function on those blocks, with each block being run in its own process. Problem is: the blocks are "cut" from one array and the result shall then be written into a new, common array. This is what I did so far (minimum working example; don't mind the array-shaping, this is necessary for my real-world case):

import numpy as np
import multiprocessing as mp

def calcArray(array, blocksize, n_cores=1):
    in_shape = (array.shape[0] * array.shape[1], array.shape[2])
    input_array = array[:, :, :array.shape[2]].reshape(in_shape)
    result_array = np.zeros(array.shape)
    # blockwise loop
    pix_count = array.size
    for position in range(0, pix_count, blocksize):
        if position + blocksize < array.shape[0] * array.shape[1]:
            num = blocksize
        else:
            num = pix_count - position
        result_part = input_array[position:position + num, :] * 2
        result_array[position:position + num] = result_part
    # finalize result
    final_result = result_array.reshape(array.shape)
    return final_result

if __name__ == '__main__':
    start = time.time()
    img = np.ones((4000, 4000, 4))
    result = calcArray(img, blocksize=100, n_cores=4)
    print 'Input:\n', img
    print '\nOutput:\n', result

How can I now implement multiprocessing in way that I set a number of cores and then calcArray assigns processes to each block until n_cores is reached?


With the much appreciated help of @Blownhither Ma, the code now looks like this:

import time, datetime
import numpy as np
from multiprocessing import Pool

def calculate(array):
    return array * 2

if __name__ == '__main__':
    start = time.time()
    CORES = 4
    BLOCKSIZE = 100
    ARRAY = np.ones((4000, 4000, 4))
    pool = Pool(processes=CORES)
    in_shape = (ARRAY.shape[0] * ARRAY.shape[1], ARRAY.shape[2])
    input_array = ARRAY[:, :, :ARRAY.shape[2]].reshape(in_shape)
    result_array = np.zeros(input_array.shape)
    # do it
    pix_count = ARRAY.size
    handles = []
    for position in range(0, pix_count, BLOCKSIZE):
        if position + BLOCKSIZE < ARRAY.shape[0] * ARRAY.shape[1]:
            num = BLOCKSIZE
        else:
            num = pix_count - position
        ### OLD APPROACH WITH NO PARALLELIZATION ###
        # part = calculate(input_array[position:position + num, :])
        # result_array[position:position + num] = part
        ### NEW APPROACH WITH PARALLELIZATION ###
        handle = pool.apply_async(func=calculate, args=(input_array[position:position + num, :],))
        handles.append(handle)
    # finalize result
    ### OLD APPROACH WITH NO PARALLELIZATION ###
    # final_result = result_array.reshape(ARRAY.shape)
    ### NEW APPROACH WITH PARALLELIZATION ###
    final_result = [h.get() for h in handles]
    final_result = np.concatenate(final_result, axis=0)
    print 'Done!\nDuration (hh:mm:ss): {duration}'.format(duration=datetime.timedelta(seconds=time.time() - start))

The code runs and really starts the number processes I assigned, but takes much much longer than the old approach with just using the loop "as-is" (3 sec compared to 1 min). There must be something missing here.

s6hebern
  • 725
  • 9
  • 18

1 Answers1

1

The core function is pool.apply_async and handler.get.

I have been recently working on the same functions and find it useful to make a standard utility function. balanced_parallel applies function fn on matrix a in a parallel manner silently. assigned_parallel explicitly apply function on each element.
i. The way I split array is np.array_split. You may use block scheme instead.
ii. I use concat rather than assign to a empty matrix when collecting result. There is no shared memory.

from multiprocessing import cpu_count, Pool

def balanced_parallel(fn, a, processes=None, timeout=None):
    """ apply fn on slice of a, return concatenated result """
    if processes is None:
        processes = cpu_count()
    print('Parallel:\tstarting {} processes on input with shape {}'.format(processes, a.shape))
    results = assigned_parallel(fn, np.array_split(a, processes), timeout=timeout, verbose=False)
    return np.concatenate(results, 0)


def assigned_parallel(fn, l, processes=None, timeout=None, verbose=True):
    """ apply fn on each element of l, return list of results """
    if processes is None:
        processes = min(cpu_count(), len(l))
    pool = Pool(processes=processes)
    if verbose:
        print('Parallel:\tstarting {} processes on {} elements'.format(processes, len(l)))

    # add jobs to the pool
    handler = [pool.apply_async(fn, args=x if isinstance(x, tuple) else (x, )) for x in l]

    # pool running, join all results
    results = [handler[i].get(timeout=timeout) for i in range(len(handler))]

    pool.close()
    return results

In your case, fn would be

def _fn(matrix_part): return matrix_part * 2
result = balanced_parallel(_fn, img)

Follow-up: Your loop should look like this to make parallelization happen.

handles = []
for position in range(0, pix_count, BLOCKSIZE):
    if position + BLOCKSIZE < ARRAY.shape[0] * ARRAY.shape[1]:
        num = BLOCKSIZE
    else:
        num = pix_count - position
    handle = pool.apply_async(func=calculate, args=(input_array[position:position + num, :], ))
    handles.append(handle)

# multiple handlers exist at this moment!! Don't `.get()` yet
results = [h.get() for h in handles]
results = np.concatenate(results, axis=0)
Blownhither Ma
  • 1,461
  • 8
  • 18
  • Well, at least it seems to work :) Now I have to understand it, give me some time to get my head around it, then I'll accept it. Thank you! – s6hebern May 04 '18 at 08:33
  • @s6hebern Sorry if it's lengthy :) I have encountered this multiple times so I decided to make a general solution. You may take only the essential code – Blownhither Ma May 04 '18 at 08:49
  • Given my edits, which are based on your answer, do you see anything I missed? To me, it looks like the code has no effect at all – s6hebern May 08 '18 at 06:11
  • @s6hebern you get result of a job before the next job is applied, thus forcing jobs to be done one by one. Simply apply all the jobs, then get all the results – Blownhither Ma May 08 '18 at 07:54
  • @s6hebern aka use one loop for apply, another for get – Blownhither Ma May 08 '18 at 07:56
  • I could not get my head around it, and I think my edited code above does not do what it is supposed to. Instead, I think it simply loops over the blocks and does the multi-processing, too, resulting in 4 times the runtime. Can you make your answer a bit clearer by adding my example? – s6hebern May 15 '18 at 06:50
  • @Blownhithee Ma thanks a lot. It runs, but I think it still does the whole job multiple times instead of once. See edited question. – s6hebern May 17 '18 at 06:13
  • @s6hebern When using your code on my Mac+Python3, I do see 4 child process running (plus 1 parent process) each with 25% CPU occupation – Blownhither Ma May 17 '18 at 06:20
  • @s6hebern I realize you are using `BLOCKSIZE = 100` which is too small for parallelization to be helpful. Every parallelization has overhead including context setup, input&output collection and thus having that many jobs is not a reasonable choice. For mathematical operation, I would recommend set the number of jobs to `CPU+1` or `CPU`. Besides, `4000*4000*4` matrix is too small to carry out an effective test. – Blownhither Ma May 17 '18 at 06:44
  • Thanks for clarification. I think I understood the main concept, although changing the array to `20000*15000*4` and `BLOCKSIZE=500` did not change anything. Maybe trying to do this specific operation in parallel is not a good idea anyway, but at least now I know how to use `Pool.apply_async` and `handle.get()`. Thank you :) – s6hebern May 17 '18 at 07:46