105

concurrent.futures.Executor.map takes a variable number of iterables, whose items are passed as arguments to the given function. How should I call it if I have a generator that produces tuples that are normally unpacked in place?

The following doesn't work because each of the generated tuples is given as a different argument to map:

args = ((a, b) for (a, b) in c)
for result in executor.map(f, *args):
    pass
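
To make the failure concrete, here is a small sketch using the built-in map, which groups its iterables the same way. The sample tuples and the show function are made up for illustration.

```python
# Hypothetical sample data standing in for the generator in the question.
pairs = [(1, 2), (3, 4), (5, 6)]

def show(*received):
    # Return whatever positional arguments arrive, so the grouping is visible.
    return received

# map(show, *pairs) is the same as map(show, (1, 2), (3, 4), (5, 6)):
# each tuple becomes a separate iterable, so items are regrouped column-wise.
starred = list(map(show, *pairs))

# What the question is after: one call per tuple, with the tuple unpacked.
wanted = [show(*p) for p in pairs]

print(starred)  # [(1, 3, 5), (2, 4, 6)]
print(wanted)   # [(1, 2), (3, 4), (5, 6)]
```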

Without the generator, the desired arguments to map might look like this:

executor.map(
    f,
    (i[0] for i in args),
    (i[1] for i in args),
    ...,
    (i[N] for i in args),
)
Leniel Maccaferri
Matt Joiner
  • I don't get what you want. In your latest edit the example without the generator doesn't work since each element on the generator has only two elements, what is the value of N? – vz0 Aug 08 '11 at 01:47
  • @vz0: N is the number of items in the tuples generated by `args`. – Matt Joiner Aug 09 '11 at 03:06

10 Answers

97

One argument that is repeated, one argument in c

from itertools import repeat
for result in executor.map(f, repeat(a), c):
    pass
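
As a runnable sketch of the repeat approach (the scale function and the sample values are assumptions for illustration, not part of the question):

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import repeat

def scale(factor, value):
    # Hypothetical two-argument function for the example.
    return factor * value

values = [1, 2, 3]

with ThreadPoolExecutor(max_workers=2) as executor:
    # repeat(10) yields 10 forever; the pairing stops when the
    # shortest iterable (values) runs out.
    scaled = list(executor.map(scale, repeat(10), values))

print(scaled)  # [10, 20, 30]
```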

Need to unpack items of c, and can unpack c

# On Python 3 the built-in zip is already lazy; on Python 2 use itertools.izip instead.
for result in executor.map(f, *zip(*c)):
    pass
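
A minimal sketch of what the transposition does, assuming a hypothetical two-argument add function and a small generator of pairs. Note that unpacking the generator with * reads all of it before any call is made.

```python
from concurrent.futures import ThreadPoolExecutor

def add(a, b):
    # Hypothetical two-argument function for the example.
    return a + b

pairs = ((i, i * 10) for i in range(3))  # generator of 2-tuples

# zip(*pairs) reads every tuple from the generator and transposes them
# into one sequence per argument position.
columns = list(zip(*pairs))

with ThreadPoolExecutor(max_workers=2) as executor:
    sums = list(executor.map(add, *columns))

print(columns)  # [(0, 1, 2), (0, 10, 20)]
print(sums)     # [0, 11, 22]
```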

Need to unpack items of c, can't unpack c

  1. Change f to take a single argument and unpack the argument in the function.
  2. If each item in c has a variable number of members, or you're calling f only a few times:

    executor.map(lambda args, f=f: f(*args), c)
    

    It defines a new function that unpacks each item from c and calls f. Using a default argument for f in the lambda makes f local inside the lambda and so reduces lookup time.

  3. If you've got a fixed number of arguments, and you need to call f a lot of times:

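    from collections import deque
    def itemtee(iterable, n=2):
        def gen(it = iter(iterable), items = deque(), next = next):
            popleft = items.popleft
            extend = items.extend
            while True:
                if not items:
                    try:
                        extend(next(it))
                    except StopIteration:
                        # Since PEP 479 (Python 3.7+), a StopIteration escaping
                        # a generator becomes a RuntimeError, so end explicitly.
                        return
                yield popleft()
        # The same generator object repeated n times: each next() call
        # hands out the next member of the current item.
        return [gen()] * n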
    
    executor.map(f, *itemtee(c, n))
    

Where n is the number of arguments to f. This is adapted from itertools.tee.
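
To illustrate how the shared generator hands out tuple members one at a time, here is a simplified sketch of the same idea without the local-variable optimizations. It uses the built-in map so the on-demand behaviour is easy to observe (Executor.map in Python 3 is documented as collecting its iterables immediately). The source generator, the pulled list, and add are hypothetical names for the demonstration.

```python
from collections import deque

def itemtee(iterable, n=2):
    it = iter(iterable)
    items = deque()

    def gen():
        while True:
            if not items:
                try:
                    # Fetch the next tuple and queue its members.
                    items.extend(next(it))
                except StopIteration:
                    return
            yield items.popleft()

    # One generator object, listed n times.
    return [gen()] * n

def add(a, b):
    return a + b

pulled = []  # records which tuples have been fetched from the source

def source():
    for pair in [(1, 2), (3, 4)]:
        pulled.append(pair)
        yield pair

lazy = map(add, *itemtee(source(), 2))
first = next(lazy)
pulled_after_first = list(pulled)  # only the first tuple has been read
rest = list(lazy)

print(first, pulled_after_first, rest)  # 3 [(1, 2)] [7]
```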

agf
  • Repeat is useful, but my example differed from the question. I've tried to improve it. Sorry about that. – Matt Joiner Aug 08 '11 at 01:16
  • Yeah this zip unpacking works, but the entire generator contents are consumed when unpacking the arguments to zip. The lambda also has the advantage that not every call to the map function has to have precisely the same number of arguments (not that this is a requirement). – Matt Joiner Aug 09 '11 at 03:05
  • That was the smaller of the issues, the bigger problem is having to process the entire generator. – Matt Joiner Aug 09 '11 at 05:25
  • No no, I want to unpack each generated item as the arguments to `f`. `for p in args: f(*p)`. Sorry it's so hard to explain :\ – Matt Joiner Aug 10 '11 at 22:02
  • I wrote a similar alternate form of `itertools.tee`, but found @vzo's lambda to be a much simpler solution. Care to explain why the lambda form has a high overhead? – Matt Joiner Aug 10 '11 at 23:29
  • Function calls and namespace lookups are slow in Python. `map` looks up `f` and each iterable once, then uses them repeatedly. With `lambda`, it has to look up `f` every time, and it has to call both the `lambda` and `f` as well as unpack the arguments. If `f` takes many seconds or longer to run, or you only call it a few times, this overhead is insignificant. My `itemtee` uses only local variables and (as edited) doesn't even use any attribute lookups for method calls. – agf Aug 10 '11 at 23:43
  • You should consider using a default argument in the `lambda` to speed up calling `f`. See my edit. – agf Aug 11 '11 at 03:50
  • @agf - Any thoughts on my question -https://stackoverflow.com/questions/56492876/unable-to-send-multiple-arguments-to-concurrrent-futures-executor-map ? – gansub Jun 08 '19 at 01:16
85

You need to remove the * on the map call:

args = ((a, b) for b in c)
for result in executor.map(f, args):
    pass

This calls f once for each item that args yields, and f should accept a single parameter (the tuple).

If you want f to accept two parameters you can use a lambda call like:

args = ((a, b) for b in c)
for result in executor.map(lambda p: f(*p), args):   # (*p) does the unpacking part
    pass
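
A variation on the same idea, sketched with a named module-level wrapper instead of a lambda. This can matter with ProcessPoolExecutor, because a lambda cannot be pickled for transfer to a worker process, whereas a module-level function wrapped in functools.partial can. The names call_unpacked and add are hypothetical, and the example runs on threads to stay self-contained.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial

def call_unpacked(func, args):
    # Unpack one tuple of arguments into a call to func.
    return func(*args)

def add(a, b):
    # Hypothetical two-argument function for the example.
    return a + b

pairs = [(1, 2), (3, 4), (5, 6)]

with ThreadPoolExecutor(max_workers=2) as executor:
    # partial(call_unpacked, add) takes a single tuple, as map requires here.
    totals = list(executor.map(partial(call_unpacked, add), pairs))

print(totals)  # [3, 7, 11]
```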
vz0
25

So suppose you have a function which takes 3 arguments and all the 3 arguments are dynamic and keep on changing with every call. For example:

def multiply(a,b,c):
    print(a * b * c)

To call this multiple times using threading, I would first create a list of tuples where each tuple is a version of a,b,c:

arguments = [(1,2,3), (4,5,6), (7,8,9), ....]

We know that concurrent.futures's map function accepts the target function as its first argument and, as its second argument, the list of arguments for each call that will be executed. Therefore, you might make a call like this:

for _ in executor.map(multiply, arguments):  # Error
    pass

But this raises an error saying that the function expected 3 arguments but got only 1. To solve this problem, we create a helper function:

def helper(numbers):
    multiply(numbers[0], numbers[1], numbers[2])

Now, we can call this function using the executor as follows:

from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor() as executor:
    for _ in executor.map(helper, arguments):
        pass

That should give you the desired results.
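
The following sketch puts the failing call and the helper side by side. It assumes a version of multiply that returns its product rather than printing it, so the results can be collected. Note that the TypeError surfaces only when the results are iterated.

```python
from concurrent.futures import ThreadPoolExecutor

def multiply(a, b, c):
    # Returns the product (an assumption; the original prints it).
    return a * b * c

def helper(numbers):
    # Unpack one tuple into multiply's three parameters.
    return multiply(*numbers)

arguments = [(1, 2, 3), (4, 5, 6), (7, 8, 9)]

with ThreadPoolExecutor() as executor:
    try:
        # Each tuple arrives as a single argument, so the call fails.
        list(executor.map(multiply, arguments))
    except TypeError as exc:
        error = str(exc)

    products = list(executor.map(helper, arguments))

print(error)
print(products)  # [6, 120, 504]
```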

Baqir Khan
24

You can use partial application (often loosely called currying) to create a new function with functools.partial:

from concurrent.futures import ThreadPoolExecutor
from functools import partial


def some_func(param1, param2):
    ...  # some code

# partially apply some_func: 'a' is passed as param1 on every call
func = partial(some_func, a)
with ThreadPoolExecutor() as executor:
    for result in executor.map(func, list_of_args):
        ...

If you need to fix more than one leading parameter, pass them all to partial:

func = partial(some_func, a, b, c)
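
As a hedged illustration of which parameters partial fills, the sketch below uses a hypothetical describe function. Positional arguments given to partial bind from the left. Keyword arguments bind by name, and map then supplies the remaining leftmost parameters positionally.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial

def describe(prefix, value, suffix="!"):
    # Hypothetical function with three parameters for the example.
    return f"{prefix}{value}{suffix}"

# Positional: 'item-' is bound to prefix, so map supplies value.
with_prefix = partial(describe, "item-")

# Keyword: suffix is bound by name; map supplies prefix and value.
with_suffix = partial(describe, suffix=".")

with ThreadPoolExecutor() as executor:
    positional_results = list(executor.map(with_prefix, [1, 2]))
    keyword_results = list(executor.map(with_suffix, ["a", "b"], [1, 2]))

print(positional_results)  # ['item-1!', 'item-2!']
print(keyword_results)     # ['a1.', 'b2.']
```

If the parameter you want map to fill is not the leftmost free one, binding the others by keyword only works when the varying value can itself be passed without colliding with a bound name; otherwise a small wrapper function is the simpler route.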
Vlad Bezden
  • This solution doesn't work for me. How is the interpreter supposed to know that the list in list_of_args refers to a specific parameter of some_func? Say I have a function with 5 parameters: conduct_analysis(a, b, c, d, e) and I do: partial_analysis = partial(conduct_analysis, a=a, c=c, d=d, e=e). Then I call: res = executor.map(partial_analysis, list_of_b). The interpreter doesn't know that the content of "list_of_b" should fill the missing parameter "b" of partial. When I run the code with the function with five args, I get various exceptions like type X has no attr Y, or positional arguments etc – SkogensKonung Oct 11 '20 at 16:03
  • @Dawid put the one you're not masking first, so `conduct_analysis(b, a, c, d, e)` – CpILL Mar 16 '21 at 23:11
7

Here's a code snippet showing how to send multiple arguments to a function with ThreadPoolExecutor:

import concurrent.futures


def hello(first_name: str, last_name: str) -> None:
    """Prints a friendly hello with first name and last name"""
    print('Hello %s %s!' % (first_name, last_name))


def main() -> None:
    """Examples showing how to use ThreadPoolExecutor and executor.map
    sending multiple arguments to a function"""

    # Example 1: Sending multiple arguments using tuples
    # Define tuples with sequential arguments to be passed to hello()
    args_names = (
        ('Bruce', 'Wayne'),
        ('Clark', 'Kent'),
        ('Diana', 'Prince'),
        ('Barry', 'Allen'),
    )
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Using lambda, unpacks the tuple (*f) into hello(*args)
        executor.map(lambda f: hello(*f), args_names)

    print()

    # Example 2: Sending multiple arguments using dict with named keys
    # Define dicts with arguments as key names to be passed to hello()
    kwargs_names = (
        {'first_name': 'Bruce', 'last_name': 'Wayne'},
        {'first_name': 'Clark', 'last_name': 'Kent'},
        {'first_name': 'Diana', 'last_name': 'Prince'},
        {'first_name': 'Barry', 'last_name': 'Allen'},
    )
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Using lambda, unpacks the dict (**f) into hello(**kwargs)
        executor.map(lambda f: hello(**f), kwargs_names)


if __name__ == '__main__':
    main()
Leandro Toledo
3

For ProcessPoolExecutor.map():

Similar to map(func, *iterables) except:

the iterables are collected immediately rather than lazily;

func is executed asynchronously and several calls to func may be made concurrently.

Therefore, ProcessPoolExecutor.map() is used in the same way as Python's built-in map(). Here is what the docs say about the built-in:

Return an iterator that applies function to every item of iterable, yielding the results. If additional iterable arguments are passed, function must take that many arguments and is applied to the items from all iterables in parallel.

Conclusion: pass the several parameters to map().

Try running the following snippet under Python 3, and it should become clear:

from concurrent.futures import ProcessPoolExecutor

def f(a, b):
    print(a+b)

with ProcessPoolExecutor() as pool:
    pool.map(f, (0, 1, 2, 3, 4, 5, 6, 7, 8, 9), (0, 1, 2))

# 0, 2, 4

array = [(i, i) for i in range(3)]
with ProcessPoolExecutor() as pool:
    pool.map(f, *zip(*array))

# 0, 2, 4
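
A variant of the first call that returns values instead of printing them, so the outcome does not depend on the order in which workers write their output. This sketch assumes threads rather than processes purely so that it runs without a `__main__` guard. The pairing of arguments works the same way: it stops at the shortest iterable, and results come back in submission order.

```python
from concurrent.futures import ThreadPoolExecutor

def add(a, b):
    # Return the sum so the caller can collect it.
    return a + b

with ThreadPoolExecutor() as pool:
    # Ten values in the first iterable, three in the second:
    # only three calls are made.
    sums = list(pool.map(add, range(10), (0, 1, 2)))

print(sums)  # [0, 2, 4]
```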
Tengerye
2

I have seen so many answers here, but none of them is as straightforward as using lambda expressions:

def foo(x, y): pass

Want to call the above function 10 times with the same values, i.e. xVal and yVal?

with concurrent.futures.ThreadPoolExecutor() as executor:
    for _ in executor.map(lambda _: foo(xVal, yVal), range(0, 10)):
        pass
H L
2

Let's say you have a data frame with columns such as image_1 and image_2, and you want to pass these two columns to a function that reads the images, extracts their features, and returns the difference between them.

Note: your scenario may differ; define the function to suit your requirements.

The code snippet below takes these two columns as arguments and passes them to the thread pool (also showing a progress bar):


''' function that will give the difference of two numpy feature matrix'''
def getDifference(image_1_loc, image_2_loc, esp=1e-7):
       arr1 = ''' read 1st image and extract feature '''
       arr2 = ''' read 2nd image and extract feature '''
       diff = arr1.ravel() - arr2.ravel() + esp    
       return diff

'''Using ThreadPoolExecutor from concurrent.futures with multiple argument'''

with ThreadPoolExecutor() as executor:
    result = np.array(
        list(tqdm(
            # unpack each (image_1, image_2) pair into getDifference
            executor.map(lambda x: getDifference(*x),
                         [(i, j) for i, j in df[['image_1', 'image_2']].values]),
            total=len(df),
        ))
    )


Vaibhav K
-1

A simple utility that I use all the time is below.

########### Start of Utility Code ###########

import os
import sys
import traceback

from concurrent import futures
from functools import partial


def catch(fn):
    def wrap(*args, **kwargs):
        result = None
        try:
            result = fn(*args, **kwargs)
        except Exception as err:
            type_, value_, traceback_ = sys.exc_info()
            return None, (
                args,
                "".join(traceback.format_exception(type_, value_, traceback_)),
            )
        else:
            return result, (args, None)

    return wrap


def top_level_wrap(fn, arg_tuple):
    args, kwargs = arg_tuple
    return fn(*args, **kwargs)


def create_processes(fn, values, handle_error, handle_success):
    cores = os.cpu_count()
    max_workers = 2 * cores + 1

    to_exec = partial(top_level_wrap, fn)

    with futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
        for result, error in executor.map(to_exec, values):
            args, tb = error
            if tb is not None:
                handle_error(args, tb)
            else:
                handle_success(result)


########### End of Utility Code ###########

Example usage -

######### Start of example usage ###########

import time


@catch
def fail_when_5(val):
    time.sleep(val)
    if val == 5:
        raise Exception("Error - val was 5")
    else:
        return f"No error val is {val}"


def handle_error(args, tb):
    print("args is", args)
    print("TB is", tb)


def top_level(val, val_2, test=None, test2="ok"):
    print(val_2, test, test2)
    return fail_when_5(val)

handle_success = print


if __name__ == "__main__":
    # SHAPE -> ( (args, kwargs), (args, kwargs), ... )
    values = tuple(
        ((x, x + 1), {"test": f"t_{x+2}", "test2": f"t_{x+3}"}) for x in range(10)
    )
    create_processes(top_level, values, handle_error, handle_success)

######### End of example usage ###########
shanu khera
  • This needs comments or some explanation of what your code is doing. Explain what you're doing, and more importantly why. – Ryan McGrath Feb 16 '22 at 20:15
-1

This works for me:

from concurrent.futures import ThreadPoolExecutor

def concurrent_function(function, list):
  with ThreadPoolExecutor() as executor:
    executor.map(function, list)

def concurrent_multiply(args = {'a': 1, 'b': 2}):
  print(args['a']*args['b'])

concurrent_function(concurrent_multiply, [{'a': 1, 'b': 1},
                                          {'a': 2, 'b': 2},
                                          {'a': 3, 'b': 3}])