
Having built a significant part of my code on dill serialization/pickling, I'm also trying to use pathos multiprocessing to parallelize my calculations. Pathos is a natural extension of dill.

When trying to run nested

from pathos.multiprocessing import ProcessingPool
ProcessingPool().map(fn, args)

inside another ProcessingPool().map, I receive:

AssertionError: daemonic processes are not allowed to have children

E.g.:

from pathos.multiprocessing import ProcessingPool

def triple(x):
    return 3*x

def refork(x):
    from pathos.multiprocessing import ProcessingPool
    return ProcessingPool().map(triple, xrange(5))

ProcessingPool().map(refork, xrange(3))

yields

AssertionError: daemonic processes are not allowed to have children

I tried using amap(...).get() without success. This is on pathos 0.2.0.
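
For example, something along these lines (a rough reconstruction of that attempt) still has to spawn child processes from a daemonic worker, so it ends in the same assertion:

from pathos.multiprocessing import ProcessingPool

def triple(x):
    return 3*x

def refork(x):
    # the asynchronous map still creates an inner pool of processes
    return ProcessingPool().amap(triple, xrange(5)).get()

ProcessingPool().map(refork, xrange(3))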

What is the best way to allow for nested parallelization?

Update

I have to be honest at this point and confess that I removed the assertion "daemonic processes are not allowed to have children" from pathos. I also built something that cascades KeyboardInterrupt to the workers, and to their workers in turn... Parts of the solution are below:

import os
import signal
import sys
from time import sleep

from pathos.multiprocessing import ProcessingPool

def get_pid_i(x):
    # helper not shown in the original snippet; assumed to return the PID of the worker that runs it
    return os.getpid()

def run_parallel(exec_func, exec_args, num_workers_i):
    pool = ProcessingPool(num_workers_i)
    pool.restart(force=True)
    # collect the worker PIDs so they can be signalled later
    pid_is = pool.map(get_pid_i, xrange(num_workers_i))
    try:
        results = pool.amap(
            exec_func,
            exec_args,
        )
        counter_i = 0
        while not results.ready():
            sleep(2)
            if counter_i % 60 == 0:
                print('Waiting for children running in pool.amap() with PIDs: {}'.format(pid_is))
            counter_i += 1
        results = results.get()
        pool.close()
        pool.join()
        return results
    except KeyboardInterrupt:
        print('Ctrl+C received, attempting to terminate pool...')
        hard_kill_pool(pid_is, pool)  # sending Ctrl+C
        raise
    except:
        print('Attempting to close parallel after exception: {}'.format(sys.exc_info()[0]))
        hard_kill_pool(pid_is, pool)  # sending Ctrl+C
        raise

def hard_kill_pool(pid_is, pool):
    # forward Ctrl+C to every worker, then terminate the pool itself
    for pid_i in pid_is:
        os.kill(pid_i, signal.SIGINT)  # sending Ctrl+C
    pool.terminate()
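
For illustration, with the assertion removed, a nested call can then look roughly like this (heavy_task and light_task are just placeholder names):

def light_task(x):
    return 3 * x

def heavy_task(x):
    # each outer worker starts its own inner pool via run_parallel
    return run_parallel(light_task, xrange(5), 2)

outer_results = run_parallel(heavy_task, xrange(3), 3)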

This seems to work from the console and from an IPython notebook (with the stop button), but I'm not sure it's 100% correct in all corner cases.

Mark Horvath
  • I'm the `pathos` author. The reason you can't have processes spawning processes is that they don't die appropriately, and you have zombie processes that will eventually hang. I would recommend the solution of @Yoda, as this is the typical case… one "expensive" parallel block and several "lightweight" parallel bits of work. `pathos` also has the `ParallelPool`, which is slower, but works if you need something other than threads. I'd also suggest experimenting with non-blocking maps, as blocking can slow you down. Also see: http://stackoverflow.com/questions/28203774 – Mike McKerns Nov 28 '16 at 20:07
  • @MikeMcKerns, I started to experiment with the code in many ways (including non-daemonic processes) and ended up with the above working. I also included `amap`, but for another reason: `Ctrl+C` didn't get me out of `map`. Unfortunately I can't use the "lightweight" trick, as this was already a bigger system at the time of finding pathos (after dill). Now the next challenge is to have some sort of shared memory (read/write for all processes), which seems to be difficult using my cascading solution... Great tool btw, thanks! – Mark Horvath Nov 29 '16 at 14:17
  • I can't imagine what kind of workflow you'd have where you can't use one of the other pools (`ThreadingPool` or `ParallelPool`) to provide nested parallelism, and would require a hierarchy of `ProcessingPools`… but maybe you have a valid use case I haven't thought of, and I wouldn't mind knowing more about it (maybe as a ticket on the `pathos` github page). Yes, by removing the assertion, nested `ProcessingPools` should work. However, the reason the assertion is there is that nested spawned pools tend to live on as zombies. Killing the zombie processes using their job id is a workaround. – Mike McKerns Nov 29 '16 at 15:51
  • Just understanding your original suggestion, sorry. `ParallelPool` actually looks perfect! Right now the code can just fork new processes anywhere where it needs (after checking if there are enough resources). I could build a dispatcher as a socket based server, which would accept pickled jobs for execution. Not impossible at all, just need some refactoring. Thanks! – Mark Horvath Nov 29 '16 at 23:19
  • Ok, that's great. You should answer your own question if you feel that you've found a better answer than what has been presented thus far. – Mike McKerns Nov 30 '16 at 06:19

1 Answer


I encountered exactly the same issue. In my case, the inner operation was the one that needed parallelism, so I wrapped a ProcessingPool in a ThreadingPool. Here it is with your example:

from pathos.multiprocessing import ProcessingPool, ThreadingPool

def triple(x):
    return 3*x

def refork(x):
    from pathos.multiprocessing import ProcessingPool
    return ProcessingPool().map(triple, xrange(5))

ThreadingPool().map(refork, xrange(3))

You can even add another layer with an additional outer threading pool. Depending on your case, you can invert the order of these pools. However, you cannot have processes of processes. If really needed, see: https://stackoverflow.com/a/8963618/6522112. I haven't tried it myself yet, so I can't elaborate on it.
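
For reference, that linked workaround boils down to making the pool's workers non-daemonic so that they are allowed to spawn children of their own. A rough Python 2 style sketch (untested here):

import multiprocessing
import multiprocessing.pool

class NoDaemonProcess(multiprocessing.Process):
    # always report daemon=False so this process may start children
    def _get_daemon(self):
        return False
    def _set_daemon(self, value):
        pass
    daemon = property(_get_daemon, _set_daemon)

class NoDaemonPool(multiprocessing.pool.Pool):
    # a Pool whose workers are non-daemonic and can therefore fork their own pools
    Process = NoDaemonProcess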

tupui
  • Makes sense. Unfortunately, in my case I can't say in advance which level will do the heavy computation, nor can I easily limit myself to two levels of parallelization. – Mark Horvath Nov 29 '16 at 14:14
  • The solution I gave seems to work, but sometimes the outer pool seems to hang forever. I tried `imap` and `amap` with no luck. Maybe @MikeMcKerns can shed some light on this? Could `ParallelPool` help? – tupui Nov 29 '16 at 15:53
  • Essentially, python's `multiprocessing.Pool` does not kill cleanly when its parent has been killed. `multiprocess`, and thus `pathos`, has the same issue because they reuse the same code. `pathos.pools.ParallelPool` doesn't fork from `multiprocessing` (it's from `pp` instead), so it doesn't suffer the same issue… however, the serialization is weaker (it's done by "source extraction" instead of "pickling", and it doesn't allow shared objects). – Mike McKerns Nov 30 '16 at 06:11
  • FYI: source extraction is `dill.source`, while normal pickling is `dill`. – Mike McKerns Nov 30 '16 at 06:13
  • What about doing a `pool.terminate()` and `pool.join()`? My understanding was that it would kill the children properly. I saw a `timeout` option but no option to detect whether any jobs are left. If there is one, one could do – tupui Nov 30 '16 at 09:02
  • Don't know how to edit comments... So if there is an option to detect whether any jobs are left (haven't seen one), one could trigger the termination of the pool manually. But would we still get the results if we do that? Also, I think the issue I mentioned to @MikeMcKerns was due to an `imap` within an `imap`. Any insight? I will look at `pathos.pools.ParallelPool`. – tupui Nov 30 '16 at 09:10
  • @Y0da: `imap` and `map` use the same code under the hood. Basically, `map` is just an `imap` that dumps to a list. The asynchronous map (`amap`) uses a different underlying object, and thus might have different behavior than `map` and `imap`. Note that a results object from `amap` has a `get`, and should have a `ready` method to check if all jobs are finished. – Mike McKerns Nov 30 '16 at 13:32
  • Let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/129456/discussion-between-y0da-and-mike-mckerns). – tupui Nov 30 '16 at 15:20