
Right now I have some code that does roughly the following

def generator():

    while True:
        value = do_some_lengthy_IO()
        yield value 

def model():

    for datapoint in generator():
        do_some_lengthy_computation(datapoint)

Right now, the I/O and the computation happen serially. Ideally they should be running concurrently (with the generator keeping the next value ready), since they share nothing but the value being passed. I started looking into this, got very confused by the multiprocessing, threading, and async options, and could not get a minimal working example going. Also, since some of this seems to involve recent features, note that I am using Python 3.6.

  • If you're on python3.5+, you can look at asyncIO. For older versions, there's multiprocessing with queues. – cs95 Mar 09 '18 at 02:57
  • With all due respect, **this is a classical "*just*"-`[CONCURRENT]` process** scheduling, by far **not a true-`[PARALLEL]` processing** system, kindly review the post >>> https://stackoverflow.com/revisions/8337936/4 and may wish to revise / edit the declared problem formulation so as to better meet the Computer Science & Complex Systems & Process Scheduling terminology. – user3666197 Mar 09 '18 at 03:10
  • Give us a simple, self-contained, complete example instead of a vague pseudo-example. As asked, this question is too broad to reasonably answer in this format. – Jean-Paul Calderone Mar 09 '18 at 14:18
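A minimal sketch of the queue-based approach the comments allude to, but using a background thread rather than a process (since the producer is I/O-bound, it can overlap with CPU work even under the GIL; the `bg` name and `maxsize` parameter here are illustrative, not from the question):

```python
import queue
import threading

def bg(gen, maxsize=1):
    """Iterate `gen` in a background thread, keeping up to
    `maxsize` values ready for the consumer."""
    q = queue.Queue(maxsize=maxsize)
    sentinel = object()  # unique marker for "generator exhausted"

    def _worker():
        for value in gen:
            q.put(value)  # blocks when the buffer is full
        q.put(sentinel)

    threading.Thread(target=_worker, daemon=True).start()

    while True:
        value = q.get()
        if value is sentinel:
            return
        yield value
```

The consumer loop would be unchanged: `for datapoint in bg(generator()): do_some_lengthy_computation(datapoint)`.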

1 Answer


I ended up figuring it out. The simplest way is to use the multiprocessing package and a pipe to communicate with the child process. I wrote a wrapper that can take any generator:

import time
import multiprocessing

def bg(gen):
    def _bg_gen(gen, conn):
        # Runs in the child process: produce one value per request.
        while conn.recv():
            try:
                conn.send(next(gen))
            except StopIteration:
                conn.send(StopIteration)
                return

    parent_conn, child_conn = multiprocessing.Pipe()
    p = multiprocessing.Process(target=_bg_gen, args=(gen, child_conn))
    p.start()

    parent_conn.send(True)  # prefetch: keep the child one value ahead
    while True:
        parent_conn.send(True)
        x = parent_conn.recv()
        if x is StopIteration:
            p.join()  # reap the finished child process
            return
        else:
            yield x

def generator(n):
    for i in range(n):
        time.sleep(1)
        yield i

# This takes 2s/iteration
for i in generator(100):
    time.sleep(1)

# This takes 1s/iteration
for i in bg(generator(100)):
    time.sleep(1)

The only thing still missing is that, for infinite generators, the child process is never killed, but that can easily be added with a parent_conn.send(False): the child's while conn.recv() loop then exits and the process can be joined.
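For instance, that shutdown can be wired into the wrapper's own cleanup with a try/finally (the finally placement and the defensive exception handling are my additions, not part of the original answer; like the answer, this assumes the fork start method, since a local function and a live generator are handed to the child):

```python
import multiprocessing

def bg(gen):
    def _bg_gen(gen, conn):
        while conn.recv():  # a False request ends the loop
            try:
                conn.send(next(gen))
            except StopIteration:
                conn.send(StopIteration)
                return

    parent_conn, child_conn = multiprocessing.Pipe()
    p = multiprocessing.Process(target=_bg_gen, args=(gen, child_conn))
    p.start()
    try:
        parent_conn.send(True)  # prefetch the first value
        while True:
            parent_conn.send(True)
            x = parent_conn.recv()
            if x is StopIteration:
                return
            yield x
    finally:
        # Runs on exhaustion, on .close(), and when the generator is
        # garbage-collected, so an abandoned infinite generator gets
        # cleaned up too.
        try:
            parent_conn.send(False)
        except (BrokenPipeError, OSError):
            pass  # child already exited
        p.join()
```

With this version, breaking out of a loop over an infinite generator (or letting the generator object go out of scope) is enough to stop the child.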