0

EDIT: My goal with this post is to understand the source of the time-drain. I welcome other recommendations as well, but my main concern, and I want to learn about is why my code is not speeding up with parallelization? What is it that's causing the parallelization to slow down?

I previously asked a question about this, and I now realize that it wasn't a good one. I apologize for the poor post. So I'm re-asking having put more effort into solving it.

I have managed to implement a parallelized solution. However, the parallelized code is much much much slower than the serialized version.

EDIT: The foo() function below is rather simple, and can be put more concisely, but the real version of the function is a bit more complicated. The main problem is still the fact that at thousands of arrays, each of length ~70,000, the sheer number of comparisons is what's causing the slowness. So parallelization seems to be the best solution here. Of course, recommendations for making the steps more efficient are welcome, and I appreciate any such suggestions.

Problem

Consider a list of numpy arrays. I need to do pairwise comparisons on these arrays in the list. My real problem has thousands of arrays of length ~70,000, but the toy example below has much smaller numbers (can be adjusted with the listLen and arrayLen variables though)

Attempt

Here foo() is the comparison function that will be used. If you try playing around with arrayLen and listLen, you'll see that no matter what values you choose, the parallelized function do_calculations_mp is always slower than the non-parallelized version do_calculations_no_mp. From what I've read, multiprocesing.Process has less overhead than multiprocess.Pool, so it shouldn't be taking this long, right?

I'd really appreciate any help on this.

import numpy as np
from multiprocessing import Process
import itertools
import random
import sys
from datetime import datetime

def foo(arr1, arr2):
    matches = 0
    for i in range(len(arr1)):
        if arr1[i] == arr2[i]:
            matches += 1
    return(matches)


def do_calculations_mp(aList):
    flag_indices = []

    processes = []

    index_combns = list(itertools.combinations(range(len(aList)),2))
    for i,j in index_combns:
        p = Process(target = foo, args = (aList[i], aList[j]))
        processes.append(p)
        p.start()

    for procs in processes:
        procs.join()

    return(flag_indices)


def do_calculations_no_mp(aList):
    flag_indices = []
    index_combns = list(itertools.combinations(range(len(aList)),2))
    for i,j in index_combns:
        numMatches = foo(aList[i], aList[j])

    return(flag_indices)

if __name__ == '__main__':
    listLen = 50
    arrayLen = 300
    # Creates a list of listLen arrays, where each array has length arrayLen
    myList = [np.array([random.choice([0,1,2,5]) for i in range(arrayLen)]) for x in range(listLen)]

    print("Processing No MP:             " + str(datetime.now()))
    flagged = do_calculations_no_mp(myList)
    print("Done processing No MP:        " + str(datetime.now()))

    print("Processing MP:                " + str(datetime.now()))
    flagged_mp = do_calculations_mp(myList)
    print("Done processing MP:           " + str(datetime.now()))
The_Questioner
  • 240
  • 2
  • 7
  • 17
  • 1
    Multiprocessing is nice but you can speed up calculations by using numpy. Your whole `foo` function is equivalent to `np.where(np.equal(arr1, arr2))[0].size`. – Guimoute Mar 22 '22 at 21:26
  • I'll edit my post and mention this. But the `foo` function is simplified in this example as well. The real function is a bit more complicated than this and I don't think it can be simplified so concisely as this example version can. – The_Questioner Mar 22 '22 at 21:28
  • 3
    With multiprocessing, there's a whole bunch of overhead in serializing objects to send to the subprocesses. That may be what you're seeing here. – AKX Mar 22 '22 at 21:35
  • That makes sense. I referenced this near the end of my post because I was expecting it to be an issue. But I have no clue whatsoever how to reduce that overhead, or program cleverly around it. Admittedly, it's been a bit difficult to wrap my head around what's happening behind the scenes of this `multiprocessing` library – The_Questioner Mar 22 '22 at 21:40
  • It's not the biggest source of problems but you should remove the call to `list` in `list(itertools.combinations(...))`. It's a generator. With a list of 5000 arrays containing 1 element each, the call to `list` takes 1.5s on my machine. It also needlessly takes memory. – Guimoute Mar 22 '22 at 21:47
  • Thank you for that suggestion. I will get rid of that. – The_Questioner Mar 22 '22 at 21:56
  • 1
    Dear all - almost all pieces of your advice were already present in the first question and the second ( now deleted ) question about the same subject. Best to remind the StackOverflow Site policy not to repeat the same or very similar question to attract more attention, once the subject has already been answered ( details below ) - this is not a fair practice in Knowledge sponsored Community, is it? – user3666197 Mar 22 '22 at 22:07
  • @user3666197 I'm sorry about this. I realize you helped with the previous question, but unfortunately, the explanation was quite difficult for me to comprehend. Frankly, so is your current answer. I really appreciate your help, but I am hoping that I could get an explanation that's easier to digest. You put a lot of effort into your response, but I'm too inexperience in the subject to properly understand it. – The_Questioner Mar 22 '22 at 22:11
  • What is the source of your lists? Could them be accessed/generated by the subprocess instead of pass the list as a parameter? That would remove the serialization / deserialization overhead – Gonzalo Odiard Mar 22 '22 at 22:15
  • @GonzaloOdiard unfortunately the list can't be generated. It's being read into from a file. – The_Questioner Mar 22 '22 at 22:16
  • There are 1225 items in ``index_combns`` and you start a new Process *for each item*. That's 1225 processes battling for the few cores your machine physically has! All of that for a payload that only takes 0.06s, which by itself is already close to the runtime overhead of individual processes... – MisterMiyagi Mar 22 '22 at 22:45
  • Thinking the same. maybe yu could try using a process pool? (with the number of cores of your computer, by example) https://docs.python.org/3/library/multiprocessing.html – Gonzalo Odiard Mar 22 '22 at 22:49
  • @GonzaloOdiard While a pool will be less wasteful, it'll still be more wasteful than just running the thing sequentially. There's just no point parallelising such a trivial task. – MisterMiyagi Mar 22 '22 at 23:08
  • 1
    @MisterMiyagi Thanks for the help. The issue is that running sequentially takes almost a day with the size of the input that we get. We have a list of 6000 arrays. This amounts to 18M pairwise comparisons. I have 64 cores available. Is it possible to explicitly assign those 18M comparisons to the 64 cores so that each core handles 18M/64 operations? i.e. Instead of having Python try to assign the comparisons to the cores, I can have 64 instances of the for loop, each handling its own chunk of the work. I hope that makes sense – The_Questioner Mar 22 '22 at 23:14
  • Most likely ``multiprocessing.Pool`` or ``concurrent.futures.ProcessPool`` (different interfaces to basically the same thing) is appropriate, but it is impossible to tell for sure with the vague descriptions. There are some rough notions to follow – maximizie work per process, minimize communication between processes – yet the relevant information is stripped from your description. This kind of explorative question might be more suitable for chat. – MisterMiyagi Mar 22 '22 at 23:19
  • @MisterMiyagi If you don't mind, that'd be great. Once I have a solution, I'll update the post with the solution for others to benefit from. Thank you – The_Questioner Mar 22 '22 at 23:22
  • @MisterMiyagi with all respect, no - your advice is wrong ( as was detailed to O/P yesterday in https://stackoverflow.com/a/71565224 ) Today the 2nd ( now deleted ) question from O/P was stating other counts / sized - being some thousands about 0.7M long numpy-arrays, here, suddenly 6000 ( producing about 18M pairwise combinations, HALF of which was already computed due to symmetry ). More and more principal issues come from discussion creeping like this. Process-based concurrency is awfully wrong here, due to Amdahl's Law + O( N^q ); q > 1 costs of moving tens of [MB] objects via SER/DES! – user3666197 Mar 22 '22 at 23:31
  • @user3666197 Before symmetry there were 36M million comparisons. The 18M comes from the symmetry you correctly identified. – The_Questioner Mar 22 '22 at 23:34
  • @user3666197 You are guessing here just as well. OP obviously has some *serious* misconceptions about (process based) parallelism, so I would take any number and constraint with a grain of salt. There is nothing to be gained here by insisting on what's right or wrong. Please turn down the volume. – MisterMiyagi Mar 22 '22 at 23:37
  • Using vectorized Numpy operations to solve this problem should be much more efficient (possibly combined with Numba to generate the combination more efficiently). Using multiprocessing with list-based computation for numerical/combinatoric program is often a wast of time since it makes the code more complex, less flexible, does not run well in the interpreter and is barely able to mitigate the huge overhead of the CPython interpreter. – Jérôme Richard Mar 24 '22 at 20:24
  • 1
    @JérômeRichard - you might have noticed, that this has been advised to O/P -3- times so far, since first posted with the only effect -- 3x downvoting an answer below, where these facts were both presented & equipped with tooling to improve the processing performance ( harnessing multicore CPUs more efficiently, than by just an escape-from-GIL-lock into multiprocessing, plus avoiding all classes of overhead costs ). Last but not least, the cost of combinatorics on ~6k set is a few minutes, whereas other costs & inefficiencies cause the problem to efficiently block the computer for ~ 24 hours – user3666197 Mar 25 '22 at 16:54

2 Answers2

0

A few general points:

  1. You have a finite number of processors. Your code is creating a number of processes equal to the number of combinations that you have, which might (greatly) exceed the number of processors and that is rather pointless.
  2. You cannot return results from a process with your return statement. You need another mechanism for doing so, such as passing a multiprocessing.Queue instance to your processes to which results are written (there are other ways).
  3. Using a multiprocessing pool solves issues 1. and 2.
  4. Keeping the arrays in shared memory will reduce the overhead of multiprocessing.
  5. The more CPU-intensive foo is, the better multiprocessing will perform compared to serial processing. This means having long lists.

I have modified your demo incorporating these ideas to demonstrate that multiprocessing can outperform serial processing. Note that foo now accesses myList as a global variable, which is a list of shared memory arrays, and so it now only has to be passed list indices.

from multiprocessing import Pool, Array
import itertools
import random
import sys
from time import time_ns


listLen = 6
arrayLen = 5_000_000

def foo(i, j):
    arr1 = myList[i]
    arr2 = myList[j]
    matches = 0
    for index in range(arrayLen):
        if arr1[index] == arr2[index]:
            matches += 1
    return(matches)

def generate_args():
    index_combns = list(itertools.combinations(range(listLen), 2))
    for i, j in index_combns:
        yield (i, j)

def init_pool_processes(aList):
    global myList
    myList = aList

def do_calculations_mp():
    numMatches = 0
    with Pool(initializer=init_pool_processes, initargs=(myList,)) as pool:
        for matches in pool.starmap(foo, generate_args()):
            numMatches += matches
        pool.close()
        pool.join()
    return numMatches

def do_calculations_no_mp():
    numMatches = 0
    for i, j in generate_args():
        numMatches += foo(i, j)
    return numMatches

if __name__ == '__main__':
    # Creates a list of listLen arrays, where each array has length arrayLen
    myList = [Array('i', [random.choice([0,1,2,5]) for i in range(arrayLen)], lock=False) for x in range(listLen)]

    start = time_ns()
    numMatches = do_calculations_no_mp()
    elapsed = (time_ns() - start) / 1_000_000_000
    print(f"Done processing No MP, numMatches = {numMatches}, elapsed time = {elapsed}")

    start = time_ns()
    numMatches = do_calculations_mp()
    elapsed = (time_ns() - start) / 1_000_000_000
    print(f"Done processing MP, numMatches = {numMatches}, elapsed time = {elapsed}")

Prints:

Done processing No MP, numMatches = 18748321, elapsed time = 10.0353228
Done processing MP, numMatches = 18748321, elapsed time = 2.7033887

With listLen defined as 70_000, however, which is approximately the length of your lists, we get:

Done processing No MP, numMatches = 261721, elapsed time = 0.1350159
Done processing MP, numMatches = 261721, elapsed time = 0.2939995

So the issue is that until your lists become longer, foo at the current complexity, is not sufficiently CPU-intensive to be more performant with multiprocessing. Of course, if your actual foo is far more complex, then ...

As the number of combinations become very large, you might want to consider using method multiprocessing.pool.imap_unordered with a suitable chunksize argument. foo would then need to be redefined to take a single tuple argument.

Booboo
  • 38,656
  • 3
  • 37
  • 60
  • Let me claim your item 5. is not correct. Given the np.array-processing is ( unless forcefully programmed as a serial-iterator stepped anti-pattern ) bypassing the GIL-lock and uses multicore capabilities of the numpy, enjoying decades of HPC expertise to organise matrix/vector calculations to maximise cache-reuse and other performance tricks. Going a multiprocessing way (against numpy-native multicore) is extremely hard, as Amdahl's Law (incl. add-on overhead costs,being many orders of magnitude higher in multiprocessing.Pool.starmap(), the worse in other task-farming methods) as the Law sets – user3666197 Mar 23 '22 at 14:16
  • Sharing is another performance anti-pattern, that adds even more overhead add-on costs ( that are obviously not present in a pure-[SERIAL] original code, now adding 70k+ times the overheads of SER/xfer/DES-pipeline processing used for process-to-process communications per-task ) – user3666197 Mar 23 '22 at 14:18
  • 1
    When you are doing multiprocessing, which is your ultimate goal, then the cost of passing data between address spaces becomes an issue. Now for list lengths of 70_000, which we call "modest sized", this cost may not be an issue. But, for what it's worth (nothing to you, probably), with the original demo's list length of 5_000_000. when I make the simple change of `myList = [np.array([random.choice([0,1,2,5]) for i in range(arrayLen)]) for x in range(listLen)]`, i.e. reverting back to the `numpy` array, then the serial and multiprocessing times go up to 22.8149997 and 9.6639336 respectively. – Booboo Mar 23 '22 at 14:42
  • You might already noticed, that a) *"doing multiprocessing"* is not my *"ultimate goal"* b) benchmarking a side-effect of a sin of premature optimisation as shown when trying to argument, based but on artifacts that do not represent the real problem - here, b.1) artificial use a GIL-blocked,shared,global list-based storage,be it 6k (posted by O/P yesterday),70k today or otherwise b.2) skewing actual overhead costs of moving a global "through" initialiser or b.3) forcing natively-multicore numpy into knowingly slow,GIL-blocked serial-iterator list-comprehensions+later re-represent it to numpy? – user3666197 Mar 23 '22 at 15:46
-3

If performance matters, let's start from principles

Python Interpreter is interpreting instructions, use dis.dis( foo ) to see them below.

Python Interpreter is expensive if going to spawn processes (as detailed in your first question)

Python Interpreter is known to be awfully bad (read slow) if using serial-iterators (read for-loops, generator-based task-farming et al)

Python Interpreter in distributing "lightweight" tasks for scales of about 18M tasks, combined from data from 6E3 list-of-items, each item being about 1E6 long np.array-instance - as you described in your (now deleted) second question, will almost for sure kill its own capabilities (performance-wise), if all above are used at once inside multiprocessing.

def foo(arr1, arr2):
    matches = 0
    for i in range(len(arr1)):
        if arr1[i] == arr2[i]:
           matches += 1
return(matches)

was demonstrated as the MCVE-representation of the work-unit ( a task )

>>> dis.dis( foo )
  2           0 LOAD_CONST               1 (0)
              3 STORE_FAST               2 (matches)

  3           6 SETUP_LOOP              59 (to 68)
              9 LOAD_GLOBAL              0 (range)
             12 LOAD_GLOBAL              1 (len)
             15 LOAD_FAST                0 (arr1)
             18 CALL_FUNCTION            1
             21 CALL_FUNCTION            1
             24 GET_ITER            
        >>   25 FOR_ITER                39 (to 67)
             28 STORE_FAST               3 (i)

  4          31 LOAD_FAST                0 (arr1)
             34 LOAD_FAST                3 (i)
             37 BINARY_SUBSCR       
             38 LOAD_FAST                1 (arr2)
             41 LOAD_FAST                3 (i)
             44 BINARY_SUBSCR       
             45 COMPARE_OP               2 (==)
             48 POP_JUMP_IF_FALSE       25

  5          51 LOAD_FAST                2 (matches)
             54 LOAD_CONST               2 (1)
             57 INPLACE_ADD         
             58 STORE_FAST               2 (matches)
             61 JUMP_ABSOLUTE           25
             64 JUMP_ABSOLUTE           25
        >>   67 POP_BLOCK           

  6     >>   68 LOAD_FAST                2 (matches)
             71 RETURN_VALUE        

Using the amount of pseudo-instructions as a simplified measure of how much work has to be done ( where all of CPU-clocks + RAM-allocation-costs + RAM-I/O + O/S-system-management time spent must count and counts the actually accrued costs ), we start to see the relative-costs of all these ( unavoidable ) add-on costs, compared to the actual useful task ( i.e. how many pseudo-instructions are finally spent on what we want to compute --The-useful-WORK--, contrasting the amounts of already burnt overhead-costs and per-task add-on costs, that were needed to make this happen and run )

This fraction is cardinal if fighting for both performance & efficiency (of resources usage patterns).

For cases, where add-on overhead costs dominate, these cases are straight a sin of keeping anti-patterns stacked to their worst consequences

For cases, where instantiation + per-task add-on overhead costs make less than 0.01 % of the useful work, we still may result in pretty unsatisfactory low speedups (see the simulator and related details, as posted yesterday and see the devastated speedup line - i.e. from a day-long runtimes today, you will receive about a day-long runtimes, if keeping all these add-on costs as high as now )

For cases, where the scope of the useful work ( the actual foo(), yet performance-boosted to strip-off most of its performance antipatterns ) strongly dominates, i.e. diminishes all add-on + per-task overhead costs, there we still see the Amdahl's Law ceiling - called so, so self-explanatory - a "Law of diminishing returns" ( since adding more resources ceases to improve the performance, even if we add infinitely many CPU-cores and likes )

Let's test the foo()-speeds above ( as-was presented ) and below ( as-refactored ) :

from zmq import Stopwatch; aClk = Stopwatch()       # a [us]-resolution clock
import numpy as np

def test( aSize = 1E6 ):
    TheSIZE = int( min( 1E7, aSize ) )              # fused RAM-footprint < GB
    A = np.random.random_integers( 0, 9, TheSIZE )  # pre-allocate, fill
    B = np.random.random_integers( 0, 9, TheSIZE )  # pre-allocate, fill
    aClk.start() #----------------------------------# CRITICAL SECTION.start
    _ = np.where( A==B, 1, 0 ).sum()                #    smart compute "count"  
    t = aClk.stop() #-------------------------------# CRITICAL SECTION.end
    MASK = ( "test( aSize = {0:>20d} ) "
           + "took {1: >20d} [us] "
           + " {2:} matches found in random filled A, B "
           + " ~ {3:} [us/loop]"
             )
    print MASK.format( TheSIZE, t, _, float( t ) / TheSIZE )

use the same to measure how long does the foo( A, B ) last in your serial-iterator code

a "naked"-foo( A, B ) took about 800 [ns] (!!) per array-cell
( to that we mast add all SER/xfer/DES costs we pay for moving ~ 20[MB] of parameters' data, per task, which may take and costs hundreds of thousands of CPU-clocks, just to move the data into a remote process (worker), before it starts a few pseudo-instructions dis.dis( foo ) shown above, ouch, that hurts ... )
while
np.where()-code took under about 19 [ns] 42 X faster per array-cell ( 25 [ns] if all 4-cores on my localhost were background-loaded with desktop system running 3 TV-streams live, so less free CPU-cores to use numpy-multicore code tricks on-the-fly )

>>> test( 1E34 )
took 239712 [us]  999665 matches found in random filled A, B  ~ 0.0239712 [us/loop]
took 220671 [us] 1000246 matches found in random filled A, B  ~ 0.0220671 [us/loop]
took 227004 [us] 1001805 matches found in random filled A, B  ~ 0.0227004 [us/loop]
took 210587 [us]  999267 matches found in random filled A, B  ~ 0.0210587 [us/loop]
took 195863 [us]  998218 matches found in random filled A, B  ~ 0.0195863 [us/loop]
took 241407 [us] 1000017 matches found in random filled A, B  ~ 0.0241407 [us/loop]
took 193243 [us] 1001084 matches found in random filled A, B  ~ 0.0193243 [us/loop]

From the perspective of this comparison, start to try to properly refactor the process - the goal is to crawl all ~ 18M-pair-wise combinations from a 6E3-long list of ~10[MB]-large-np.arrays for all pairwise comparisons.

The faster the better, right?

user3666197
  • 1
  • 6
  • 50
  • 92
  • 1
    I realize that you simplified the `foo` function. But `foo` is an overly-simplified version of the actual comparison function. The real compariosn function is more complicated and can't simply be boiled down to a `np.where()` call. It'll work here, but it won't work in my actual code. With this post, what I hope to understand is how to deal with the overhead that I'm experiencing in my parallelized code. That is the main optimization that I'm interested in. – The_Questioner Mar 22 '22 at 22:20
  • No, @The_Questioner **I strongly protest** against the phrasing *(cit.) "you simplified the...function"*. The function does exactly what you have presented in the MCVE. The StackOverflow Site policy explains what M-inimum C-omplete V-erifiable E-xample of code, that both REPRESENTS and REPRODUCES the problem you try to solve. The code above shows 42 X faster solution to what was already yesterday advised to do - refactor the code to allow cache-friendly, multicore-efficient numpy-vectorisable matrix / vector operations on RAM-footprints above 5.6 [GB], using pairwise blocks above 10 [MB] each – user3666197 Mar 22 '22 at 23:15
  • The overheads are the killers. When ill-defining the O/P foo() process, kindly do not accuse us, the knowledge sponsors, that we do our best right on the code you have and keep entering into O/P, 2nd Question today and 3rd Question - all still saying nothing more than a serial-iterator driven performance anti-pattern. As (any costs) overheads need to be compared ( only being complete ) against benefits, no one can help you on code, that actually does not represent the target problem. Asking about other then actual problem is possible, yet yields only very mis-leading pieces of "advice" – user3666197 Mar 22 '22 at 23:20
  • I really do appreciate the effort you put into your answer. The `foo()` function is similar to what I put here, but has multiple if statements before it adds to the count. So it's representitive of the actual problem. I understand that speeding it up will speed up my code. But the heart of the problem is still that I have an O(n^2) program which will take a long time with large input. So my main goal is to *understand* parallelization. – The_Questioner Mar 22 '22 at 23:31