As a follow-up to this question, I am trying to circumvent the list building exemplified by range(int(1e8)) by using a generator, xrange(int(1e8)). Here xrange is just an example of a process that produces a long sequence of values. (Please assume it cannot be easily reproduced.) Some more background: I have a long list of timestamp/value pairs that I want to do some processing on (a sort of time series). I try to avoid pulling these into memory as a whole, because that's a lot of data.
I thought it would be cool if I could apply multiple processing units simultaneously to this stream of data produced by my generator. The first idea was to use itertools.tee(), e.g.:
from itertools import tee
g1,g2 = tee(xrange(int(1e8)),2)
sum(g1), sum(g2)
But then I found that only the first sum() would actually drive the generator, while tee() internally buffers every value for the second one, in effect building a list again (which I wanted to avoid).
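To make the buffering visible, here is a minimal sketch. It is written for Python 3, where range is lazy like xrange; the helper source() and the list pulled are names made up for this illustration. It records every item drawn from the underlying generator and checks how many were drawn before the second sum() even starts.

```python
from itertools import tee


def source(n, log):
    """Hypothetical stand-in for the expensive generator; logs each item it yields."""
    for i in range(n):
        log.append(i)  # record that item i was pulled from the source
        yield i


pulled = []
g1, g2 = tee(source(5, pulled), 2)

total1 = sum(g1)                 # drives the source to exhaustion
pulled_before_g2 = len(pulled)   # every item has already been produced here

total2 = sum(g2)                 # served entirely from tee's internal buffer
pulled_after_g2 = len(pulled)    # unchanged: the source was not run a second time
```

Since the source is exhausted before g2 is touched, tee() has to hold all values for g2 in the meantime, which is the memory cost I am trying to avoid.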
So I thought I'm in need of an asynchronous solution, i.e. one that would allow each sum() to do an update on every generator step.
The things that came to mind were
But I have never really used either of them before, and in part I cannot even tell whether the approaches might work, or be effective/efficient/performant.
From this point, I would greatly appreciate any suggestions from the audience!
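To illustrate what I mean by "an update on every generator step", here is a rough sketch using generator-based coroutines (send()). It is written for Python 3; fold_sink and broadcast are hypothetical names invented for this example, not something from a library, and I have not measured whether it is any faster than callbacks, since send() is still one call per item per consumer.

```python
def fold_sink(results, key, func, initial):
    """Coroutine consumer: folds every value it is sent into results[key]."""
    acc = initial
    results[key] = acc
    while True:
        value = yield            # wait for the next item from broadcast()
        acc = func(acc, value)
        results[key] = acc       # keep the running result visible to the caller


def broadcast(source, sinks):
    """Walk the source exactly once and hand each item to every sink."""
    for sink in sinks:
        next(sink)               # prime each coroutine up to its first yield
    for item in source:
        for sink in sinks:
            sink.send(item)
    for sink in sinks:
        sink.close()             # stop the consumers once the source is exhausted


results = {}
sinks = [
    fold_sink(results, "sum", lambda acc, v: acc + v, 0),
    fold_sink(results, "max", max, float("-inf")),
]
broadcast(range(1000), sinks)
```

Each item is passed to all consumers and then dropped, so nothing beyond the running results is kept in memory.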
Update
I wanted to avoid the callback-based solution, as it apparently decreases performance significantly (this is how it's currently implemented). I have added some profiling below (please add comments if the test isn't objective):
class SinkA:
    # Generator-based sink: iterates over the source itself.
    def __init__(self, src):
        for i in src: pass

class SinkB:
    # Callback-based sink: f() is called once per item.
    def f(self, i):
        pass

class Source:
    def __iter__(self):
        for i in xrange(int(1e4)):
            yield i

def t1():
    src = Source()
    snk = SinkA(src)

def t2():
    src = Source()
    snk = SinkB()
    for i in src: snk.f(i)

if __name__ == "__main__":
    from timeit import Timer
    n = 1000
    t = Timer("t1()", "from __main__ import t1, t2, SinkA, SinkB, Source")
    print "%.2f usec/pass" % (1000000 * t.timeit(number=n)/n) # 612.11 usec/pass
    t = Timer("t2()", "from __main__ import t1, t2, SinkA, SinkB, Source")
    print "%.2f usec/pass" % (1000000 * t.timeit(number=n)/n) # 1933.39 usec/pass
Update 2
What more can I say? I have this callback-based solution, which appears to be inefficient. The generator-based approach appears promising, but I have too little experience with that kind of programming, especially when it comes to more sophisticated things such as coroutines or the twisted library. To sum up, I have multiple consumers for a process that generates lots of data, and I have spotted some potential approaches. Now I'm looking for qualified statements from experienced users who have probably accomplished similar tasks before: statements that address which approach could be appropriate and how the approaches relate to each other, or which other approaches I might have missed.