19

I would like concurrent.futures.ProcessPoolExecutor.map() to call a function that takes 2 or more arguments. In the example below, I have resorted to using a lambda function and defining ref as a list of equal size to numberlist, filled with an identical value.

1st Question: Is there a better way of doing this? When numberlist can be millions to billions of elements in size, ref would have to grow to match it, and this approach unnecessarily takes up precious memory, which I would like to avoid. I did this because I read that the map function stops mapping once the end of its shortest input iterable is reached.

import concurrent.futures as cf

nmax = 10
numberlist = range(nmax)
ref = [5, 5, 5, 5, 5, 5, 5, 5, 5, 5]
workers = 3


def _findmatch(listnumber, ref):    
    print('def _findmatch(listnumber, ref):')
    x=''
    listnumber=str(listnumber)
    ref = str(ref)
    print('listnumber = {0} and ref = {1}'.format(listnumber, ref))
    if ref in listnumber:
        x = listnumber
    print('x = {0}'.format(x))
    return x 

a = map(lambda x, y: _findmatch(x, y), numberlist, ref)
for n in a:
    print(n)
    if str(ref[0]) in n:
        print('match')

with cf.ProcessPoolExecutor(max_workers=workers) as executor:
    #for n in executor.map(_findmatch, numberlist):
    for n in executor.map(lambda x, y: _findmatch(x, ref), numberlist, ref):
        print(type(n))
        print(n)
        if str(ref[0]) in n:
            print('match')

Running the code above, I found that the built-in map function achieved my desired outcome. However, when I transferred the same approach to concurrent.futures.ProcessPoolExecutor.map(), python3.5 failed with this error:

Traceback (most recent call last):
  File "/usr/lib/python3.5/multiprocessing/queues.py", line 241, in _feed
    obj = ForkingPickler.dumps(obj)
  File "/usr/lib/python3.5/multiprocessing/reduction.py", line 50, in dumps
    cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function <lambda> at 0x7fd2a14db0d0>: attribute lookup <lambda> on __main__ failed

Question 2: Why did this error occur and how do I get concurrent.futures.ProcessPoolExecutor.map() to call a function with more than 1 argument?

Sun Bear

3 Answers

16

To answer your second question first, you are getting an exception because a lambda function like the one you're using is not picklable. Since Python uses the pickle protocol to serialize the data passed between the main process and the ProcessPoolExecutor's worker processes, this is a problem. It's not clear why you are using a lambda at all. The lambda you had takes two arguments, just like the original function. You could use _findmatch directly instead of the lambda and it should work.

with cf.ProcessPoolExecutor(max_workers=workers) as executor:
    for n in executor.map(_findmatch, numberlist, ref):
        ...

As for the first issue about passing the second, constant argument without creating a giant list, you could solve this in several ways. One approach might be to use itertools.repeat to create an iterable object that repeats the same value forever when iterated on.

But a better approach would probably be to write an extra function that passes the constant argument for you. (Perhaps this is why you were trying to use a lambda function?) It should work if the function you use is accessible at the module's top-level namespace:

def _helper(x):
    return _findmatch(x, 5)

with cf.ProcessPoolExecutor(max_workers=workers) as executor:
    for n in executor.map(_helper, numberlist):
        ...
Blckknght
  • You're right. I resorted to experimenting with `lambda` because I initially had a problem passing a function with 2 arguments into the `executor` when `ref` was a constant. After converting `ref` to a list equal in size to `numberlist`, I just realised I had forgotten to remove the lambda. What I really wanted was a solution where `ref` is a constant or similar, so the helper function and `itertools.repeat` you mentioned both worked. Thanks. – Sun Bear Feb 05 '17 at 22:17
  • I'd like to invite you to answer my [follow-up question](http://stackoverflow.com/q/42074501/5722359), where I benchmarked the performance of `Executor.map` against `Executor.submit` and found the former significantly slower; I'd like to know why. – Sun Bear Feb 06 '17 at 23:30
11

(1) No need to make a list. You can use itertools.repeat to create an iterator that just repeats the same value.

(2) You need to pass a named function to map because it will be passed to the subprocess for execution. map uses the pickle protocol to send things; lambdas can't be pickled, so they can't be part of the map. But it's totally unnecessary here. All your lambda did was call a 2-parameter function with 2 parameters. Remove it completely.
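
That difference is easy to verify directly. A small sketch (not part of the original answer) showing that a module-level named function pickles fine while a lambda does not:

```python
import pickle

def named(x):
    # A module-level function pickles by reference: only its
    # qualified name is serialized, not its code.
    return x + 1

data = pickle.dumps(named)          # succeeds

try:
    pickle.dumps(lambda x: x + 1)   # a lambda has no importable name
except (pickle.PicklingError, AttributeError) as e:
    print('lambda not picklable:', e)
```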

The working code is

import concurrent.futures as cf
import itertools

nmax = 10
numberlist = range(nmax)
workers = 3

def _findmatch(listnumber, ref):    
    print('def _findmatch(listnumber, ref):')
    x=''
    listnumber=str(listnumber)
    ref = str(ref)
    print('listnumber = {0} and ref = {1}'.format(listnumber, ref))
    if ref in listnumber:
        x = listnumber
    print('x = {0}'.format(x))
    return x 

with cf.ProcessPoolExecutor(max_workers=workers) as executor:
    #for n in executor.map(_findmatch, numberlist):
    for n in executor.map(_findmatch, numberlist, itertools.repeat(5)):
        print(type(n))
        print(n)
        #if str(ref[0]) in n:
        #    print('match')
tdelaney
  • Thanks for the explanation and solution. :) – Sun Bear Feb 05 '17 at 22:19
  • I'd like to invite you to answer my [follow-up question](http://stackoverflow.com/q/42074501/5722359), where I benchmarked the performance of `Executor.map` against `Executor.submit` and found the former significantly slower; I'd like to know why. – Sun Bear Feb 06 '17 at 23:34
9

Regarding your first question: do I understand correctly that you want to pass an argument whose value is determined only at the time you call map, but which is constant for all instances of the mapped function? If so, I would do the map with a function derived from a "template function" with the second argument (ref in your example) baked into it using functools.partial:

from functools import partial
refval = 5

def _findmatch(ref, listnumber):  # arguments swapped
    ...

with cf.ProcessPoolExecutor(max_workers=workers) as executor:
    for n in executor.map(partial(_findmatch, refval), numberlist):
        ...

Re. question 2, first part: I haven't found the exact piece of code that pickles (serializes) the function to be executed in parallel, but it sounds natural that this has to happen: not only the arguments but also the function itself has to be transferred to the workers somehow, and it likely has to be serialized for that transfer. The fact that partial functions can be pickled while lambdas cannot is mentioned elsewhere, for instance here: https://stackoverflow.com/a/19279016/6356764.
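
To illustrate (a sketch, not from the original answer): a `partial` over a module-level function survives a pickle round-trip, because pickle records only the target function's qualified name plus the bound arguments:

```python
import pickle
from functools import partial

def _findmatch(ref, listnumber):
    # Same logic as the question's function, with the arguments swapped.
    listnumber = str(listnumber)
    return listnumber if str(ref) in listnumber else ''

matcher = partial(_findmatch, 5)                 # bake in ref = 5
restored = pickle.loads(pickle.dumps(matcher))   # round-trips cleanly

print(restored(15), repr(restored(3)))  # 15 ''
```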

Re. question 2, second part: if you wanted to call a function with more than one argument in ProcessPoolExecutor.map, you would pass it the function as the first argument, followed by an iterable of first arguments for the function, followed by an iterable of its second arguments etc. In your case:

for n in executor.map(_findmatch, numberlist, ref):
    ...
mkorvas
  • Thanks for sharing. :) Your solutions worked. It's my first time learning about partial too. – Sun Bear Feb 05 '17 at 21:43
  • I'd like to invite you to answer my [follow-up question](http://stackoverflow.com/q/42074501/5722359), where I benchmarked the performance of `Executor.map` against `Executor.submit` and found the former significantly slower; I'd like to know why. – Sun Bear Feb 06 '17 at 23:34
  • @mkorvas I used your solution for my question https://stackoverflow.com/questions/56492876/unable-to-send-multiple-arguments-to-concurrrent-futures-executor-map – gansub Jun 08 '19 at 09:16