5

I'm trying to learn the joblib module as an alternative to Python's built-in multiprocessing module. I'm used to using multiprocessing.Pool.imap to run a function over an iterable and get the results back as they come in. In this minimal working example, I can't figure out how to do the same with joblib:

import joblib, time

def hello(n):
    time.sleep(1)
    print "Inside function", n
    return n

with joblib.Parallel(n_jobs=1) as MP:

    func = joblib.delayed(hello)
    for x in MP(func(x) for x in range(3)):
        print "Outside function", x

Which prints:

Inside function 0
Inside function 1
Inside function 2
Outside function 0
Outside function 1
Outside function 2

I'd like to see the output:

Inside function 0
Outside function 0
Inside function 1
Outside function 1
Inside function 2
Outside function 2

Or something similar, indicating that the iterable MP(...) does not wait for all the results to complete before yielding. For a longer demo, change n_jobs=-1 and use range(100).
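
For reference, the multiprocessing.Pool.imap pattern described above looks roughly like the following minimal sketch (written with Python 3 print calls, and reusing the same hello function); imap yields each result as soon as it is ready, in input order:

import multiprocessing, time

def hello(n):
    time.sleep(1)
    print("Inside function", n)
    return n

if __name__ == "__main__":
    # Pool.imap yields results one by one, as soon as each is ready
    with multiprocessing.Pool(processes=1) as pool:
        for x in pool.imap(hello, range(3)):
            print("Outside function", x)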

Hooked

3 Answers

6

stovfl's answer is elegant, but it only works for the first batches dispatched. In the example it works only because the workers never starve (n_tasks < 2*n_jobs). For this approach to work in general, the callback originally passed to apply_async must also be called: it is an instance of BatchCompletionCallBack, which schedules the next batch of tasks to be processed.

One possible solution is to wrap arbitrary callbacks up in a callable object, like this (tested with joblib==0.11 on Python 3.6):

from joblib._parallel_backends import MultiprocessingBackend
from joblib import register_parallel_backend, parallel_backend
from joblib import Parallel, delayed
import time

class MultiCallback:
    def __init__(self, *callbacks):
        self.callbacks = [cb for cb in callbacks if cb]

    def __call__(self, out):
        for cb in self.callbacks:
            cb(out)

class ImmediateResultBackend(MultiprocessingBackend):
    def callback(self, result):
        print("\tImmediateResult function %s" % result)

    def apply_async(self, func, callback=None):
        cbs = MultiCallback(callback, self.callback)
        return super().apply_async(func, cbs)

register_parallel_backend('custom', ImmediateResultBackend)

def hello(n):
    time.sleep(1)
    print("Inside function", n)
    return n

with parallel_backend('custom'):
    res = Parallel(n_jobs=2)(delayed(hello)(y) for y in range(6))

Output

Inside function 0
Inside function 1
    ImmediateResult function [0]
    ImmediateResult function [1]
Inside function 3
Inside function 2
    ImmediateResult function [3]
    ImmediateResult function [2]
Inside function 4
    ImmediateResult function [4]
Inside function 5
    ImmediateResult function [5]
Carlos Santos
  • Works nicely, thank you! As a side note: the result in `callback` will then be a list of results. Also, if you want to catch the currently active backend you can let the `ImmediateResultBackend` inherit from `type(joblib.parallel.get_active_backend()[0])`. Like so: `class ImmediateResultBackend(type(joblib.parallel.get_active_backend()[0])): ...`. And finally it might make sense to deregister the backend again with: `del joblib.parallel.BACKENDS["custom"]`. – Martin Becker Apr 15 '20 at 00:52
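
A rough sketch of what this comment suggests, building on the answer above (it relies on joblib internals such as joblib.parallel.get_active_backend and joblib.parallel.BACKENDS, so treat it as an illustration rather than a stable recipe):

import time
import joblib
from joblib import Parallel, delayed, parallel_backend, register_parallel_backend

class MultiCallback:
    def __init__(self, *callbacks):
        self.callbacks = [cb for cb in callbacks if cb]

    def __call__(self, out):
        for cb in self.callbacks:
            cb(out)

# Derive the custom backend from whichever backend is currently active,
# instead of hard-coding MultiprocessingBackend.
class ImmediateResultBackend(type(joblib.parallel.get_active_backend()[0])):
    def callback(self, result):
        print("\tImmediateResult function %s" % result)

    def apply_async(self, func, callback=None):
        return super().apply_async(func, MultiCallback(callback, self.callback))

def hello(n):
    time.sleep(1)
    print("Inside function", n)
    return n

if __name__ == "__main__":
    register_parallel_backend('custom', ImmediateResultBackend)
    with parallel_backend('custom'):
        Parallel(n_jobs=2)(delayed(hello)(y) for y in range(6))
    # Deregister the temporary backend once it is no longer needed.
    del joblib.parallel.BACKENDS["custom"]
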
3

To get immediate results from joblib, for instance:

import joblib
from joblib import delayed
from joblib._parallel_backends import MultiprocessingBackend

class ImmediateResult_Backend(MultiprocessingBackend):
    def callback(self, result):
        print("\tImmediateResult function %s" % (result))

    # Override apply_async so that self.callback is always used
    def apply_async(self, func, callback=None):
        applyResult = super().apply_async(func, self.callback)
        return applyResult

joblib.register_parallel_backend('custom', ImmediateResult_Backend, make_default=True)

with joblib.Parallel(n_jobs=2) as parallel:
    func = parallel(delayed(hello)(y) for y in range(3))
    for f in func:
        print("Outside function %s" % (f))

Output:
Note: I use time.sleep(n * random.randrange(1, 5)) in def hello(...), so the processes become ready at different times.

Inside function 0
Inside function 1
ImmediateResult function [0]
Inside function 2
ImmediateResult function [1]
ImmediateResult function [2]
Outside function 0
Outside function 1
Outside function 2

Tested with Python 3.4.2 and joblib 0.11

stovfl
  • This is great, thank you! I didn't realize you could hook into a new function; I didn't know what to call it. It's worth noting (after looking into it): "Warning: this function is experimental and subject to change in a future version of joblib." – Hooked Mar 31 '17 at 17:52
  • @Hooked: _change_ does not mean **drop** in a future version. Besides this, I could also get immediate results using a **queue** instead of a `hook`. – stovfl Mar 31 '17 at 19:56
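
A rough sketch of the queue idea mentioned in the comment above (not stovfl's code; it assumes a multiprocessing.Manager queue proxy can be pickled by the active joblib backend): the worker puts each result on the queue as soon as it is ready, and a consumer thread prints it immediately.

import threading
import time
from multiprocessing import Manager

from joblib import Parallel, delayed

def hello(n, queue):
    time.sleep(1)
    print("Inside function", n)
    queue.put(n)          # hand the result over as soon as it is ready
    return n

def consume(queue, n_tasks):
    for _ in range(n_tasks):
        print("Outside function", queue.get())

if __name__ == "__main__":
    queue = Manager().Queue()
    consumer = threading.Thread(target=consume, args=(queue, 3))
    consumer.start()
    Parallel(n_jobs=2)(delayed(hello)(n, queue) for n in range(3))
    consumer.join()
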
-1
>>> import joblib, time
>>> 
>>> def hello(n):
...     time.sleep(1)
...     print "Inside function", n
...     return n
... 
>>> with joblib.Parallel(n_jobs=1) as MP:
...     func = joblib.delayed(hello)
...     res = MP(func(x) for x in range(3))  # This is not an iterator.
... 
Inside function 0
Inside function 1
Inside function 2
>>> type(res)
<type 'list'>

What you are dealing with is not a generator, so you should not expect it to provide intermediate results. Nothing I read in the docs seems to mention otherwise (or I didn't read the relevant parts).

You are welcome to read the docs and search for the "intermediate results" topic: https://pythonhosted.org/joblib/search.html?q=intermediate&check_keywords=yes&area=default

My understanding is that each call to Parallel is a barrier, so in order to get intermediate results you need to chunk the processing:

>>> import joblib, time
>>> 
>>> def hello(n):
...     time.sleep(1)
...     print "Inside function", n
...     return n
... 
>>> with joblib.Parallel(n_jobs=1) as MP:
...     func = joblib.delayed(hello)
...     for chunk in range(3):
...         x = MP(func(y) for y in [chunk])
...         print "Outside function", x
... 
Inside function 0
Outside function [0]
Inside function 1
Outside function [1]
Inside function 2
Outside function [2]
>>> 

If you want to get technical, there is a callback mechanism (BatchCompletionCallBack), but it is used exclusively for progress reporting; using it for intermediate results would require more involved code changes.

dnozay
  • I'm not sure how your code block differs from mine -- all you've done is move the assignment `res` into the loop statement and add a comment. The problem still remains, I'm looking for a way to have joblib return intermediate results. The output of your answer does not match the expected output, in fact it returns the same thing I identified as the problem! – Hooked Mar 29 '17 at 13:47
  • I said "Your code is equivalent to", that's pretty clear what that means. I have changed it to a snippet to make it less confusing – dnozay Mar 29 '17 at 16:03
  • So to be clear, it is not possible to use joblib without computing the entire result set first? I understand our code blocks produce a list that has to be completed before displaying, but can one use joblib to do so? What is the `multiprocessing.imap` equivalent? [if nothing else, this helped refine my question] – Hooked Mar 29 '17 at 17:49