
I know this sounds like something that has been asked before, but wait, I'll explain why the other options don't work.

I'm currently using multiprocessing.Pool to implement parallelism in an application, and would like to extend this to be able to exploit nested parallelism. The naive approach of just passing the Pool object as an argument to apply_async doesn't work as noted in other answers, because Pool cannot be pickled.
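The pickling failure can be reproduced in isolation. This is a minimal sketch, assuming Python 3; it uses the thread-backed `multiprocessing.pool.ThreadPool`, which inherits the same refusal to be pickled, so that no worker processes are spawned. The helper name `try_pickle_pool` is made up for illustration.

```python
import pickle
from multiprocessing.pool import ThreadPool


def try_pickle_pool():
    """Return the exception raised when a pool is pickled, or None."""
    pool = ThreadPool(2)
    try:
        # Passing a pool to apply_async requires pickling it, which the
        # pool class explicitly refuses.
        pickle.dumps(pool)
    except NotImplementedError as exc:
        return exc
    finally:
        pool.close()
        pool.join()
    return None


error = try_pickle_pool()
print(type(error).__name__, error)
```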

Here are my requirements:

  1. I need some sort of pool to limit the number of concurrently executing tasks. E.g. multiprocessing.Pool serves this purpose, except it can't be passed to other processes.

  2. I need nested parallelism. In my application, I need to perform I/O in order to identify what the nested work is, so I absolutely don't want to do this from a single thread. I think that rules out all the answers to this question.

  3. It needs to be in the standard library; I can't add dependencies. That rules out this answer.

  4. I'd really like it to work with both Python 2 and 3. However, if it could be shown that moving to Python 3 would solve my problem, I would consider it.

I don't need this to use multiple processes specifically, it would be ok to use threads because most of the work is I/O or waiting on subprocesses to complete.

I have tried using multiprocessing.dummy, which is the same interface but implemented on top of threading. However, when I try to call get() to retrieve the results of my tests, I get the following error, so I think this is out.

  File "/usr/lib/python2.7/multiprocessing/pool.py", line 567, in get
    raise self._value
ValueError: signal only works in main thread

I am aware of the concurrent.futures library in Python 3, but this appears to have some severe limitations. For example, the second example in this section would seem to be a show stopper in my case:

https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor

I don't see how you could avoid hitting that with basically any straightforwardly-written nested parallel algorithm. So even if I was willing to use Python 3, I think this is a non-starter.
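For concreteness, here is a small sketch of the failure mode in question, assuming Python 3 and a pool with a single worker. The names `outer` and `inner` are made up, and a timeout is added only so the script terminates instead of hanging.

```python
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=1)


def inner():
    return 42


def outer():
    # The only worker thread is busy running outer(), so inner() cannot
    # start. Without the timeout this call would block forever.
    return executor.submit(inner).result(timeout=0.5)


try:
    # The timeout raised inside outer() is re-raised here by result().
    executor.submit(outer).result()
    deadlocked = False
except concurrent.futures.TimeoutError:
    deadlocked = True

executor.shutdown()
print("nested wait timed out:", deadlocked)
```

With more workers the same thing happens once every worker is blocked waiting on a nested future that has no free thread to run on.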

I'm not aware of any other options available in the standard library, without writing my own implementation.

Elliott Slaughter
  • "I need to perform I/O in order to identify what the nested work is". Is this network I/O? If so, I wonder if eventlet for the IO + threading might work? http://eventlet.net/doc/threading.html – Michal Charemza Apr 20 '18 at 19:36
  • In my case it's file I/O; basically at the top level I've got a file per unit of work, and based on what that file contains there might be further nested parallelism I could exploit. However, I'd really rather avoid dependencies if there are any other reasonable options. It would be a pretty major limitation if the standard library didn't provide a way of exploiting nested parallelism, so I'd be surprised if it's not possible. – Elliott Slaughter Apr 20 '18 at 19:56
  • Ah ok... another question: how many levels of nesting will there be? The file I/O on one level, and then work based on that on another level, and that's it? – Michal Charemza Apr 20 '18 at 20:18
  • At the moment, yes, two levels of nesting. – Elliott Slaughter Apr 20 '18 at 20:23
  • One more... each file might trigger further work. Can the work be triggered once that file has been processed, or should any more work be triggered along the way? – Michal Charemza Apr 20 '18 at 20:59
  • Algorithmically, it's possible to flatten the parallelism by doing it in two passes. Pass 1 reads all the files and builds a list of the work to be done. Pass 2 actually does the work. Not sure if that was what you were asking about or not. I was hoping to get away without restructuring my whole application to do this, but if it turns out this is the only answer I may have to consider it. – Elliott Slaughter Apr 20 '18 at 21:43

1 Answer


You seem to have ruled it out, but I suspect ThreadPoolExecutor (https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor) or ProcessPoolExecutor (https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor) would work, if you are able to move to Python 3 or add a dependency for Python 2.

If the extra work from each file doesn't have to be triggered until that file has been processed, a single coordinating thread can trigger all the other tasks. Because no worker ever waits on another future, deadlock is prevented, as in the example below.

from concurrent.futures import ThreadPoolExecutor
import time

pool = ThreadPoolExecutor(max_workers=3)

def find_work_inputs(dummy_file):
    print("{}: Finding work...".format(dummy_file))
    time.sleep(1)
    work = list(range(dummy_file))  # a list, so the print below shows the values on Python 3
    print("{}: Work is {}".format(dummy_file, work))
    return work

def do_work(dummy_file, work_input):
    print("{}: {}".format(dummy_file, work_input))
    print("{}: Doing work {}...".format(dummy_file, work_input))
    time.sleep(1)
    return work_input * work_input

dummy_files = [1,2,3,4,5]

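# Submit every find-work task first so they run concurrently; calling
# result() right after each submit would serialize them.
find_futures = [(dummy_file, pool.submit(find_work_inputs, dummy_file)) for dummy_file in dummy_files]

# Only this coordinating thread waits on find-work results and submits the
# nested work, so no worker ever blocks on another future.
futures = []
for dummy_file, find_future in find_futures:
    for work_input in find_future.result():
        futures.append((dummy_file, work_input, pool.submit(do_work, dummy_file, work_input)))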

for dummy_file, work_input, future in futures:
    print("Result from file:{} input:{} is {}".format(dummy_file, work_input, future.result()))

Alternatively, if each first-level thread needs to trigger the nested work itself, the nested work may need to go in a separate pool to prevent deadlock (depending on when result() is called on each future), as below.

from concurrent.futures import ThreadPoolExecutor
import time

find_work_pool = ThreadPoolExecutor(max_workers=3)
do_work_pool = ThreadPoolExecutor(max_workers=3)

def find_work_inputs(dummy_file):
    print("{}: Finding work...".format(dummy_file))
    time.sleep(1)
    work = list(range(dummy_file))  # a list, so the print below shows the values on Python 3
    print("{}: Work is {}".format(dummy_file, work))

    futures = []
    for work_input in work:
        futures.append((dummy_file, work_input, do_work_pool.submit(do_work, dummy_file, work_input)))
    return futures

def do_work(dummy_file, work_input):
    print("{}: {}".format(dummy_file, work_input))
    print("{}: Doing work {}...".format(dummy_file, work_input))
    time.sleep(1)
    return work_input * work_input

dummy_files = [1,2,3,4,5]

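# Submit every find-work task before waiting on any of them, so the first
# level actually runs in parallel.
find_futures = [find_work_pool.submit(find_work_inputs, dummy_file) for dummy_file in dummy_files]

# Each find-work task returns the futures for the nested work it submitted.
futures = []
for find_future in find_futures:
    futures.extend(find_future.result())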

for dummy_file, work_input, future in futures:
    print("Result from file:{} input:{} is {}".format(dummy_file, work_input, future.result()))
Michal Charemza
  • It's too bad Python doesn't have a cleaner way to do true nested parallelism, but given the realities, I think this answer does a good job of showing what you have to do. To be fair though, option (1) would also work just as well with `multiprocessing.Pool`. – Elliott Slaughter Apr 22 '18 at 22:46
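As a rough illustration of that last point, option (1) can be sketched against the Pool interface using `apply_async` and `get()`. This sketch assumes the thread-backed `multiprocessing.pool.ThreadPool`, which exposes the same interface; swapping in `multiprocessing.Pool` should work the same way but would additionally need picklable module-level functions and, on some platforms, an `if __name__ == "__main__":` guard. The `run` helper and its `workers` parameter are hypothetical.

```python
from multiprocessing.pool import ThreadPool


def find_work_inputs(dummy_file):
    # Stand-in for reading a file to discover its nested work.
    return list(range(dummy_file))


def do_work(dummy_file, work_input):
    return work_input * work_input


def run(dummy_files, workers=3):
    pool = ThreadPool(workers)
    try:
        # Level 1: submit every discovery task up front.
        found = [(f, pool.apply_async(find_work_inputs, (f,)))
                 for f in dummy_files]
        # Level 2: only this coordinating thread submits the nested work,
        # so the pool object never has to be passed to a worker.
        pending = []
        for f, async_inputs in found:
            for work_input in async_inputs.get():
                pending.append(
                    (f, work_input, pool.apply_async(do_work, (f, work_input))))
        return [(f, w, r.get()) for f, w, r in pending]
    finally:
        pool.close()
        pool.join()


results = run([1, 2, 3])
print(results)
```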