I have a function that runs multiple queries in parallel, but I'm having trouble running it with multiprocessing when it takes more than one argument. I have this code:

def run(args):
    query, cursor = args
    cursor.execute(query)
with multiprocessing.Pool(processes=10) as pool:
    args = (product(queries),cursor)
    results = pool.starmap(run(args))

If I run only pool.starmap(run(product(queries))), it works well; however, I also need to pass the cursor object.

How can I do that?

I'm getting the following error:

TypeError: starmap() missing 1 required positional argument: 'iterable'
tobias_k
Pedro Alves
  • Did you mean `starmap(run, args)`? When you do `pool.starmap(run(product(queries)))`, you just execute `run` with the arguments and then pass the result of that function, `None`, to `starmap`. – tobias_k Feb 05 '21 at 19:03
  • If I run `starmap(run, args)`, I get `TypeError: cannot pickle 'SSLSocket' object` – Pedro Alves Feb 05 '21 at 19:06
  • Just to get this clear: You want to execute `run` with each of the different combinations in `itertools.product(queries)` (whatever queries is), and each with _the same_ `cursor`, right? – tobias_k Feb 05 '21 at 19:08
  • Yes, I want to execute multiple queries in parallel with the same cursor – Pedro Alves Feb 05 '21 at 19:09

2 Answers

There are some problems with your code:

  • you call `run(args)` yourself and pass its result to `starmap`; instead, pass the function and its arguments separately, as in `starmap(run, args)`
  • your `args` is a single tuple holding the product of all queries and then the cursor, but you want to pair each of those query combinations with the cursor
  • your function expects a single parameter that it unpacks itself, so you should use `map`; for `starmap` it would have to be `def run(query, cursor)`

Try this:

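import itertools
import multiprocessing

def run(args):
    # map() passes one item per call, so unpack the (query, cursor) pair here
    query, cursor = args
    print("running", query, cursor)

if __name__ == "__main__":  # guard needed where worker processes are spawned
    queries = ["foo", "bar", "blub"]
    cursor = "whatever"  # placeholder; a real cursor must be picklable to be sent

    with multiprocessing.Pool(processes=10) as pool:
        # product() over a single iterable yields 1-tuples such as ("foo",)
        args = ((combo, cursor) for combo in itertools.product(queries))
        results = pool.map(run, args)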

There may be more "upstream" errors, like that SSLSocket stuff, but this should at least (try to) call the function with the correct parameters.
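The third point above mentions that `starmap` would need `def run(query, cursor)`. A minimal sketch of that variant follows, assuming the same placeholder values; `build_args` is a hypothetical helper introduced only for illustration, and the string cursor stands in for a real one.

```python
import multiprocessing


def run(query, cursor):
    # starmap() unpacks each argument tuple, so the parameters are separate.
    return f"running {query} with {cursor}"


def build_args(queries, cursor):
    # Hypothetical helper: pair every query with the same cursor placeholder.
    return [(query, cursor) for query in queries]


if __name__ == "__main__":
    queries = ["foo", "bar", "blub"]
    cursor = "whatever"  # stand-in string; a real cursor may not be picklable

    with multiprocessing.Pool(processes=3) as pool:
        results = pool.starmap(run, build_args(queries, cursor))
    print(results)
```

Either form works for picklable arguments; the choice between `map` and `starmap` only changes whether `run` receives one tuple or separate parameters.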

tobias_k

pool.starmap takes two arguments: the function and an iterable of argument tuples, one tuple per call.

From the docs:

pool.starmap(func, [(1,2), (3, 4)])
# results in [func(1,2), func(3,4)]
# In your case, assuming run is defined as run(query, cursor), that would be:

pool.starmap(run, [(query, cursor) for query in product(queries)])
DDomen
  • I got 'TypeError: cannot pickle 'SSLSocket' object' with this code :( – Pedro Alves Feb 05 '21 at 19:07
  • You cannot share objects that are not picklable (like sockets) between processes, but you can pass the details needed to recreate them. You can see more details [here (Share unserializable objects between processes - StackOverflow)](https://stackoverflow.com/questions/21968278/multiprocessing-share-unserializable-objects-between-processes) – DDomen Feb 05 '21 at 19:14
  • Also, if you can instantiate your `SSLSocket` inside the `run` function, that would be simpler – DDomen Feb 05 '21 at 19:16
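
The suggestion in the comments above, creating the connection inside the worker instead of passing it in, might look roughly like the sketch below. It assumes `sqlite3` as a stand-in for the real database driver (which the thread does not name), and `init_worker` and `_connection` are hypothetical names. Only the picklable connection details are sent to the workers, so nothing like an `SSLSocket` has to be pickled.

```python
import multiprocessing
import sqlite3

_connection = None  # one connection per worker process, set by init_worker


def init_worker(db_path):
    # Runs once in each worker; only the picklable path crosses the
    # process boundary, never the connection itself.
    global _connection
    _connection = sqlite3.connect(db_path)


def run(query):
    # Each call takes a fresh cursor from the worker's own connection.
    cursor = _connection.cursor()
    cursor.execute(query)
    return cursor.fetchall()


if __name__ == "__main__":
    queries = ["SELECT 1", "SELECT 2", "SELECT 3"]
    with multiprocessing.Pool(processes=3, initializer=init_worker,
                              initargs=(":memory:",)) as pool:
        results = pool.map(run, queries)
    print(results)
```

With a real driver, `init_worker` would receive the host, credentials, or similar settings instead of a path. Note that this gives each worker its own connection and cursor rather than one shared cursor.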