1

I process 2 large 1D arrays (lets say A and B): I operate over pairs of A and B elements and write a results to a shared array C (think of C as a histogram). I want to use multiprocessing to parallelize the process. I thought the optimal approach could be slicing array A in a number of unique chunks equal to the number of parallel processes I choose to execute on, and using for loop to do math against all elements of B.

I was reading many questions/answers. I looked at Multiprocessing a loop of a function that writes to an array in python as an example which uses Process. I tried to adapt to my problem, but I'm getting performance of a serial execution. The code I am testing:

from multiprocessing import Process, Lock
from multiprocessing.sharedctypes import Array
import numpy as np
import time


def ProcessData(sub_data1, data2, freq):
    for dat1 in sub_data1:
        for dat2 in data2:
            d = int( np.sqrt( (dat1 - dat2)**2 ) )
            #d = int(dat1 - dat2)
            if (d < len(freq)):
                freq[d] += 1

def SplitList(data, n):
    sub_len = divmod(len(data),n)[0]
    print(sub_len)
    slices = []
    for i in range(n):
        slices.append( data[i*sub_len:i*sub_len+sub_len] )
    return slices
    
def main(nproc):
    print("Number of cpu : ", mp.cpu_count())
    lock = Lock()
    N = 30
    chip = [1,1,1,1,1,2,2,2,2,2,3,3,3,3,4,4,4,4,4,5,5,5,5,5,6,6,6,6,6,7,7,7,7,7,8,8,8,8,8,9,9,9,9]
    data1 = np.array( chip * N )
    data2 = np.array( chip * N )
    freq = Array('i', 100, lock=lock)
    dat1_subs = SplitList(data1,nproc)
    print('Number of data1 slices {:d}'.format(len(dat1_subs)))
    t_start = time.time()
    
    if __name__ == '__main__':
        for i in range(0, nproc):
            print('LEN {:d}: {:d}'.format(i, len(dat1_subs[i] )) )
            p = Process(target=ProcessData, args=(dat1_subs[i], data2, freq))
            p.start()
            p.join()

    t_end =  time.time()
    print('Total time (s)= ' + str(t_end - t_start))
    print(str(list(freq)))
    #new_array = np.frombuffer(freq.get_obj())
    Sum = sum( list(freq) )
    print('Total {:d}'.format(Sum))
    
NProc = 4

main(NProc)

I would appreciate any input or hints what I'm doing wrong. Or maybe there more simpler approaches I just don't know. Thanks.

Art
  • 11
  • 3
  • 2
    There is no parallelism: you start a process, then wait for it finish before you start another process. More typical: build a list of the `Process` objects, _then_ loop over that list to `.start()` all of them, _then_ loop over that list again to `.join()` them. – Tim Peters Aug 14 '20 at 21:13
  • What could be the reason that the shared array ('freq' which accumulates counts) has slightly different counts in some binds once processed with different number of cpus? – Art Aug 17 '20 at 14:34

2 Answers2

1

Try like this

from concurrent.futures import ProcessPoolExecutor
def pool_factorizer_map(nums, nprocs):
    # Let the executor divide the work among processes by using 'map'.
    with ProcessPoolExecutor(max_workers=nprocs) as executor:
        return {num:factors for num, factors inmething like this
                                    executor.map(factorize_naive, nums))}
abdulsaboor
  • 678
  • 5
  • 10
0

From your comment:

What could be the reason that the shared array ('freq' which accumulates counts) has slightly different counts in some binds once processed with different number of cpus?

I'd be amazed if you didn't get different results from run to run even if you didn't change the number of CPUs. If you do change them, then the "obvious" first reason is that your SplitLists() can throw away a varying amount of trailing data depending on the n passed to it.

But even if you don't change the number of processes,

freq[d] += 1

is not deterministic. Yes, the Array type is synchronized across processes, but that only applies to loads and stores on their own. That operation is compound, and under the covers works like

with lock:
    temp = freq[d]
temp += 1
with lock:
    freq[d] = temp

There's nothing to stop multiple processes from reading the same current freq[d] value, each adding 1 to that on its own, then storing the same new value multiple times.

To make the increment-in-place atomic as a whole, you would need to pass a different lock (in which case you could just as well use a RawArray), and do:

with lock:
    freq[d] += 1

But then extremely high contention for that lock will kill performance. Provided there aren't "too many" possible d values, it would be far better (for speed) to pass an array of len(freq) distinct lock objects, and do:

with locks[d]:
    freq[d] += 1

Then lock contention happens only when multiple processes happen to be changing the same d count.

But if len(freq) isn't all that large, I'd avoid shared memory entirely. Let each process run full speed with its own freq list, and let the main program sum them up. Here's an example, but it's entirely different than the code you posted. No numpy (it's irrelevant to the issues here), and none of the frankly weird stuff you inherited from the post you linked to:

def work(raw, freqlen):
    freq = [0] * freqlen
    for x in raw:
        if x < freqlen:
            freq[x] += 1
    return freq

def main(nproc, nfreq, numperchunk=100000):
    import multiprocessing as mp
    base = list(range(200)) * 1000000
    with mp.Pool(processes=nproc) as pool:
        i = 0
        ps = []
        while i < len(base):
            ps.append(pool.apply_async(work,
                                       (base[i : i + numperchunk],
                                        nfreq)))
            i += numperchunk
        result = [0] * nfreq
        for p in ps:
            for i, x in enumerate(p.get()):
                result[i] += x
        print(result)

if __name__ == "__main__":
    main(4, 10)

Where shared memory may actually help: chopping up a giant vector and passing slices to worker processes is expensive (large amount of interprocess communication). It could be better to put the read-only giant vector in shared memory, and just pass slice indices to workers. Or, on a Linux-y system, let workers inherit the giant vector(s) at module level via copy-on-write fork() semantics.

Tradeoffs

To make some of those ideas concrete, here's a variation more like your original. But:

  • It puts the "giant vector" in shared memory too.
  • Since the vector is shared, only slice bounds need to passed to the worker. Much less interprocess communication needed.
  • The freq vector is also shared, but the worker uses its own local version first, for peak lock-free speed. It only holds a lock at the end, to fold its local results into the shared result.
  • Because we're doing our own locking RawArrays are used instead. Access to those is much faster.

This screams. In fact, the program spends substantially more time creating the test case than calculating the histogram ;-)

def work(base, lo, hi, freq, L):
    freqlen = len(freq)
    myfreq = [0] * freqlen
    for i in range(lo, min(hi, len(base))):
        x = base[i]
        if x < freqlen:
            myfreq[x] += 1
    with L:
        for i, x in enumerate(myfreq):
            freq[i] += x

def main(nproc, nfreq):
    import multiprocessing as mp
    import math
    base = mp.RawArray('h', list(range(201)) * 1000003)
    freq = mp.RawArray('i', nfreq)
    L = mp.Lock()
    numperchunk = math.ceil(len(base) / nproc)
    print(f"{len(base)=:,} {numperchunk=:,}")
    ps = []
    a = 0
    for i in range(nproc):
        p = mp.Process(target=work,
                       args=(base, a, a + numperchunk, freq, L))
        p.start()
        a += numperchunk
        ps.append(p)
    for p in ps:
        p.join()
    print(list(freq))

if __name__ == "__main__":
    main(4, 10)
Tim Peters
  • 67,464
  • 13
  • 126
  • 132
  • I genuinely appreciate this in-depth answer - I got something to learn. Regarding the 'screaming': in my case a serial execution takes many hours if not a day sometimes. So I deeply hope it will work as wished :) – Art Aug 18 '20 at 13:23
  • What is the reason using 'h' type in defining the 'base', instead of 'i' ? – Art Aug 18 '20 at 15:21
  • I simply didn't need more than 2 bytes for the elements it holds. You could certainly make it 'i` instead - and burn twice the shared memory needed to hold it. – Tim Peters Aug 18 '20 at 15:29