
As a follow-up to this question, I am trying to circumvent the list building exemplified by range(int(1e8)) by using a generator, xrange(int(1e8)). Here xrange is just an example of a process that produces a long sequence of values (please assume it cannot be easily reproduced). Some more background: I have a long list of timestamp/value pairs that I want to do some processing on (a sort of time series). I try to avoid pulling these into memory as a whole, because that's a lot of data.

I thought it would be cool if I could apply multiple processing units simultaneously to this stream of data produced by my generator. My first idea was to use itertools.tee(), e.g.:

from itertools import tee
g1,g2 = tee(xrange(int(1e8)),2)
sum(g1), sum(g2)

But then I found that only the first sum() would consume the generator directly, while tee() internally buffers the values again (which I wanted to avoid), because the two iterators are consumed one after the other rather than in lockstep.
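For what it's worth, tee() only buffers the items between the fastest and the slowest consumer, so the buffer only grows to the full sequence because sum(g1) runs to completion before sum(g2) starts. Consuming both iterators in lockstep keeps the buffer at one pending item (a sketch in Python 3 syntax, where range() is already lazy):

```python
from itertools import tee

g1, g2 = tee(range(10), 2)

# Advance both iterators in lockstep, so tee's internal
# buffer never holds more than one pending item.
total1 = total2 = 0
for a, b in zip(g1, g2):
    total1 += a
    total2 += b

print(total1, total2)  # 45 45
```

This still forces all consumers to advance at the same rate, which is exactly the constraint the question is trying to abstract away.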

So I thought I was in need of an asynchronous solution, i.e. one that would allow each sum() to do an update on every generator step. The things that came to mind were coroutines, the twisted library, and multiprocessing.

But having really used none of them before, I partly cannot even tell whether these approaches would work, or whether they would be effective/efficient/performant.

At this point, I would gladly appreciate any suggestions from the audience!


Update

I wanted to avoid the callback-based solution, as it apparently decreases performance significantly (this is how it's currently implemented). I have added some profiling below (please add comments if the test isn't objective):

class SinkA:
  # pull model: the sink drives the loop and pulls from the source
  def __init__(self, src):
    for i in src: pass

class SinkB:
  # push model: items are pushed in one at a time via the callback f()
  def f(self, i):
    pass

class Source:
  def __iter__(self):
    for i in xrange(int(1e4)):
      yield i

def t1():
  src = Source()
  snk = SinkA(src)

def t2():
  src = Source()
  snk = SinkB()
  for i in src: snk.f(i)

if __name__ == "__main__":
    from timeit import Timer
    n = 1000
    t = Timer("t1()", "from __main__ import t1, t2, SinkA, SinkB, Source")
    print "%.2f usec/pass" % (1000000 * t.timeit(number=n)/n) # 612.11 usec/pass
    t = Timer("t2()", "from __main__ import t1, t2, SinkA, SinkB, Source")
    print "%.2f usec/pass" % (1000000 * t.timeit(number=n)/n) # 1933.39 usec/pass

Update 2

What more can I say? I have this callback-based solution, which appears to be inefficient. The generator-based approach appears promising, but I have too little experience with that kind of programming, especially when it comes to more sophisticated things such as coroutines or the twisted library. To sum up: I have multiple consumers for a process that generates lots of data, and I have spotted some potential approaches. Now I'm looking for qualified statements by experienced users who have probably accomplished similar tasks before; statements that address which approach could be appropriate and how the approaches relate to each other, or what other approaches I might have missed after all.

moooeeeep
    You don't really address this: do you want each consumer to see the exact same data, or not? – Marcin Mar 19 '12 at 12:17
  • I guess the behavior you see with `tee` is because you don't run your two tasks in parallel. Python first executes `sum(g1)`, then `sum(g2)`. Try doing your sum manually using a loop, and see if it consumes as much memory. – Charles Brunet Mar 19 '12 at 12:25
  • @CharlesBrunet, that's true. I am trying to abstract away this manual loop somehow. To have nicer code. – moooeeeep Mar 19 '12 at 12:37
  • @Marcin, yes, every consumer should get the same input data. – moooeeeep Mar 19 '12 at 12:38
  • @moooeeeep In what way do you want your code to become "nicer"? – Marcin Mar 19 '12 at 12:39
  • @moooeeeep what's the nature of the consumers: are they in separate threads or called consequently? – bereal Mar 19 '12 at 12:43
  • @moooeeeep "Modularity" doesn't really mean anything. Be specific. Right now, this code seems modular, and your question as such appears to have been answered. – Marcin Mar 19 '12 at 12:43
  • @Marcin, see my edit. Modularity is just the argument why I'm not willing to implement all functionality into a single loop... – moooeeeep Mar 19 '12 at 13:33
  • @moooeeeep I don't see how a single loop breaks modularity. You have a solution that doesn't seem unduly slow. – Marcin Mar 19 '12 at 13:39
  • @moooeeeep Even though I updated my answer with coroutines option (just for variety), it seems that I've found a bottleneck in your callback solution `t2`: you don't need to make `Source` a generator, pull from it and call `SinkB.f`, that's a waste. Just pass the `SinkB.f` as a parameter and call it from there, that would make the difference. – bereal Mar 21 '12 at 20:57
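The optimization suggested in the last comment, i.e. letting the source call the sink's callback directly instead of yielding each item and looping outside, can be sketched like this (Python 3 syntax; the `run` method name and the `count` attribute are illustrative, not from the question):

```python
class Source:
    def run(self, callback):
        # Push items directly to the callback instead of
        # yielding them to an external loop.
        for i in range(int(1e4)):
            callback(i)

class SinkB:
    def __init__(self):
        self.count = 0

    def f(self, i):
        # Placeholder processing: just count the items received.
        self.count += 1

snk = SinkB()
Source().run(snk.f)
print(snk.count)  # 10000
```

This removes one level of generator machinery per item, which is where the comment suspects the `t2` overhead comes from.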

4 Answers


As a generic approach, I would replace the generator's pull model with callbacks and, probably, wrap the generator like this:

def walk(gen, callbacks):
    for item in gen:
        for f in callbacks:
            f(item)

If your processors are in separate threads and you want them to block while waiting, you can register Queue.put (or anything equivalent) as a callback for each processor, and poll those queues independently. This will allow you to use both broadcasting and worker-pool models if you need to.
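A minimal sketch of that idea in Python 3 syntax (the summing consumer and the None sentinel are illustrative assumptions): each consumer thread blocks on its own queue, and walk() pushes every item to every queue through the registered put callbacks:

```python
import threading
from queue import Queue

def walk(gen, callbacks):
    # Broadcast every item from the generator to all callbacks.
    for item in gen:
        for f in callbacks:
            f(item)

def consumer(q, results, key):
    # Sum items pulled from the queue until the None sentinel arrives.
    total = 0
    while True:
        item = q.get()
        if item is None:
            break
        total += item
    results[key] = total

queues = [Queue(), Queue()]
results = {}
threads = [threading.Thread(target=consumer, args=(q, results, k))
           for k, q in enumerate(queues)]
for t in threads:
    t.start()

# Register each queue's put() as a callback; every consumer
# sees the exact same stream of items.
walk(iter(range(1000)), [q.put for q in queues])
for q in queues:
    q.put(None)  # signal end of stream
for t in threads:
    t.join()

print(results)  # {0: 499500, 1: 499500}
```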

Edit

Another solution would be to use coroutines:

def source(*dests):
    for i in xrange(int(1e4)):
        for dest in dests:
            dest.send(i)

def sink():
    while True:
        i = yield

def t3():
    snk = sink()
    snk.next() # activate the coroutine
    source(snk)

if __name__ == '__main__':

    from timeit import Timer
    n = 1000
    t = Timer("t3()", "from __main__ import source, sink, t3")
    print "%.2f usec/pass" % (1000000 * t.timeit(number=n)/n) # 872.99 usec/pass

Looks fast enough. Basically, coroutines are inverted generators: you pull from a generator, but you push into a coroutine.
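In Python 3 syntax the same push model looks like this, extended to two sinks to show the broadcast (next() replaces .next(); the totals dict is an assumption, added only to make the result observable):

```python
def source(dests):
    # Push every item to all registered sink coroutines.
    for i in range(int(1e4)):
        for dest in dests:
            dest.send(i)

def sink(totals, key):
    # Receive items via send() and accumulate them.
    totals[key] = 0
    while True:
        i = yield
        totals[key] += i

totals = {}
sinks = [sink(totals, k) for k in range(2)]
for s in sinks:
    next(s)  # activate each coroutine
source(sinks)

print(totals)  # {0: 49995000, 1: 49995000}
```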

bereal
  • What's the architecture surrounding this? Given that the pull model would cause threads to block, as long as the IO layer acts appropriately, that would be the simplest model to programme. – Marcin Mar 19 '12 at 12:28
  • If the man needs to block, well, callbacks may be `Queue.put` in fact (updated the answer). – bereal Mar 19 '12 at 12:31
  • the execution speeds of the coroutine and the callback approach are pretty similar for me, and these are apparently the only approaches (except for multiprocessing) that are able to enable multiple processing units. Thanks for your post. – moooeeeep Mar 26 '12 at 14:21

You don't really address this, but do you want each consumer to see the exact same data (in which case tee is probably the best solution), or not?

If not, then you can simply have each consumer read from the one generator object.

If you do want them to get the exact same data, try tee (uses more memory) vs two generators (more IO), and see which is faster.
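To illustrate the difference (a sketch in Python 3 syntax): consumers reading from the same generator object split the stream between them, whereas tee gives each consumer a full copy:

```python
from itertools import islice, tee

# One shared iterator: each item goes to exactly one consumer.
g = iter(range(10))
a = list(islice(g, 5))  # consumes 0..4
b = list(g)             # gets only the remaining 5..9

# tee: every consumer sees the complete stream.
g1, g2 = tee(range(10), 2)
c, d = list(g1), list(g2)

print(a, b)    # [0, 1, 2, 3, 4] [5, 6, 7, 8, 9]
print(c == d)  # True
```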

As to your timings, what your data show is simply that there is an overhead to multiple function calls, and that one of your methods avoids intermediate function calls.

If you want to improve performance, try running this on PyPy, which has a hotspot-optimising JIT.

Marcin
  • Unfortunately PyPy does not support my other dependencies. (Indeed, there wasn't a note in the question.) – moooeeeep Mar 19 '12 at 13:39
  • @moooeeeep You seem to have a lot of constraints that you simply don't state. All of these answers answer your question as asked, but then you keep complaining that they don't answer your question. – Marcin Mar 19 '12 at 13:44

Since generators are cheap in memory, why don't you simply use two independent generators?

g1 = xrange(int(1e8))
g2 = xrange(int(1e8))
sum(g1), sum(g2)
Charles Brunet
    The `xrange` is just an example for a process that produces a long list of values. Please assume it can not be easily reproduced. – moooeeeep Mar 19 '12 at 12:20

I suggest you look up how to implement this with coroutines; more specifically, this broadcast example.
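For the archive, the pattern in that example is roughly the following (a sketch in Python 3 syntax, modeled after David Beazley's broadcast() coroutine pattern; the names and the summing sink are illustrative):

```python
def coroutine(func):
    # Decorator that advances a coroutine to its first yield,
    # so it is immediately ready to receive values via send().
    def start(*args, **kwargs):
        cr = func(*args, **kwargs)
        next(cr)
        return cr
    return start

@coroutine
def broadcast(targets):
    # Forward every received item to all target coroutines.
    while True:
        item = yield
        for target in targets:
            target.send(item)

@coroutine
def summer(totals, key):
    # Example consumer: accumulate received items.
    totals[key] = 0
    while True:
        totals[key] += (yield)

totals = {}
pipe = broadcast([summer(totals, 0), summer(totals, 1)])
for i in range(100):
    pipe.send(i)

print(totals)  # {0: 4950, 1: 4950}
```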

bpgergo