Using Python 3.4.3, I have a generator function foo that yields data to be processed in parallel. Passing the generator it returns to multiprocessing.Pool.map with a pool of n processes, I expected the pool to pull items from it only n at a time, as workers become free:
from multiprocessing import Pool
import time

now = time.time

def foo(n):
    # producer: yields one item at a time
    for i in range(n):
        print("%f get %d" % (now(), i))
        yield i

def bar(i):
    # worker: simulates one second of work per item
    print("%f start %d" % (now(), i))
    time.sleep(1)
    print("%f end %d" % (now(), i))

pool = Pool(2)
pool.map(bar, foo(6))
pool.close()
pool.join()
Unfortunately, the generator is consumed completely up front: all 6 items are pulled before the first worker even starts. The output is this:
1440713274.290760 get 0
1440713274.290827 get 1
1440713274.290839 get 2
1440713274.290849 get 3
1440713274.290858 get 4
1440713274.290867 get 5
1440713274.291526 start 0
1440713274.291654 start 1
1440713275.292680 end 0
1440713275.292803 end 1
1440713275.293056 start 2
1440713275.293129 start 3
1440713276.294106 end 2
1440713276.294182 end 3
1440713276.294344 start 4
1440713276.294390 start 5
1440713277.294803 end 4
1440713277.294859 end 5
But I had hoped to get something more like:
1440714272.612041 get 0
1440714272.612078 get 1
1440714272.612090 start 0
1440714272.612100 start 1
1440714273.613174 end 0
1440714273.613247 end 1
1440714273.613264 get 2
1440714273.613276 get 3
1440714273.613287 start 2
1440714273.613298 start 3
1440714274.614357 end 2
1440714274.614423 end 3
1440714274.614432 get 4
1440714274.614437 get 5
1440714274.614443 start 4
1440714274.614448 start 5
1440714275.615475 end 4
1440714275.615549 end 5
(The reason this matters is that in my real code, foo reads a large amount of data into memory.)
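For context, my real foo is shaped roughly like this (the signature, the file handling and the chunk size here are made up; the point is only that each yielded item is large):

def foo(path, chunk_size=2**20):
    # hypothetical sketch of the real producer, not the toy foo(n) above:
    # each yielded chunk is a big block of data, so the whole sequence
    # must not be pulled into memory before the workers start
    with open(path, "rb") as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            yield chunk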
I got the same results with pool.imap(bar, foo(6), 2) and with

for i in foo(6):
    pool.apply_async(bar, args=(i,))
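(For completeness, the apply_async variant in full, roughly as I ran it, with the same pool setup as above:)

pool = Pool(2)
for i in foo(6):
    # apply_async returns immediately, so this loop drains the generator right away
    pool.apply_async(bar, args=(i,))
pool.close()
pool.join()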
What is the easiest way to make this work, i.e. to have the pool pull items from the generator only as workers become free?