
I have a list of functions, each of which does some job like downloading HTML from a URL (each function is very different, so I can't make a single function that accepts a URL and downloads it). I have used multiprocessing to speed up the task. Below is my code:

from multiprocessing import Process

def runInParallel(list_of_functions):
    procs = [Process(target=fn[1]) for fn in list_of_functions]
    for p in procs:
        p.start()
    for p in procs:
        p.join()

What I want is a way to store the result that each function returns. Each function returns a dict that I need to parse and store in a database, and I don't want to repeat those steps in every function. So what I want is some sort of callback that can be passed the results returned from the functions. How can I achieve that?

EDIT: using `Pool`, but it throws an error. I have the following for `list_of_functions`:

[('f1', <function f1 at 0x7f34c11c9ed8>), ('f2', <function f2 at 0x7f34c11c9f50>)]


def runInParallel(list_of_functions):
    import multiprocessing
    pool = multiprocessing.Pool(processes = 3)
    x = pool.map(lambda f: f(), list_of_functions)
    print x




File "main.py", line 31, in <module>
    runInParallel(all_functions)
  File "main.py", line 11, in runInParallel
    x = pool.map(lambda f: f(), list_of_functions)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 251, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 558, in get
    raise self._value
cPickle.PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
hansaplast
anekix
  • Possible duplicate of [Can I get a return value from multiprocessing.Process?](http://stackoverflow.com/questions/8329974/can-i-get-a-return-value-from-multiprocessing-process) – DavidW Feb 19 '17 at 09:42
  • See also http://stackoverflow.com/questions/10415028/how-can-i-recover-the-return-value-of-a-function-passed-to-multi and http://stackoverflow.com/questions/10797998/is-it-possible-to-multiprocess-a-function-that-returns-something-in-python – DavidW Feb 19 '17 at 09:43
  • @DavidW thanks for response but how do i adjust `results = [result_queue.get() for mc in montecarlos]` in my code? – anekix Feb 19 '17 at 09:58
  • I think the easiest way is to use `multiprocessing.map` instead (like in this answer) http://stackoverflow.com/a/28799109/4657412. That may suggest I picked the wrong duplicate. – DavidW Feb 19 '17 at 10:24
  • @DavidW If every function is different, that rules out `Pool.map`. – Roland Smith Feb 19 '17 at 10:28
  • 1
    @RolandSmith `map(lambda f: f(), list_of_functions)` should still work even if they're all different I think? – DavidW Feb 19 '17 at 10:31
  • @DavidW Cool idea. But what about the parameters to `f()`? – Roland Smith Feb 19 '17 at 10:56
  • 1
    @RolandSmith something like `map(lamdba f, args: f(args), zip(list_of_functions,list_of_args_tuples))`? – DavidW Feb 19 '17 at 11:07
  • @DavidW what does `lambda f: f()` do in the above example? – anekix Feb 19 '17 at 11:49
  • @DavidW i have edited my question with your hint but it throws error `cPickle.PicklingError: Can't pickle : attribute lookup __builtin__.function failed ` – anekix Feb 19 '17 at 11:55
  • @anekix Unfortunately `multiprocessing` on Windows is a bit of a disaster. It relies on everything being pickleable, which isn't always easy to ensure. You should replace the lambda with a normal function `def call_function(f): return f()` and do `map(call_function, list_of_functions)` but you may still run into problems if `list_of_functions` is not pickleable. – DavidW Feb 19 '17 at 12:19
  • @DavidW i am using linux ubuntu – anekix Feb 19 '17 at 13:42
  • @anekix I don't know - sorry. I thought that `multiprocessing` on linux mostly avoided pickling, but that doesn't look to be the case completely – DavidW Feb 19 '17 at 14:14
  • @DavidW Under the hood, `multiprocessing.Pool` uses a `Queue` to transmit data to other processes. And according to the documentation: `When an object is put on a queue, the object is pickled and a background thread later flushes the pickled data to an underlying pipe`. – Roland Smith Feb 19 '17 at 21:08
  • @DavidW did you see the format of `list_of_functions` that i mentioned in the edit in my question ?i think that might be the issue can you please look at it once – anekix Feb 20 '17 at 06:30
  • @anekix I missed that. `list_of_functions` is a list of tuples, so you want to select element 1 of each tuple. Something like `lambda f: f[1]()` maybe? Or an equivalent change to the normal function version. – DavidW Feb 20 '17 at 07:39
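Putting the comments above together, the PicklingError goes away if the lambda is replaced with a module-level helper, since only top-level functions can be pickled by name. A minimal sketch (the `f1`/`f2` stand-ins are illustrative; the helper also unpacks the `('name', function)` tuples):

```python
from multiprocessing import Pool

def f1():
    return {'iam': 'f1'}

def f2():
    return {'iam': 'f2'}

# Must live at module level so the Pool workers can pickle it by name;
# a lambda defined inline cannot be pickled, hence the original error.
def call_function(name_fn_pair):
    name, fn = name_fn_pair  # each item is a ('name', function) tuple
    return fn()

def runInParallel(list_of_functions):
    pool = Pool(processes=3)
    try:
        # map returns the results in the same order as the input list
        return pool.map(call_function, list_of_functions)
    finally:
        pool.close()
        pool.join()

if __name__ == '__main__':
    print(runInParallel([('f1', f1), ('f2', f2)]))
```

The functions being passed to `call_function` must themselves be module-level (and thus pickleable) for this to work.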

1 Answer


As mentioned in the comments above: if you use `Process` directly, you need to set up a `Queue` that the worker processes put their results into, so that the parent process can get them back out:

from multiprocessing import Process, Queue
from time import sleep

def f1(queue):
    sleep(1) # get url, "simulated" by sleep
    queue.put(dict(iam="type 1"))

def f2(queue):
    sleep(1.5)
    queue.put(dict(iam="type 2"))

def f3(queue):
    sleep(0.5)
    queue.put(dict(iam="type 3"))


def runInParallel(list_of_functions):
    queue = Queue()
    proc = [Process(target=fn[1], args=(queue,)) for fn in list_of_functions]
    for p in proc:
        p.start()
    res = []
    for p in proc:
        p.join()
        res.append(queue.get())
    return res

if __name__ == '__main__':
    list_of_functions = [("f1", f1), ("f2", f2), ("f3", f3)]
    for d in runInParallel(list_of_functions):
        print d

Prints:

{'iam': 'type 3'}
{'iam': 'type 1'}
{'iam': 'type 2'}

If your functions basically all do the same thing (fetching URLs and processing the HTML in some way), then merging them into one function with some if/elif logic lets you use `map`, and you would not need any queue:

from multiprocessing import Pool
from time import sleep

def f(arg):
    url, typ = arg
    if typ == 'a':
        sleep(1) # instead you would do something with `url` here
        return dict(iam="type 1", url=url)
    elif typ == 'b':
        sleep(1.5)
        return dict(iam="type 2", url=url)
    elif typ == 'c':
        sleep(0.5)
        return dict(iam="type 3", url=url)

def runInParallel(work):
    p = Pool(3)
    return p.map(f, work)

if __name__ == '__main__':
    work = [('http://url1', 'a'),
        ('http://url2', 'b'),
        ('http://url3', 'c'),
        ]
    for d in runInParallel(work):
        print d

Prints:

{'url': 'http://url1', 'iam': 'type 1'}
{'url': 'http://url2', 'iam': 'type 2'}
{'url': 'http://url3', 'iam': 'type 3'}

Both scripts work in both Windows and Unix environments (tried on OSX).
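Either way, the parse-and-store steps the question wants to avoid repeating can live in a single post-processing function applied to every returned dict. A sketch under the assumption that parsing and storage look something like this (`parse_and_store`, `process_results`, and the `store_in_db` callable are hypothetical names, not part of the original code):

```python
def parse_and_store(d, store_in_db):
    # One shared place for the parse + store logic, instead of
    # duplicating it inside f1, f2, f3, ...
    parsed = {k: str(v) for k, v in d.items()}  # stand-in "parsing" step
    store_in_db(parsed)  # e.g. an INSERT via your DB layer
    return parsed

def process_results(dicts, store_in_db):
    # Apply the shared logic to everything runInParallel returned.
    return [parse_and_store(d, store_in_db) for d in dicts]
```

Usage would be `process_results(runInParallel(list_of_functions), my_db_insert)`, so each worker function stays focused on fetching and returning its dict.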

hansaplast