
I am training a neural network with a large text corpus. Each text generates quite a big matrix because I'm using a convolutional model. As my data won't fit in my (still large) memory, I try to stream it and use fit_generator from keras.models.
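
For context, the training side looks roughly like this (a minimal sketch with made-up shapes and a dummy generator, not my real model; batch_generator is the piece I want to back with dask):

import numpy as np
from keras.models import Sequential
from keras.layers import Conv1D, GlobalMaxPooling1D, Dense

# hypothetical shapes: each text becomes a (1000, 128) matrix
model = Sequential([
    Conv1D(64, 5, activation="relu", input_shape=(1000, 128)),
    GlobalMaxPooling1D(),
    Dense(1, activation="sigmoid"),
])
model.compile(optimizer="adam", loss="binary_crossentropy")

def batch_generator():
    # placeholder: in the real pipeline this streams preprocessed matrices
    while True:
        yield np.random.rand(8, 1000, 128), np.random.randint(0, 2, size=(8, 1))

model.fit_generator(batch_generator(), steps_per_epoch=10, epochs=1)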

To feed Keras, I have a pipeline composed of different preprocessing steps that I arrange as a dask bag with lots of partitions. The dask bag reads a file on disk.
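
Schematically, it looks something like this (a simplified sketch; the file name and the preprocessing functions are placeholders for the real steps):

import dask.bag as db

def parse(line):
    # placeholder for the real parsing of one raw text record
    return line.strip()

def vectorize(text):
    # placeholder for the real preprocessing that turns a text into a matrix
    return [ord(c) for c in text[:100]]

# read the corpus from disk, with many partitions so each one stays small
bag = (db.read_text("corpus.txt", blocksize=10000000)
         .map(parse)
         .map(vectorize))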

Even if dask does not handle iteration in a smart way (it just calls compute() and iterates over the result, which in my case blows up memory), I was able to use something like this:

from dask.context import _globals  # _globals lives here in the older dask of the traceback below


def compute_partition_iter(collection, **kwargs):
    """A utility to compute a collection item after item, one partition at a time."""
    get = kwargs.pop("get", None) or _globals['get']
    if get is None:
        get = collection.__dask_scheduler__
    postcompute_func, postcompute_args = collection.__dask_postcompute__()
    dsk = collection.__dask_graph__()
    for key in collection.__dask_keys__():
        # compute just this partition, then yield its items
        partition = get(dsk, key, **kwargs)
        yield from postcompute_func([partition], *postcompute_args)

This computes partitions one by one and returns items, computing the next partition only when we cross a partition boundary.
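
I call it along these lines (a sketch; process_item stands for whatever consumes the items, e.g. the Keras batch builder):

for item in compute_partition_iter(bag):
    process_item(item)  # hypothetical consumer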

This approach has a problem: it's only when we hit the last item of a partition that we trigger the computation of the next one, leading to a lag before the next element is available. During this lag, Keras is stalled and we lose precious time!

So I imagine running the above compute_partition_iter in a separate process thanks to multiprocessing.Pool, feeding partitions into a Queue with, say, 2 slots, so that in the generator I will always have one more partition ready.
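
On the consumer side, the generator handed to Keras would then simply drain the queue, something like this (a sketch; the queue is supposed to be filled by a worker running compute_partition_iter and terminated by a None sentinel):

def queue_iterator(q):
    # yield items until the producer signals the end with None;
    # because the producer keeps the queue filled, the next item is usually ready
    while True:
        item = q.get()
        if item is None:
            return
        yield item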

But it seems that this is not supported by dask.bag. I didn't dive deep enough into the code, but it seems like some async methods are used, or I don't know what.

Here is reproducible code for the problem.

First, code that works, using a simple range.

import multiprocessing
import time


def put_q(n, q):
    for i in range(n):
        print(i, "<-")
        q.put(i)
    q.put(None)

q = multiprocessing.Queue(2)
with multiprocessing.Pool(1, put_q, (4, q)) as pool:
    i = True
    while i is not None:
        print("zzz")
        time.sleep(.5)
        i = q.get()
        if i is None:
            break
        print("-> ", i)

This outputs

0 <-
1 <-
2 <-
zzz
3 <-
->  0
zzz
->  1
zzz
->  2
zzz
->  3
zzz

You can see that, as expected, elements were computed in anticipation and it all works fine.

Now let's replace the range with a dask.bag:

import multiprocessing
import time

import dask.bag


def put_q(n, q):
    for i in dask.bag.from_sequence(range(n), npartitions=2):
        print(i, "<-")
        q.put(i)
    q.put(None)

q = multiprocessing.Queue(5)
with multiprocessing.Pool(1, put_q, (4, q)) as pool:
    i = True
    while i is not None:
        print("zzz")
        time.sleep(.5)
        i = q.get()
        if i is None:
            break
        print("-> ", i)

In a Jupyter notebook, it indefinitely raises:

Process ForkPoolWorker-71:
Traceback (most recent call last):
  File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
    self.run()
  File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.5/multiprocessing/pool.py", line 103, in worker
    initializer(*initargs)
  File "<ipython-input-3-e1e9ef9354a0>", line 8, in put_q
    for i in dask.bag.from_sequence(range(n), npartitions=2):
  File "/usr/local/lib/python3.5/dist-packages/dask/bag/core.py", line 1190, in __iter__
    return iter(self.compute())
  File "/usr/local/lib/python3.5/dist-packages/dask/base.py", line 154, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/usr/local/lib/python3.5/dist-packages/dask/base.py", line 407, in compute
    results = get(dsk, keys, **kwargs)
  File "/usr/local/lib/python3.5/dist-packages/dask/multiprocessing.py", line 152, in get
    initializer=initialize_worker_process)
  File "/usr/lib/python3.5/multiprocessing/context.py", line 118, in Pool
    context=self.get_context())
  File "/usr/lib/python3.5/multiprocessing/pool.py", line 168, in __init__
    self._repopulate_pool()
  File "/usr/lib/python3.5/multiprocessing/pool.py", line 233, in _repopulate_pool
    w.start()
  File "/usr/lib/python3.5/multiprocessing/process.py", line 103, in start
    'daemonic processes are not allowed to have children'
AssertionError: daemonic processes are not allowed to have children

while the main process is stalled, waiting for elements from the queue.

I also tried using an ipyparallel cluster, but in that case the main process is simply stalled (no trace of the exception).

Does anyone know the right way to do this?

Is there a way I can run scheduler.get in parallel with my main code?


1 Answer


Finally, I should have taken a closer look at the exception!

Stack Overflow gave me the solution: Python Process Pool non-daemonic?

In fact, as the bag's scheduler itself uses a Pool, it can't be called inside a process spawned by a Pool (daemonic processes are not allowed to have children). The solution in my case is simply to use threads. (Note that the bug and its solution depend on the scheduler you use.)
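
For instance, the daemonic error only shows up because the bag's default scheduler is the multiprocessing one; forcing a scheduler that does not spawn processes would be another way around it. With the old-style dask from the traceback, that would be something like this (a sketch, not what I ended up doing):

import dask.bag
import dask.threaded

b = dask.bag.from_sequence(range(6), npartitions=3)
# the threaded scheduler runs in the current process, so it is safe to use
# even from inside a worker spawned by multiprocessing.Pool
# (get= is the pre-0.19 keyword; newer dask uses scheduler="threads")
result = b.compute(get=dask.threaded.get)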

So I replaced multiprocessing.Pool with multiprocessing.pool.ThreadPool and it works like a charm, either in a normal notebook or when using ipyparallel.

So it goes like this:

import queue
from multiprocessing.pool import ThreadPool
import time

import dask.bag


def put_q(n, q):
    b = dask.bag.from_sequence(range(n), npartitions=3)
    for i in b:
        print(i, "<-")
        q.put(i)
    q.put(None)

q = queue.Queue(2)
with ThreadPool(1, put_q, (6, q)) as pool:
    i = True
    while i is not None:
        print("zzz")
        time.sleep(.5)
        i = q.get()
        if i is None:
            break
        print("-> ", i)

Which outputs:

zzz
0 <-
1 <-
2 <-
->  0
zzz
3 <-
->  1
zzz
4 <-
-> 5 <-
 2
zzz
->  3
zzz
->  4
zzz
->  5
zzz
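
In my real code this is wrapped into a generator that Keras can consume, roughly like this (a sketch; stream_bag is my own helper name and batching is left out):

import queue
from multiprocessing.pool import ThreadPool

import dask.bag


def stream_bag(bag, maxsize=2):
    """Yield the items of a dask bag, computing ahead in a background thread."""
    q = queue.Queue(maxsize)

    def producer():
        # iterate the bag (or compute_partition_iter(bag) to go partition
        # by partition) and push items, ending with a None sentinel
        for item in bag:
            q.put(item)
        q.put(None)

    pool = ThreadPool(1, producer)
    try:
        while True:
            item = q.get()
            if item is None:
                return
            yield item
    finally:
        pool.terminate()


b = dask.bag.from_sequence(range(10), npartitions=5)
for x in stream_bag(b):
    print(x)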