
Using Python 3.4.3, I have a generator function foo that yields data to be processed in parallel. Passing this generator to multiprocessing.Pool.map with n processes, I expected it to be consumed n items at a time:

from multiprocessing import Pool
import time

now = time.time

def foo(n):
    for i in range(n):
        print("%f get %d" % (now(), i))
        yield i

def bar(i):
    print("%f start %d" % (now(), i))
    time.sleep(1)
    print("%f end %d" % (now(), i))

pool = Pool(2)
pool.map(bar, foo(6))
pool.close()
pool.join()

Unfortunately, the generator is exhausted immediately: all six items are read before the first task has finished. The output is this:

1440713274.290760 get 0
1440713274.290827 get 1
1440713274.290839 get 2
1440713274.290849 get 3
1440713274.290858 get 4
1440713274.290867 get 5
1440713274.291526 start 0
1440713274.291654 start 1
1440713275.292680 end 0
1440713275.292803 end 1
1440713275.293056 start 2
1440713275.293129 start 3
1440713276.294106 end 2
1440713276.294182 end 3
1440713276.294344 start 4
1440713276.294390 start 5
1440713277.294803 end 4
1440713277.294859 end 5

But I had hoped to get something more like:

1440714272.612041 get 0
1440714272.612078 get 1
1440714272.612090 start 0
1440714272.612100 start 1
1440714273.613174 end 0
1440714273.613247 end 1
1440714273.613264 get 2
1440714273.613276 get 3
1440714273.613287 start 2
1440714273.613298 start 3
1440714274.614357 end 2
1440714274.614423 end 3
1440714274.614432 get 4
1440714274.614437 get 5
1440714274.614443 start 4
1440714274.614448 start 5
1440714275.615475 end 4
1440714275.615549 end 5

(The reason this matters is that foo is going to read a large amount of data into memory.)

I got the same results with pool.imap(bar, foo(6), 2) and

for i in foo(6):
  pool.apply_async(bar, args=(i,))

What is the easiest way to make this work?

  • [this SO answer](http://stackoverflow.com/a/5326207/1705163) works for me, using `itertools.islice` (thanks ayeganov) – gens Sep 10 '15 at 20:47
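
For reference, the islice-based approach from that answer boils down to roughly this sketch (the chunk size here simply matches the pool size):

from itertools import islice
from multiprocessing import Pool
import time

now = time.time

def foo(n):
    for i in range(n):
        print("%f get %d" % (now(), i))
        yield i

def bar(i):
    print("%f start %d" % (now(), i))
    time.sleep(1)
    print("%f end %d" % (now(), i))

if __name__ == '__main__':
    pool_size = 2
    pool = Pool(pool_size)
    gen = foo(6)
    while True:
        chunk = list(islice(gen, pool_size))  # pull at most pool_size items
        if not chunk:
            break
        pool.map(bar, chunk)                  # block until this chunk is done
    pool.close()
    pool.join()

Each pool.map call blocks until its chunk is finished, so the generator is only advanced pool_size items at a time.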

2 Answers

I faced a similar problem, where I needed to read a large amount of data and process parts of it in parallel. I solved it by subclassing multiprocessing.Process and using queues. I think you will benefit from reading about embarrassingly parallel problems. Sample code is given below:

import multiprocessing
import time
import logging
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s  %(levelname)-8s %(message)s',
                    datefmt='%m-%d %H:%M:%S')

#Producer class
class foo(multiprocessing.Process):
    def __init__(self, n, queues):
        super(foo, self).__init__()
        self.n=n
        self.queues = queues

    def run(self):
        logging.info('Starting foo producer')
        for i in range(self.n):
            logging.info('foo: Sending "%d" to a consumer' % (i))
            self.queues[i%len(self.queues)].put(i)
            time.sleep(1)#Unnecessary sleep to demonstrate order of events
        for q in self.queues:
            q.put('end')
        logging.info('Ending foo producer')
        return
#Consumer class
class bar(multiprocessing.Process):
    def __init__(self, idx, queue):
        super(bar, self).__init__()
        self.idx = idx
        self.queue = queue

    def run(self):
        logging.info("Starting bar %d consumer" % (self.idx ))
        while True:
            fooput = self.queue.get()
            if type(fooput) == str and fooput == 'end':  # sentinel from the producer
                break
            logging.info('bar %d: Got "%d" from foo' % (self.idx, fooput))
            time.sleep(2)#Unnecessary sleep to demonstrate order of events

        logging.info("Ending bar %d consumer" % (self.idx ))
        return



if __name__=='__main__':
    #make queues to put data read by foo
    count_queues = 2
    queues =[]
    for i in range(count_queues):
        q = multiprocessing.Queue(2)
        # Give queue size according to your buffer requirements
        queues.append(q)

    #make reader for reading data. lets call this object Producer
    foo_object = foo(6, queues)

    #make receivers for the data. Lets call these Consumers
    #Each consumer is assigned a queue
    bar_objects = []
    for idx, q in enumerate(queues):
        bar_object = bar(idx, q)
        bar_objects.append(bar_object)

    # start the consumer processes
    for bar_object in bar_objects:
        bar_object.start()


    # start the producer processes
    foo_object.start()


    #Join all started processes
    for bar_object in bar_objects:
        bar_object.join()

    foo_object.join()
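
Note that the bounded queues (multiprocessing.Queue(2)) are what keep the producer from running ahead of the consumers: put() blocks once a queue is full, so only a small, fixed amount of data is buffered in memory at any time.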
gnub

The best I can come up with myself is this:

pool_size = 2
pool = Pool(pool_size)

count = 0
for i in foo(6):
    count += 1
    if count % pool_size == 0:
        pool.apply(bar, args=(i,))
    else:
        pool.apply_async(bar, args=(i,))

pool.close()
pool.join()

For pool_size=2 it outputs:

1440798963.389791 get 0
1440798963.490108 get 1
1440798963.490683 start 0
1440798963.595587 start 1
1440798964.491828 end 0
1440798964.596687 end 1
1440798964.597137 get 2
1440798964.697373 get 3
1440798964.697629 start 2
1440798964.798024 start 3
1440798965.698719 end 2
1440798965.799108 end 3
1440798965.799419 get 4
1440798965.899689 get 5
1440798965.899984 start 4
1440798966.001016 start 5
1440798966.901050 end 4
1440798967.002097 end 5

For pool_size=3 it outputs:

1440799101.917546 get 0
1440799102.018438 start 0
1440799102.017869 get 1
1440799102.118868 get 2
1440799102.119903 start 1
1440799102.219616 start 2
1440799103.019600 end 0
1440799103.121066 end 1
1440799103.220746 end 2
1440799103.221124 get 3
1440799103.321402 get 4
1440799103.321664 start 3
1440799103.422589 get 5
1440799103.422824 start 4
1440799103.523286 start 5
1440799104.322934 end 3
1440799104.423878 end 4
1440799104.524350 end 5

However, it pulls the next pool_size items from the iterator as soon as the blocking apply returns, so if the per-item processing time varies much, this won't balance the workers well.
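
One way to soften that, sketched here as an untested idea rather than a worked solution, is to cap the number of outstanding apply_async calls and wait on the oldest AsyncResult before pulling the next item from the generator (foo and bar as defined in the question):

from collections import deque
from multiprocessing import Pool

pool_size = 2
pool = Pool(pool_size)

pending = deque()
for i in foo(6):
    pending.append(pool.apply_async(bar, args=(i,)))
    if len(pending) >= pool_size:
        pending.popleft().wait()  # block until the oldest task finishes

for res in pending:  # drain whatever is still running
    res.wait()

pool.close()
pool.join()

This keeps at most pool_size items pulled from the generator at once, although a slow oldest task can still delay the next submission even when another worker is free.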

gens