87

I'm trying to use the multiprocessing Pool object. I'd like each process to open a database connection when it starts, then use that connection to process the data that is passed in (rather than opening and closing the connection for each bit of data). This seems like what the initializer is for, but I can't wrap my head around how the worker and the initializer communicate. So I have something like this:

import psycopg2
from multiprocessing import Pool

def get_cursor():
    return psycopg2.connect(...).cursor()

def process_data(data):
    # here I'd like to have the cursor so that I can do things with the data
    ...

if __name__ == "__main__":
    pool = Pool(initializer=get_cursor, initargs=())
    pool.map(process_data, get_some_data_iterator())

How do I (or do I) get the cursor back from get_cursor() into process_data()?

Chris Curvey

5 Answers

112

The initializer function is called thus:

def worker(...):
    ...
    if initializer is not None:
        initializer(*args)

so there is no return value saved anywhere. You might think this dooms you, but no! Each worker is in a separate process. Thus, you can use an ordinary global variable.

This is not exactly pretty, but it works:

cursor = None
def set_global_cursor(...):
    global cursor
    cursor = ...

Now you can just use cursor in your process_data function. The cursor variable inside each separate process is separate from all the other processes, so they do not step on each other.
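
For example, here is a minimal runnable sketch of this pattern applied to the question's setup; the DSN string and the query are placeholder assumptions, not part of the original answer:

import psycopg2
from multiprocessing import Pool

cursor = None  # each worker process gets its own copy of this module-level global

def set_global_cursor(dsn):
    global cursor
    cursor = psycopg2.connect(dsn).cursor()

def process_data(data):
    # hypothetical query; each worker reuses its own long-lived cursor
    cursor.execute("SELECT %s + 1", (data,))
    return cursor.fetchone()

if __name__ == "__main__":
    with Pool(4, initializer=set_global_cursor, initargs=("dbname=test",)) as pool:
        results = pool.map(process_data, range(10))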

(I have no idea whether psycopg2 has a different way to deal with this that does not involve using multiprocessing in the first place; this is meant as a general answer to a general problem with the multiprocessing module.)

torek
  • @torek Should the set_global_cursor be called in init_worker? – The Unfun Cat Jun 13 '15 at 07:06
  • 2
    @TheUnfunCat: not knowing what `init_worker` is (I see one in your answer but there's none in the original question) I can't really say for sure. The general idea is to allow `multiprocess.Pool` to create a pool of processes and to have each of those processes create (its own private copy of) the database connection. If you want this to happen when the pool process is started, you use the initializer function. If you want it to happen later, you can do it later. Either way you need a persistent variable, as with `function.cursor` in your method, or a plain `global`. – torek Jun 14 '15 at 00:40
  • 3
    Anyways, I find both my and your solution hideous and slightly magical (I'm sure pylint would complain too). I wonder if there is a more pythonic way... – The Unfun Cat Jun 14 '15 at 12:18
  • I find this solution fails if the initializer function or the worker function is in a different file from the one that executes pool.map(). I tried using sys._getframe to hack around it, but that makes me uncomfortable, and I can't find any other way. @torek – Tarjintor Mar 22 '18 at 10:00
  • 1
    @Tarjintor: there should not be issues with crossing file boundaries, since the key is that these are separate *processes* (as if two different people ran two different `python` commands), so namespaces work as usual. I find it helpful to name each process: the first one (the one you run) is Alice, the second (that Alice starts) is Bob, and so on. Then you can say "Alice's variable X is set to 3, Bob's X is set to 42..." – torek Mar 22 '18 at 15:10
  • 1
    It works! It is really great, as objects coming from libs like SWIG can't be pickled, and this makes it work since pickling is not needed. It makes it possible to run stuff like SentencePiece 6x faster on my 6-core i5. Thank you! – Marcin Nov 25 '19 at 23:33
31

You can also send the function itself along to the initializer and create a connection inside it. Afterwards, you attach the cursor to the function as an attribute.

def init_worker(function):
    function.cursor = db.conn()  # db.conn() stands in for your connection factory

Now you can access the db through function.cursor without using globals, for example:

def use_db(i):
    print(use_db.cursor)  # process-local

pool = Pool(initializer=init_worker, initargs=(use_db,))
pool.map(use_db, range(10))
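
For completeness, here is a self-contained sketch of the same pattern, with psycopg2 and a placeholder DSN standing in for the unspecified db.conn():

import psycopg2
from multiprocessing import Pool

def init_worker(function, dsn):
    # attach a per-process cursor to the function object itself
    function.cursor = psycopg2.connect(dsn).cursor()

def use_db(i):
    use_db.cursor.execute("SELECT %s", (i,))  # hypothetical query
    return use_db.cursor.fetchone()

if __name__ == "__main__":
    with Pool(4, initializer=init_worker, initargs=(use_db, "dbname=test")) as pool:
        print(pool.map(use_db, range(10)))

Passing use_db through initargs works because module-level functions pickle by reference (their qualified name), so each worker resolves it to its own copy of the function object and attaches its own cursor.
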
Kevin
The Unfun Cat
  • 2
    Is your process command something like: p = Pool(initializer=init_worker, args=(func)); p.map(func, args_set); ?? – Carl F. Jul 31 '15 at 13:32
  • Yes, something like that. (I remember this working, but have not worked on related stuff in a while, so I do not remember the exact details; feel free to downvote or modify my answer.) – The Unfun Cat Jul 31 '15 at 16:49
  • 3
    I like this answer because it doesn't pass the initializer arguments for every call. If the initializer arguments are large then I don't want them to be pickled at every call. – Stanley Bak Jun 19 '19 at 01:20
  • 1
    Is this different from attaching the cursor before the call to Pool? Does it work because `.map()` only pickles the function once? – tlamadon Dec 19 '19 at 03:30
  • 2
    I don't understand this answer. Where will the SQL logic be executed? – Basil Musa Jul 05 '21 at 17:19
  • How do you reference `.cursor` from within the function? – falsePockets Mar 14 '23 at 05:40
13

torek has already given a good explanation of why the initializer does not work this way. However, I am personally not a fan of global variables, so I'd like to paste another solution here.

The idea is to use a class to wrap the function and initialize the class with the "global" variable.

class Processor(object):
    """Process the data and save it to database."""

    def __init__(self, credentials):
        """Initialize the class with 'global' variables"""
        self.cursor = psycopg2.connect(credentials).cursor()

    def __call__(self, data):
        """Do something with the cursor and data"""
        self.cursor.find(data.key)  # illustrative; substitute your real query logic

And then call it with:

p = Pool(5)
p.map(Processor(credentials), list_of_data)

So Processor(credentials) initializes the class with the credentials and returns an instance of the class, and map calls that instance with each piece of data.

Though this is not as straightforward as the global-variable solution, I strongly suggest avoiding global variables and encapsulating the variables in some safe way. (And I really wish lambda expressions could be supported one day; it would make things much easier...)
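
One caveat, raised in the comments below: map pickles the Processor instance for every task, so the cursor should be created lazily inside each worker rather than in __init__. A sketch of that variant, with a placeholder query:

import psycopg2

class Processor(object):
    """Process the data and save it to database, connecting lazily."""

    def __init__(self, credentials):
        self.credentials = credentials  # picklable; travels to each worker
        self.cursor = None              # created on first use, once per process

    def __call__(self, data):
        if self.cursor is None:
            self.cursor = psycopg2.connect(self.credentials).cursor()
        self.cursor.execute("SELECT %s", (data,))  # stand-in for real query logic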

yeelan
  • 4
    I like this answer because it is pretty, but won't it reconnect for every item in the list? – woot May 27 '16 at 06:42
  • 20
    It *is* generally nice to avoid globals, and you can do something like this, but you'll want to defer initializing `self.cursor` until `p.map` has actually spun up the process instance. That is, your `__init__` would just set this to `None` and `__call__` would say `if self.cursor is None: self.cursor = ...`. In the end, what we really need is a per-process singleton. – torek Jul 23 '16 at 08:13
  • You could also try using a map with thread/process ids as keys, so you totally isolate connections per thread/process. – Jose Alban Mar 07 '17 at 16:35
  • 5
    Doesn't this cause the initialiser to be rerun for each task (potentially more than once per process in the pool)? – benjimin Feb 28 '18 at 22:32
  • 5
    If initialization is time-consuming, this answer basically serializes the initialization, which makes it a wrong answer. Also, sometimes initialization must not be done twice in one process. – dashesy Apr 27 '18 at 16:51
  • 20
    This solution does not achieve the same result as using a global variable. Each time `map(...)` hands a task from `list_of_data` to `Processor.__call__()`, the entire `Processor` object is pickled, and passed as the first parameter to `__call__(self, data)` b/c it is an instance method. Even if a `psycopg2.connection.Cursor()` object is pickle-able, you aren't able to initialize any variables, you just pickle the object, and access it off of the `self` instance in `__call__()` within the child Process. Additionally, if any object on `Processor` is large, this solution will slow to a crawl. – The Aelfinn Jul 29 '18 at 12:37
8

Given that defining global variables in the initializer is generally undesirable, we can avoid using them, and also avoid repeating costly initialization on each call, with simple caching inside each subprocess:

from functools import lru_cache
from multiprocessing.pool import Pool
from time import sleep


@lru_cache(maxsize=None)
def _initializer(a, b):
    # runs at most once per distinct (a, b) in each worker process
    print(f'Initialized with {a}, {b}')


def _pool_func(a, b, i):
    _initializer(a, b)  # a no-op after the first call in this process
    sleep(1)
    print(f'got {i}')


if __name__ == '__main__':  # guard needed where workers are spawned (e.g. Windows)
    arg_a = 1
    arg_b = 2

    with Pool(processes=5) as pool:
        pool.starmap(_pool_func, ((arg_a, arg_b, i) for i in range(0, 20)))

Output:

Initialized with 1, 2
Initialized with 1, 2
Initialized with 1, 2
Initialized with 1, 2
Initialized with 1, 2
got 1
got 0
got 4
got 2
got 3
got 5
got 7
got 8
got 6
got 9
got 10
got 11
got 12
got 14
got 13
got 15
got 16
got 17
got 18
got 19
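
Applied to the original question, the same caching trick can create one cursor per worker process without a global; psycopg2 and the DSN here are assumptions, not part of the original answer:

from functools import lru_cache
import psycopg2

@lru_cache(maxsize=None)
def get_cursor(dsn):
    # the cache lives inside each worker process, so this connects once per worker
    return psycopg2.connect(dsn).cursor()

def process_data(data):
    cur = get_cursor("dbname=test")  # placeholder DSN
    cur.execute("SELECT %s", (data,))  # hypothetical query
    return cur.fetchone()
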
mcguip
  • 3
    This only saves you the compute expended in the initializer. If instead your initializer mostly consists of transmitting a lot of data between the main and the worker processes, then it doesn't help you, unlike the solutions above. – caliloo Nov 24 '19 at 14:05
0

If the first answer wasn't clear, here is a snippet that runs:

import multiprocessing

n_proc = 5
cursor = [0 for _ in range(n_proc)]  # each worker process gets its own copy of this list

def set_global_cursor():
    # mark this worker's slot; _identity[0] is the worker's 1-based index
    cursor[multiprocessing.current_process()._identity[0] - 1] = 1

def process_data(data):
    print(cursor)
    return data ** 2

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=n_proc, initializer=set_global_cursor)
    pool.map(process_data, list(range(10)))

Output:

[1, 0, 0, 0, 0]
[0, 0, 1, 0, 0]
[0, 1, 0, 0, 0]
[0, 0, 1, 0, 0]
[0, 0, 0, 0, 1]
[1, 0, 0, 0, 0]
[0, 0, 1, 0, 0]
[0, 0, 1, 0, 0]
[0, 0, 0, 1, 0]
[0, 1, 0, 0, 0]
Ohad Rubin