20

I am trying out multiprocessor programming with Python. Take a divide and conquer algorithm like Fibonacci for example. The program flow of execution would branch out like a tree and execute in parallel. In other words, we have an example of nested parallelism.

From Java, I have used a threadpool pattern to manage resources, since the program could branch out very quickly and create too many short-lived threads. A single static (shared) threadpool can be instantiated via ExecutorService.

I would expect the same for Pool, but it appears that Pool object is not to be globally shared. For example, sharing the Pool using multiprocessing.Manager.Namespace() will lead to the error.

pool objects cannot be passed between processes or pickled

I have a 2-part question:

  1. What am I missing here; why shouldn't a Pool be shared between processes?
  2. What is a pattern for implementing nested parallelism in Python? If possible, maintaining a recursive structure, and not trading it for iteration.

from concurrent.futures import ThreadPoolExecutor

def fibonacci(n):
    if n < 2:
        return n
    a = pool.submit(fibonacci, n - 1)
    b = pool.submit(fibonacci, n - 2)
    return a.result() + b.result()

def main():
    global pool

    N = int(10)
    with ThreadPoolExecutor(2**N) as pool:
        print(fibonacci(N))

main()

Java

public class FibTask implements Callable<Integer> {

    public static ExecutorService pool = Executors.newCachedThreadPool();
    int arg;

    public FibTask(int n) {
        this.arg= n;
    }

    @Override
    public Integer call() throws Exception {
        if (this.arg > 2) { 
            Future<Integer> left = pool.submit(new FibTask(arg - 1));
            Future<Integer> right = pool.submit(new FibTask(arg - 2));
            return left.get() + right.get();
        } else {
            return 1;
        }

    } 

  public static void main(String[] args) throws Exception {
      Integer n = 14;
      Callable<Integer> task = new FibTask(n);
      Future<Integer> result =FibTask.pool.submit(task); 
      System.out.println(Integer.toString(result.get()));
      FibTask.pool.shutdown();            
  }    

}

I'm not sure if it matters here, but I am ignoring the difference between "process" and "thread"; to me they both mean "virtualized processor". My understanding is, the purpose of a Pool is for sharing of a "pool" or resources. Running tasks can make a request to the Pool. As parallel tasks complete on other threads, those threads can be reclaimed and assigned to new tasks. It doesn't make sense to me to disallow sharing of the pool, so that each thread must instantiate its own new pool, since that would seem to defeat the purpose of a thread pool.

Community
  • 1
  • 1
T. Webster
  • 9,605
  • 6
  • 67
  • 94
  • Why do you need it to be shared globally? Can't you contain it all inside one namespace/class? – Inbar Rose Jun 11 '13 at 08:24
  • 2
    @InbarRose The problem is that in a recursive function that executes the recursive call inside a different process, the pool is forked and is also called by the subprocess. This causes problems with the queues hence it doesn't work. Anyway I'd like to stress that in Java you are using *threads*. With threads there aren't any problems since there is no forking of the pool object. I believe using a process pool in Java would lead to, more or less, the same behaviour. – Bakuriu Jun 11 '13 at 09:33
  • @InbarRose I also tried containing `Pool` as a class instance and static variable, but still reach the same with problem. For example, with `Pool` and the recursive calls contained within a single class, but doing so still leads to the same problem: > pool objects cannot be passed between processes... – T. Webster Jun 11 '13 at 15:27
  • @Bakuriu correct Java is using threads. There is a Python thread [pool](http://stackoverflow.com/questions/3033952/python-thread-pool-similar-to-the-multiprocessing-pool) but it's not officially documented, so I'm skeptical. – T. Webster Jun 11 '13 at 15:54
  • 1. your code wouldn't work even if the pool were shared (try it with ThreadPool) Provide Java code with a thread pool if you think the code should work 2. If you are doing the work only in a fixed number of processes then assuming sufficiently large input, you *are* trying to convert recursion into iteration (implicitly through the pool; It is a deadlock waiting to happen). 3. multiprocessing.dummy (unfortunate name) *is* mentioned in the docs. The API is identical and a part of the implementation is shared with the process-based code. I had no issues using it. – jfs Jun 12 '13 at 07:05
  • The working Java code is up. Why wouldn't the code work? – T. Webster Jun 12 '13 at 10:27
  • 1
    @T.Webster: "Why wouldn't the code work?": 1. think, why are there both `.apply()` and `.apply_async()` methods? 2. Python pools are more like `FixedThreadPool` (it means deadlock if there are not enough threads) rather than `CachedThreadPool` (in this case fibonacci creates a glorified fork-bomb). Here's how the code could look like in Python (though it is pointless) [`concurrent.futures`-based](http://ideone.com/2xxbSU) and [`mp.dummy`-based](http://ideone.com/0BLcDH) code. – jfs Jun 12 '13 at 14:17
  • I tried 1. `think` and noticed my python code already does call `apply_async()` which almost seems correct, but I left out a call to block until the 2 sub-processes complte, which is what `Future.get()` would accomplish. It seems `.result()` accomplishes the same, yet your `concurrent.futures` code terminates with an error. Why, because the threads exceeded the fixed `max_worker` size? – T. Webster Jun 13 '13 at 10:34
  • @J.F. Sebastian thanks for the help although I do not know why you would bother for no points. I will place a bounty. – T. Webster Jun 13 '13 at 20:28
  • 1
    Try to run the code on your own computer. It should work if your environment allows to create enough threads. On Python 2, one the scripts requires `pip install futures`. – jfs Jun 13 '13 at 22:30
  • 1
    'but I am ignoring the difference between "process" and "thread"' It just can't be ignored. Mixing threading and multiprocessing is possible although, but you have to know about common caveats of Python implementation (CPython assumed) - there is no big performance boost from using threads in Python, if they are doing continuous work - read about Global Interpreter Lock. Since Python interpreter is allowing only one thread to be executed at the moment, switching threads every X ops, it becomes clear, that threading in Python is not useful for your task. Consider using multiprocessing only. – ElmoVanKielmo Jun 19 '13 at 08:17

2 Answers2

5

1) What am I missing here; why shouldn't a Pool be shared between processes?

Not all object/instances are pickable/serializable, in this case, pool uses threading.lock which is not pickable:

>>> import threading, pickle
>>> pickle.dumps(threading.Lock())
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
[...]
  File "/Users/rafael/dev/venvs/general/bin/../lib/python2.7/copy_reg.py", line 70, in _reduce_ex
    raise TypeError, "can't pickle %s objects" % base.__name__
TypeError: can't pickle lock objects

or better:

>>> import threading, pickle
>>> from concurrent.futures import ThreadPoolExecutor
>>> pickle.dumps(ThreadPoolExecutor(1))
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/Cellar/python/2.7.3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 1374, in dumps
    Pickler(file, protocol).dump(obj)
  File 
[...]
"/usr/local/Cellar/python/2.7.3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 306, in save
        rv = reduce(self.proto)
      File "/Users/rafael/dev/venvs/general/bin/../lib/python2.7/copy_reg.py", line 70, in _reduce_ex
        raise TypeError, "can't pickle %s objects" % base.__name__
    TypeError: can't pickle lock objects

If you think about it, it makes sense, a lock is a semaphore primitive managed by the operating system (since python uses native threads). Being able to pickle and save that object state inside the python runtime would really not accomplish anything meaningful since its true state is being kept by the OS.

2) What is a pattern for implementing nested parallelism in Python? If possible, maintaining a recursive structure, and not trading it for iteration

Now, for the prestige, everything I mentioned above doesn't really apply to your example since you are using threads (ThreadPoolExecutor) and not processes (ProcessPoolExecutor) so no data sharing across process has to happen.

Your java example just appears to be more efficient since the thread pool you are using (CachedThreadPool) is creating new threads as needed whereas the python executor implementations are bounded and require a explicit max thread count (max_workers). There's a little bit of syntax differences between the languages that also seems to be throwing you off (static instances in python are essentially anything not explicitly scoped) but essentially both examples would created exactly the same number of threads in order to execute. For instance, here's an example using a fairly naive CachedThreadPoolExecutor implementation in python:

from concurrent.futures import ThreadPoolExecutor

class CachedThreadPoolExecutor(ThreadPoolExecutor):
    def __init__(self):
        super(CachedThreadPoolExecutor, self).__init__(max_workers=1)

    def submit(self, fn, *args, **extra):
        if self._work_queue.qsize() > 0:
            print('increasing pool size from %d to %d' % (self._max_workers, self._max_workers+1))
            self._max_workers +=1

        return super(CachedThreadPoolExecutor, self).submit(fn, *args, **extra)

pool = CachedThreadPoolExecutor()

def fibonacci(n):
    print n
    if n < 2:
        return n
    a = pool.submit(fibonacci, n - 1)
    b = pool.submit(fibonacci, n - 2)
    return a.result() + b.result()

print(fibonacci(10))

Performance tuning:

I strongly suggest looking into gevent since it will give you high concurrency without the thread overhead. This is not always the case but your code is actually the poster child for gevent usage. Here's an example:

import gevent

def fibonacci(n):
    print n
    if n < 2:
        return n
    a = gevent.spawn(fibonacci, n - 1)
    b = gevent.spawn(fibonacci, n - 2)
    return a.get()  + b.get()

print(fibonacci(10))

Completely unscientific but on my computer the code above runs 9x faster than its threaded equivalent.

I hope this helps.

Rafael Ferreira
  • 1,260
  • 8
  • 11
  • 1
    gevent does not give you any parallelism. – Pi Delport Jun 17 '13 at 07:05
  • Right, no computational parallelism but the original question was not to change the fib algorithm chosen but, instead, to propose a common pattern to improve it. – Rafael Ferreira Jun 18 '13 at 06:33
  • No algorithm change is necessary: the example already splits work into independent sub-tasks. All that's needed is a substrate that actually executes the tasks in parallel (i.e., not a concurrency solution like gevent). – Pi Delport Jun 19 '13 at 13:16
  • In Java 7+ it's now possible to have nested parallelism inside a thread pool with a bounded number of threads using the [ForkJoin](http://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html) task API. AFAIK there is not equivalent API in Python. – ogrisel Dec 24 '13 at 16:09
1

1. What am I missing here; why shouldn't a Pool be shared between processes?

You generally can't share OS threads between processes at all, regardless of language.

You can arrange to share access to the pool manager with worker processes, but that's probably not a good solution to any problem; see below.

2. What is a pattern for implementing nested parallelism in Python? If possible, maintaining a recursive structure, and not trading it for iteration.

This depends a lot on your data.

On CPython, the general answer is to use a data structure that implements efficient parallel operations. A good example of this is NumPy's optimized array types: here is an example of using them to split a big array operation across multiple processor cores.

The Fibonacci function implemented using blocking recursion is a particularly pessimal fit for any worker-pool-based approach, though: fib(N) will spend much of its time just tying up N workers doing nothing but waiting for other workers. There are many other ways to approach the Fibonacci function specifically, (e.g. using CPS to eliminate the blocking and fill a constant number of workers), but it's probably better to decide your strategy based on the actual problems you will be solving, rather than examples like this.

Pi Delport
  • 10,356
  • 3
  • 36
  • 50