I'm trying to understand a little of what's going on behind the scenes when using the apply_async method of a multiprocessing pool.

Who runs the callback method? Is it the main process that called apply_async?

Let's say I send out a whole bunch of apply_async commands with callbacks and then continue with my program. My program is still doing things when the apply_async calls start to finish. How does the callback get run by the "main process" while the main process is still busy with the script?

Here's an example.

import multiprocessing
import time

def callback(x):
    print '{} running callback with arg {}'.format(multiprocessing.current_process().name, x)

def func(x):
    print '{} running func with arg {}'.format(multiprocessing.current_process().name, x)
    return x

pool = multiprocessing.Pool()

args = range(20)

for a in args:
    pool.apply_async(func, (a,), callback=callback)

print '{} going to sleep for a minute'.format(multiprocessing.current_process().name)

t0 = time.time()
while time.time() - t0 < 60:
    pass

print 'Finished with the script'

The output is something like

PoolWorker-1 running func with arg 0

PoolWorker-2 running func with arg 1

PoolWorker-3 running func with arg 2

MainProcess going to sleep for a minute <-- main process is busy

PoolWorker-4 running func with arg 3

PoolWorker-1 running func with arg 4

PoolWorker-2 running func with arg 5

PoolWorker-3 running func with arg 6

PoolWorker-4 running func with arg 7

MainProcess running callback with arg 0 <-- main process running callback while it's still in the while loop!!

MainProcess running callback with arg 1

MainProcess running callback with arg 2

MainProcess running callback with arg 3

MainProcess running callback with arg 4

PoolWorker-1 running func with arg 8

...

Finished with the script

How is MainProcess running the callback while it's in the middle of that while loop??

There is this statement about the callback in the documentation for multiprocessing.Pool that seems like a hint but I don't understand it.

apply_async(func[, args[, kwds[, callback]]])

A variant of the apply() method which returns a result object.

If callback is specified then it should be a callable which accepts a single argument. When the result becomes ready callback is applied to it (unless the call failed). callback should complete immediately since otherwise the thread which handles the results will get blocked.

Alex

1 Answer

There is indeed a hint in the docs:

callback should complete immediately since otherwise the thread which handles the results will get blocked.

The callbacks are handled in the main process, but they run in a separate thread rather than in the main thread. When you create a Pool, it creates a few Thread objects internally:

class Pool(object):
    Process = Process

    def __init__(self, processes=None, initializer=None, initargs=(),
                 maxtasksperchild=None):
        self._setup_queues()
        self._taskqueue = Queue.Queue()
        self._cache = {}
        ... # stuff we don't care about
        self._worker_handler = threading.Thread(
            target=Pool._handle_workers,
            args=(self, )
            )
        self._worker_handler.daemon = True
        self._worker_handler._state = RUN 
        self._worker_handler.start()

        self._task_handler = threading.Thread(
            target=Pool._handle_tasks,
            args=(self._taskqueue, self._quick_put, self._outqueue,
                  self._pool, self._cache)
            )
        self._task_handler.daemon = True
        self._task_handler._state = RUN 
        self._task_handler.start()

        self._result_handler = threading.Thread(
            target=Pool._handle_results,
            args=(self._outqueue, self._quick_get, self._cache)
            )
        self._result_handler.daemon = True
        self._result_handler._state = RUN
        self._result_handler.start()

The interesting thread for us is _result_handler; we'll get to why shortly.
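
A quick way to check that these helper threads really exist is to compare the live threads before and after creating a pool. This is only a minimal sketch: it uses Python 3 syntax (the snippets in this answer are Python 2), the function name helper_threads_started_by_pool is made up for illustration, and it assumes a POSIX system where the "fork" start method is available.

```python
import multiprocessing
import threading


def helper_threads_started_by_pool():
    """Return (name, is_daemon) for each thread that a new Pool starts."""
    # Assumption: POSIX "fork" start method, so no __main__ guard is needed.
    ctx = multiprocessing.get_context("fork")
    before = set(threading.enumerate())
    pool = ctx.Pool(2)
    try:
        # Threads that are alive now but were not alive before Pool() ran.
        new_threads = [t for t in threading.enumerate() if t not in before]
        summary = [(t.name, t.daemon) for t in new_threads]
    finally:
        pool.close()
        pool.join()
    return summary


if __name__ == "__main__":
    for name, is_daemon in helper_threads_started_by_pool():
        print(name, "daemon" if is_daemon else "non-daemon")
```

The thread names differ between Python versions, so the sketch only reports them and does not rely on any particular name.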

Switching gears for a second, when you run apply_async, it creates an ApplyResult object internally to manage getting the result from the child:

def apply_async(self, func, args=(), kwds={}, callback=None):
    assert self._state == RUN
    result = ApplyResult(self._cache, callback)
    self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
    return result

class ApplyResult(object):

    def __init__(self, cache, callback):
        self._cond = threading.Condition(threading.Lock())
        self._job = job_counter.next()
        self._cache = cache
        self._ready = False
        self._callback = callback
        cache[self._job] = self


    def _set(self, i, obj):
        self._success, self._value = obj
        if self._callback and self._success:
            self._callback(self._value)
        self._cond.acquire()
        try:
            self._ready = True
            self._cond.notify()
        finally:
            self._cond.release()
        del self._cache[self._job]

As you can see, the _set method is the one that ends up executing the callback that was passed in, provided the task was successful. Also notice that the object adds itself to the pool's cache dict (the cache argument, which is the pool's _cache) at the end of __init__.
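
The "assuming the task was successful" part is easy to observe from the outside. The sketch below is a hedged illustration, not part of the library: it is written for Python 3, the function name run_success_and_failure is hypothetical, and it assumes the POSIX "fork" start method. It submits one task that succeeds and one that raises, and records which values reach the callback.

```python
import multiprocessing


def run_success_and_failure():
    """Submit one good and one failing task; report what the callback saw."""
    # Assumption: POSIX "fork" start method, so no __main__ guard is needed.
    ctx = multiprocessing.get_context("fork")
    seen = []  # values the callback received
    pool = ctx.Pool(2)
    try:
        good = pool.apply_async(abs, (-3,), callback=seen.append)
        # int("not a number") raises ValueError inside the worker process.
        bad = pool.apply_async(int, ("not a number",), callback=seen.append)
        good.wait()
        bad.wait()
        outcome = (good.successful(), bad.successful())
    finally:
        pool.close()
        pool.join()
    return seen, outcome


if __name__ == "__main__":
    print(run_success_and_failure())
```

Only the successful result is passed to the callback; the failure stays stored in the result object and is re-raised if you call get() on it. Python 3 also accepts an error_callback argument for the failure case.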

Now, back to the _result_handler thread object. That thread runs the _handle_results function, whose main loop looks like this:

    while 1:
        try:
            task = get()
        except (IOError, EOFError):
            debug('result handler got EOFError/IOError -- exiting')
            return

        if thread._state:
            assert thread._state == TERMINATE
            debug('result handler found thread._state=TERMINATE')
            break

        if task is None:
            debug('result handler got sentinel')
            break

        job, i, obj = task
        try:
            cache[job]._set(i, obj)  # Here is _set (and therefore our callback) being called!
        except KeyError:
            pass

        # More stuff

It's a loop that pulls each result sent back by a child off the queue, finds the matching entry in cache, and calls _set on it, which executes our callback. It can run even while your script is busy in a loop because it runs in the _result_handler thread, not in the main thread.
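
To confirm this from user code, the callback can record which process and which thread it runs in. The following is a minimal sketch under a few assumptions: Python 3 syntax, a POSIX system with the "fork" start method, and a made-up function name where_does_the_callback_run. The task is os.getpid, so the value handed to the callback is the worker's PID.

```python
import multiprocessing
import os
import threading


def where_does_the_callback_run():
    """Report the worker PID, the callback's PID, and the callback's thread."""
    # Assumption: POSIX "fork" start method, so no __main__ guard is needed.
    ctx = multiprocessing.get_context("fork")
    observed = []

    def callback(worker_pid):
        # Executed in the parent process by the pool's result-handler thread.
        current = threading.current_thread()
        observed.append({
            "worker_pid": worker_pid,
            "callback_pid": os.getpid(),
            "in_main_thread": current is threading.main_thread(),
        })

    pool = ctx.Pool(2)
    try:
        # os.getpid runs inside a worker process and returns that worker's PID.
        result = pool.apply_async(os.getpid, callback=callback)
        result.wait()
    finally:
        pool.close()
        pool.join()
    return observed[0]


if __name__ == "__main__":
    print("main pid:", os.getpid())
    print(where_does_the_callback_run())
```

The callback's PID matches the main process, the worker's PID does not, and the callback is not on the main thread, which is consistent with the explanation above.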

dano
  • Thanks Dano for taking the time to write such a detailed response! If I understand correctly, the pool creates a *single* new thread (the result_handler) whose job is just to wait around for apply_async calls to complete and then call the callback in the result_handler's thread (which is part of the MainProcess). Will the callbacks (for a single pool object) be called sequentially? I.e. a bunch of apply_async calls may finish together, but the callbacks will be run one by one in serial by the result_handler? – Alex Jul 16 '14 at 15:40
  • One more question. What if the callback function and the main script both mess with the same objects (in the MainProcess)? Could there be unpredictable behavior? I.e. if the callback and something later in the main script both try to write to the same file or modify the same array. When the callback actually gets run, who knows what the main script will be doing at that time. – Alex Jul 16 '14 at 15:41
  • @Alex Yes, the callbacks will be executed sequentially. The `_result_handler` thread pulls one completed task off the queue, calls `_set` (which runs the callback), then moves on to the next one. This is why the documentation says to make sure the callback completes immediately; executing the callback blocks other results from being processed. – dano Jul 16 '14 at 15:44
  • @Alex You definitely need to worry about thread safety of any object you change in the callback. In general, I would suggest doing as little as possible in the callback, but if you absolutely need to touch shared state, you have to protect it with a mutex of some kind. – dano Jul 16 '14 at 15:46
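
The two points from these comments (callbacks run one at a time on a single thread, and shared state needs a mutex) can be sketched together. This is an illustrative example only, assuming Python 3, the POSIX "fork" start method, and hypothetical names (count_with_lock, state); a threading.Lock plays the role of the mutex.

```python
import multiprocessing
import threading


def count_with_lock(n_tasks=50, n_main_updates=1000):
    """Update one shared dict from both the callback and the main thread."""
    # Assumption: POSIX "fork" start method, so no __main__ guard is needed.
    ctx = multiprocessing.get_context("fork")
    lock = threading.Lock()
    state = {"total": 0, "callback_threads": set()}

    def callback(value):
        # Runs on the result-handler thread, so guard the shared dict.
        with lock:
            state["total"] += value
            state["callback_threads"].add(threading.current_thread().name)

    pool = ctx.Pool(2)
    try:
        # Each task computes abs(-1) == 1 in a worker process.
        results = [pool.apply_async(abs, (-1,), callback=callback)
                   for _ in range(n_tasks)]
        # The main thread keeps changing the same state while results arrive.
        for _ in range(n_main_updates):
            with lock:
                state["total"] += 1
        for r in results:
            r.wait()
    finally:
        pool.close()
        pool.join()
    return state["total"], len(state["callback_threads"])


if __name__ == "__main__":
    print(count_with_lock())
```

Every callback is recorded under the same thread name, which matches the serial behaviour described above, and the lock keeps the total exact even though two threads update it.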