
I'm running Python 3.9.1

Note: I know there are questions with similar titles, but those questions are buried in complicated code that makes the underlying problem hard to understand. This is a bare-bones implementation of the problem, which I think others will find easier to digest.

EDIT: I have Pool(processes=64) in my code. Most others will probably have to change this according to how many cores their computer has. And if it takes too long, change listLen to a smaller number.

I'm trying to learn about multiprocessing in order to solve a problem at work. I have a list of arrays on which I need to do pairwise comparisons. For simplicity, I've recreated the gist of the problem with simple integers instead of arrays and an addition function instead of a call to some complicated comparison function. With the code below, I'm running into the titular error.

import time
from multiprocessing import Pool
import itertools
import random

def add_nums(a, b):
    return a + b

if __name__ == "__main__":
    listLen = 1000
    
    # Create a list of random numbers to do pairwise additions of
    myList = [random.choice(range(1000)) for i in range(listLen)]
    # Create a list of all pairwise combinations of the indices of the list
    index_combns = [*itertools.combinations(range(len(myList)),2)]

    # Do the pairwise operation without multiprocessing
    start_time = time.time()
    sums_no_mp = [*map(lambda x: add_nums(myList[x[0]], myList[x[1]]), index_combns)]
    end_time = time.time() - start_time
    print(f"Process took {end_time} seconds with no MP")

    # Do the pairwise operations with multiprocessing
    start_time = time.time()
    pool = Pool(processes=64)
    sums_mp = pool.map(lambda x: add_nums(myList[x[0]], myList[x[1]]), index_combns)
    end_time = time.time() - start_time
    print(f"Process took {end_time} seconds with MP")

    pool.close()
    pool.join()

The_Questioner

3 Answers


Python cannot pickle lambda functions. Instead, you should define a regular named function and pass that to pool.map. Here is how you might approach this:

import itertools
import random
import time
from multiprocessing import Pool


def add_nums(a, b):
    return a + b


def foo(x):
    return add_nums(x[0], x[1])


if __name__ == "__main__":
    listLen = 1000

    # Create a list of random numbers to do pairwise additions of
    myList = [random.choice(range(1000)) for i in range(listLen)]
    # Create a list of value pairs for every pairwise combination of the list's indices
    index_combns = [
        (myList[i[0]], myList[i[1]])
        for i in itertools.combinations(range(len(myList)), 2)
    ]

    # Do the pairwise operation without multiprocessing
    start_time = time.time()
    sums_no_mp = [*map(foo, index_combns)]
    end_time = time.time() - start_time
    print(f"Process took {end_time} seconds with no MP")

    # Do the pairwise operations with multiprocessing
    start_time = time.time()
    pool = Pool(processes=64)
    sums_mp = pool.map(foo, index_combns)
    end_time = time.time() - start_time
    print(f"Process took {end_time} seconds with MP")

    pool.close()
    pool.join()

I modified index_combns to hold the values from myList instead of just the indices, because myList will not be accessible from foo, and passing multiple copies of myList into the workers would increase the space complexity of your script.

Running this prints:

Process took 0.053926944732666016 seconds with no MP
Process took 0.4799039363861084 seconds with MP
Aryan Jain
  • Thank you for your response. Though this will work for this simplified version of the problem, in reality, the elements of `myList` aren't integers, they are large arrays, so creating pairs of `myList[i[0]], myList[i[1]]` for all combinations of the indices of the list will store hundreds of copies of those large arrays unnecessarily. So I can't create `index_combns` in this way. I hope I'm able to get my problem across. – The_Questioner Mar 21 '22 at 19:00
  • With due respect, you might want to correct your claim that *(cit.)* **"Python cannot pickle..."** - it is not a correct statement. Python cannot pickle lambdas if using the pickle module. Python can pickle lambdas when using the dill module (and has been able to for about 10 years already). – user3666197 Mar 22 '22 at 00:19

I'm not exactly sure why (though a thorough read through the multiprocessing docs would probably have an answer), but there's a pickling step involved in Python's multiprocessing where the function and its arguments get sent to the child processes. While I would have expected the lambda to be inherited rather than passed via pickling, I guess that's not what's happening.
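
You can see the limitation in isolation with a minimal sketch like this (a standalone illustration, separate from the code below): a module-level function pickles fine, while the equivalent lambda does not.

import pickle

def add_one(x):
    # a module-level function pickles fine (it is serialised by its qualified name)
    return x + 1

print(len(pickle.dumps(add_one)))      # works

try:
    pickle.dumps(lambda x: x + 1)      # the equivalent lambda cannot be pickled
except Exception as exc:
    print(type(exc).__name__, exc)     # e.g. a PicklingError mentioning <lambda>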

Following the discussion in the comments, consider something like this approach:

import time
from multiprocessing import Pool
import itertools
import numpy as np
from multiprocessing import shared_memory

def add_mats(a, b):
    #time.sleep(0.00001)
    return (a + b)

# Helper for mp version
def add_mats_shared(shm_name, array_shape, array_dtype, i1, i2):
    shm = shared_memory.SharedMemory(name=shm_name)
    stacked = np.ndarray(array_shape, dtype=array_dtype, buffer=shm.buf)
    a = stacked[i1]
    b = stacked[i2]
    result = add_mats(a, b)
    shm.close()
    return result

if __name__ == "__main__":
    class Timer:
        def __init__(self):
            self.start = None
            self.stop  = None
            self.delta = None

        def __enter__(self):
            self.start = time.time()
            return self

        def __exit__(self, *exc_args):
            self.stop = time.time()
            self.delta = self.stop - self.start

    arrays = [np.random.rand(5,5) for _ in range(50)]
    index_combns = list(itertools.combinations(range(len(arrays)),2))

    # Helper for non-mp version
    def add_mats_pair(ij_pair):
        i, j = ij_pair
        a = arrays[i]
        b = arrays[j]
        return add_mats(a, b)

    with Timer() as t:
        # Do the pairwise operation without multiprocessing
        sums_no_mp = list(map(add_mats_pair, index_combns))

    print(f"Process took {t.delta} seconds with no MP")


    with Timer() as t:
        # Stack arrays and copy result into shared memory
        stacked = np.stack(arrays)
        shm = shared_memory.SharedMemory(create=True, size=stacked.nbytes)
        shm_arr = np.ndarray(stacked.shape, dtype=stacked.dtype, buffer=shm.buf)
        shm_arr[:] = stacked[:]

        with Pool(processes=32) as pool:
            processes = [pool.apply_async(add_mats_shared, (
                shm.name,
                stacked.shape,
                stacked.dtype,
                i,
                j,
            )) for (i,j) in index_combns]
            sums_mp = [p.get() for p in processes]

        shm.close()
        shm.unlink()

    print(f"Process took {t.delta} seconds with MP")

    for i in range(len(sums_no_mp)):
        assert (sums_no_mp[i] == sums_mp[i]).all()

    print("Results match.")

It uses multiprocessing.shared_memory to share a single numpy (N+1)-dimensional array (instead of a list of N-dimensional arrays) between the host process and child processes.

Other things that are different but don't matter:

  • Pool is used as a context manager to prevent having to explicitly close and join it.
  • Timer is a simple context manager to time blocks of code.
  • Some of the numbers (list sizes, process count) have been adjusted arbitrarily
  • pool.map replaced with calls to pool.apply_async

pool.map would be fine too, but you'd want to build the argument list before the .map call and unpack it in the worker function, e.g.:

with Pool(processes=32) as pool:
    args = [(
        shm.name,
        stacked.shape,
        stacked.dtype,
        i,
        j,
    ) for (i,j) in index_combns]
    sums_mp = pool.map(add_mats_shared, args)

# ... and the worker unpacks the argument tuple itself:

# Helper for mp version
def add_mats_shared(args):
    shm_name, array_shape, array_dtype, i1, i2 = args
    shm = shared_memory.SharedMemory(name=shm_name)
    ....
jedwards
  • Thanks for the help. Though I'm trying to call the `add_nums` function on the elements of `myList`, not the indices. Should I pass the entirety of `myList` into the function with each call? Would that cause any performance issues? – The_Questioner Mar 21 '22 at 18:55
  • @The_Questioner Oh you could just call combinations on myList directly, instead of the indices. I'll update. – jedwards Mar 21 '22 at 18:58
  • Thanks again. Unfortunately, this isn't feasible for my real problem (though it'll work for this simplified version) because in my actual problem, instead of simple integers, `myList` is a list of large arrays. So creating combinations like this will create thousands of unnecessary copies of those large arrays. – The_Questioner Mar 21 '22 at 19:04
  • Fair enough, editing... – jedwards Mar 21 '22 at 19:07
  • Hmmmm, I haven't been able to figure it out. I was wondering if you were able to get it to work without having to pass in the entirety of `myList` over and over again. – The_Questioner Mar 21 '22 at 19:34
  • Sorry, I didn't ever update it. I think you're going to need something like a [Manager](https://docs.python.org/3/library/multiprocessing.html#managers) or [shared memory](https://docs.python.org/3/library/multiprocessing.shared_memory.html). In order to provide `myList` to child processes efficiently. `myList` is a list of numpy arrays? – jedwards Mar 21 '22 at 20:12
  • Yes, they are numpy arrays of length ~70,000 Sorry for the late reply – The_Questioner Mar 21 '22 at 20:43
  • @The_Questioner updated with a shared_memory approach – jedwards Mar 21 '22 at 21:11
  • Thank you! I'm reading up on shared memory but this seems to be what I needed to get around my problem. For some reason the MP version is much slower than the non-MP, but I'll try to figure that out. My major hurdle was the pickle error and you were a huge help with that. Thanks! – The_Questioner Mar 21 '22 at 21:49
  • @The_Questioner "For some reason the MP version is much slower than the non-MP, but I'll try to figure that out." Try uncommenting the `time.sleep` line inside the `add_mats` function (emulating a more expensive worker callback) and you should see the performance flip as you might have expected. – jedwards Mar 21 '22 at 21:58
  • @jedwards - your proposal ignores all the add-on costs that kill performance once the scales grow larger (a common phenomenon here) - (a) list-operations are "nice" in school-book-like SLOCs, yet killers on large RAM-footprints (as is the case here), the worse if inside "nice" iterators; (b) the Python Interpreter's add-on costs for process-instantiation are an immense blocker - look at the Amdahl's Law impact once we stop ignoring the added costs (instructions never present in a pure-[SERIAL] baseline run) https://stackoverflow.com/revisions/18374629/3 (c) last but not least, processes block numpy ... – user3666197 Mar 21 '22 at 22:07
  • @user3666197 I don't really have a proposal. OP presented an example problem where they wanted to share a large set of arrays without copying each multiple times and this solves that. Because it's an example problem, I have no way to know whether MP is the right route or not, but I assume they'll be able to figure that out quite quickly once they get it working. In terms of (c), it's not clear what you mean. If you're talking about the GIL and threads, these are processes. If you're talking about numpy libs like MKL or BLAS, that's sort of outside the scope of this question, imo. – jedwards Mar 21 '22 at 22:17
  • Agree that the O/P has declared a will to learn. Yet lambda-pickling is a marginal blocker in this; "sharing" is the performance killer inside both the Python Interpreter ecosystem (thread-based, due to the GIL-lock) and exosystem (process-based, due to the performance-killing add-on costs of such "sharing"-emulating process-to-process SER/xfer/DES pipeline overheads, which block the very independence the processes were spawned to gain, weren't they?). In (c) a typo /sorry/ - the argument was that the more processes occupy CPU-cores, the fewer cores remain free for native, HPC-grade numpy-multicore runs – user3666197 Mar 22 '22 at 00:10
  • IMHO the colliding anti-patterns are not out of scope here, as the resulting performance will only suffer. Having an explicit O/P statement that the code uses a **large** list of **large** numpy-arrays is a flashing red-light warning that this will never work fast as a serial-iterator-stepped P2P-distribution of a pair-wise list-members calculus. It's possible not to tell this to the O/P, yet in that case we would not be giving fair advice here, would we? I do not plead for going into nano-scale disputations about MKL vs BLAS (dis)advantages, yet why leave the O/P inside a mammoth-hole of an ever-extending anti-pattern? – user3666197 Mar 22 '22 at 00:16

Q :
" ... trying to learn about multiprocessing in order to solve a problem at work. "

A :
the single most important piece of experience to learn
is how BIG the COSTS of (process) INSTANTIATION(s) are;
all other add-on overhead costs
( still by no means negligible, the more so as the scale of the problem grows )
are details in comparison to this immense & principal one.

Before the answer is read through and completely understood, here is a Live GUI-interactive simulator of how much we will have to pay to start using more than 1 stream of process-flow orchestration. The costs vary: lower for threads, larger for MPI-based distributed operations, highest for multiprocessing processes, as used in the Python Interpreter, where N-many copies of the main Python Interpreter process first get copied (RAM-allocated and spawned by the O/S scheduler). As of 2022-Q2 there are still reported issues when less expensive backends try to avoid this full copy, running into deadlocks on wrongly shared, ill-copied, or forgotten-to-copy, already-held MUTEXes and similar internalities - so even the full copy is not always safe in 2022. Not having met these in person does not mean they do not still exist, as documented by countless professionals (a story about a pool of sharks is a good place to start from).
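
As a rough back-of-the-envelope companion to the simulator ( a simplified, overhead-aware Amdahl-style model with illustrative parameter names - the live simulator's exact parametrisation differs ), even a small per-process add-on cost visibly eats the theoretical speedup:

def estimated_speedup(p, n_workers, overhead_per_worker):
    # p                   : fraction of the original run-time that can be parallelised
    # n_workers           : number of spawned worker processes
    # overhead_per_worker : add-on cost of instantiating one worker,
    #                       expressed as a fraction of the original run-time
    serial_part   = 1.0 - p
    parallel_part = p / n_workers
    add_on_costs  = n_workers * overhead_per_worker
    return 1.0 / (serial_part + parallel_part + add_on_costs)

for n in (1, 2, 4, 8, 16, 32, 64):
    # even a ~1% per-process cost makes the speedup peak early and then degrade as n grows
    print(n, round(estimated_speedup(p=0.95, n_workers=n, overhead_per_worker=0.01), 2))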

Live Simulator ( description here )

Inventory of problems :

a ) pickling lambdas ( and many other SER/DES blockers )

is easy to solve - it is enough to conda install dill and import dill as pickle, as dill has been able to pickle lambdas for years (credit to @MikeMcKerns), and your code does not need to refactor its use of the plain pickle.dumps()-call interface. pathos.multiprocessing also defaults to using dill internally, so this long-known multiprocessing SER/DES weakness gets avoided.
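
A minimal sketch of that route ( assuming the dill and pathos packages are installed; the pool size and the toy lambdas are just illustrative ):

import dill                                        # dill can serialise lambdas, unlike the plain pickle module
from pathos.multiprocessing import ProcessingPool  # pathos pools use dill for their SER/DES internally

payload  = dill.dumps(lambda a, b: a + b)          # plain pickle.dumps() would raise a PicklingError here
add_nums = dill.loads(payload)
print(add_nums(1, 2))                              # -> 3

if __name__ == "__main__":
    pool = ProcessingPool(4)
    # lambdas can be shipped to the worker processes, because dill (not pickle) does the serialisation
    print(pool.map(lambda pair: pair[0] + pair[1], [(1, 2), (3, 4), (5, 6)]))
    pool.close()
    pool.join()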

b ) performance killers

- multiprocessing.Pool.map() is rather an end-to-end performance anti-pattern here - the costs, once we stop neglecting them, show how many CPU-clocks & blocked physical-RAM-I/O transfers get paid for so many process-instantiations ( 60+ ), which finally "occupy" almost all physical CPU-cores, leaving almost zero space for the indeed high-performance, numpy-native multicore-computing of the core problem ( for which the ultimate performance was expected to be boosted, wasn't it? )

- just move the p-slider in the simulator to anything less than 100% ( 100% would mean having no [SERIAL]-part of the problem execution at all, which is nice in theory, yet never doable in practice - even the program launch is purely [SERIAL], by design )

- just move the Overhead-slider in the simulator to anything above a plain zero ( it expresses the relative add-on cost of spawning one of the NCPUcores processes, as a percentage of the number of instructions in the [PARALLEL]-section ). Mathematically "dense" work has many such "useful" instructions and may, supposing no other performance killers jump out of the box, afford to spend some reasonable amount of "add-on" costs to spawn some amount of concurrent or parallel operations ( the actual number depends only on the actual economy of costs, not on how many CPU-cores are present, and even less on our "wishes" or scholastic or, even worse, copy/paste-"advice" ). On the contrary, mathematically "shallow" work almost always gets "speedups" << 1 ( immense slow-downs ), as there is almost no chance to justify the known add-on costs ( paid for process-instantiations and for data SER/xfer/DES moving in (params) and back (results) )

- next move the Overhead-slider in the simulator to the rightmost edge == 1. This shows the case when the actual process-spawning overhead ( time lost ) costs are still no more than <= 1 % of all the computing-related instructions that are next going to be performed for the "useful" part of the work inside such a spawned process-instance. So even such a 1:100 proportion factor ( doing 100x more "useful" work than the CPU-time lost on arranging that many copies and making the O/S-scheduler orchestrate their concurrent execution inside the available system Virtual-Memory ) already shows all the warnings visible in the graph of the progression of Speedup-degradation - just play a bit with the Overhead-slider in the simulator, before touching the others...

- avoid the sin of "sharing" ( if performance is the goal ) - again, operating such orchestration among several, now independent, Python Interpreter processes takes additional add-on costs that are never justified by any performance gained, as the fight over occupying shared resources ( CPU-cores, physical-RAM-I/O channels ) only devastates CPU-core-cache re-use hit-rates and adds O/S-scheduler-operated process context-switches, all of which further downgrades the resulting end-to-end performance ( which is something we do not want, do we? )

c ) boosting performance

- respect the facts about the actual costs of any kind of computing operation
- avoid "shallow" computing steps,
- maximise what gets so expensively dispatched into a set of distributed processes ( if the need for them remains ),
- avoid all overhead-adding operations ( like creating local temporary variables, where inline operations permit storing partial results in place )
and
- try to use the ultra-performant, cache-line-friendly & optimised, native numpy-vectorised multicore & striding-tricks capabilities, not blocked by CPU-cores already pre-overloaded by scheduling so many (~60) Python Interpreter process copies, each one trying to call numpy-code and thus leaving no free cores to actually place such high-performance, cache-reuse-friendly vectorised computing onto ( this is where we gain or lose most of the performance - not in slow-running serial-iterators, and not in spawning 60+ process-based full-copies of the "__main__" Python Interpreter before doing a single piece of useful work on our great data, expensively RAM-allocated and physically copied 60+ times therein ) - a sketch of such a vectorised route for the toy case above follows right after this list
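
For the small toy case from the previous answer ( 50 arrays of 5x5 ), such a vectorised route could look like the sketch below - note it materialises all pairwise results at once, which assumes they fit in RAM; for truly large arrays one would process the pairs in blocks:

import numpy as np

arrays  = [np.random.rand(5, 5) for _ in range(50)]
stacked = np.stack(arrays)                                   # shape (50, 5, 5)

# all pairwise sums in one vectorised shot: pairwise[i, j] == arrays[i] + arrays[j]
pairwise = stacked[:, None, :, :] + stacked[None, :, :, :]   # shape (50, 50, 5, 5)

# keep only the i < j pairs, in the same order as itertools.combinations(range(50), 2)
i_idx, j_idx = np.triu_indices(len(arrays), k=1)
sums_vec = pairwise[i_idx, j_idx]                            # shape (1225, 5, 5)
print(sums_vec.shape)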

- refactoring of the real problem shall never go against the collected knowledge about performance, as repeating the things that do not work will not bring any advantage, will it?
- respect your physical platform constraints; ignoring them will degrade your performance
- benchmark, profile, refactor
- benchmark, profile, refactor
- benchmark, profile, refactor
no other magic wand available here

and once already working on the bleeding edge of performance, set gc.disable() before you spawn the Python Interpreter into N-many replicated copies, so as not to wait for spontaneous garbage-collections when going for ultimate performance - a minimal sketch of that follows
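
( a minimal sketch of the gc.disable() idea, with a trivial placeholder worker and an arbitrary pool size - adapt to the real workload ):

import gc
from multiprocessing import Pool

def square(x):          # a trivial, picklable placeholder worker
    return x * x

if __name__ == "__main__":
    gc.disable()        # avoid spontaneous GC pauses in the parent ( and in its forked copies )
    try:
        with Pool(processes=4) as pool:
            print(pool.map(square, range(10)))
    finally:
        gc.enable()     # restore normal garbage collection afterwards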

user3666197
  • Thank you for the response. Admittedly, I don't quite understand everything that's written here, but I will take some time to go through it and I'm sure it will be a huge help once I do :) – The_Questioner Mar 22 '22 at 17:46
  • ***Always welcome*** @The_Questioner - your mathematical imagination, and your knowledge of the actual physics of how our computing toys work inside, will help you follow the logic behind both the original and the add-on-overhead-costs-aware *"Law of diminishing returns"* – user3666197 Mar 22 '22 at 20:40