I sometimes build things similar to computational graphs using "simple" Python generators or generator expressions, for example:

# example 1

w1 = lambda v: v ** 2  # placeholder for expensive operation
w2 = lambda v: v - 3  # placeholder for expensive operation
w3 = lambda v: v / 7  # placeholder for expensive operation

d = [10, 11, 12, 13]  # input data, could be "large"

r1 = (w1(x) for x in d)  # generator for intermediary result 1
r2 = (w2(x) for x in r1)  # generator for intermediary result 2

r3 = [w3(x) for x in r2]  # final result
print(r3)

Imagine the list d being really large and filled with objects bigger than integers. r1 and r2 are chained generators, saving a ton of memory. My lambdas are simple placeholders for expensive computations / processing steps that each yield new, independent, intermediate results.

The cool thing about this approach is that one generator can depend on multiple other generators, e.g. via the zip function, which makes it possible to "merge/join" branches of a graph:

# example 2

wa1 = lambda v: v ** 2  # placeholder for expensive operation
wb1 = lambda v: v ** 3  # placeholder for expensive operation
wm = lambda a, b: a + b  # placeholder for expensive operation (MERGE)
w2 = lambda v: v - 3  # placeholder for expensive operation
w3 = lambda v: v / 7  # placeholder for expensive operation

da = [10, 11, 12, 13]  # input data "a", could be "large"
db = [20, 21, 22, 23]  # input data "b", could be "large"

ra1 = (wa1(x) for x in da)  # generator for intermediary result 1a
rb1 = (wb1(x) for x in db)  # generator for intermediary result 1b
rm = (wm(x, y) for x, y in zip(ra1, rb1))  # generator for intermediary result rm -> MERGE of "a" and "b"
r2 = (w2(x) for x in rm)  # generator for intermediary result 2

r3 = [w3(x) for x in r2]  # final result
print(r3)

Two data sources, da and db. Their intermediate results get "merged" in rm, though the actual computation is only triggered by computing r3. Everything above it is generators, computed on demand.

Something I have been pondering for a while is how to reverse this, i.e. how to "split into branches" using generators - without having to keep all intermediate results of one step in memory simultaneously. Consider the following example:

# example 3

w1 = lambda v: v ** 2  # placeholder for expensive operation
ws = lambda v: (v - 1, v + 1)  # placeholder for expensive operation (SPLIT)
w2 = lambda v: v - 3  # placeholder for expensive operation
w3 = lambda v: v / 7  # placeholder for expensive operation

d = [10, 11, 12, 13]  # input data, could be "large"

r1 = (w1(x) for x in d)  # generator for intermediary result 1
rs = [ws(x) for x in r1]  # ???
ra2 = (w2(x) for x, _ in rs)  # generator for intermediary result 2a
rb2 = (w2(x) for _, x in rs)  # generator for intermediary result 2b

ra3 = [w3(x) for x in ra2]  # final result "a"
rb3 = [w3(x) for x in rb2]  # final result "b"
print(ra3, rb3)

The results of generator r1 are required by two different operations, as reflected in the lambda ws, which also handles the "split into branches".

My question is: Can I replace rs, currently a list comprehension, with something that behaves like a generator, computes every intermediate result only once, but makes it available to multiple generators, e.g. ra2 and rb2, "on demand"? If I had to keep some intermediate results, i.e. some elements of rs, cached in memory at any given time, I'd be fine - just not all of rs, as e.g. a list.

Since the branches in example 3 are kind of symmetrical, I could get away with this:

# example 4

w1 = lambda v: v ** 2  # placeholder for expensive operation
ws = lambda v: (v - 1, v + 1)  # placeholder for expensive operation (SPLIT)
w2 = lambda v: v - 3  # placeholder for expensive operation
w3 = lambda v: v / 7  # placeholder for expensive operation

d = [10, 11, 12, 13]  # input data, could be "large"

r1 = (w1(x) for x in d)  # generator for intermediary result 1
rs = (ws(x) for x in r1)  # ???
r2 = ((w2(x), w2(y)) for x, y in rs)  # generator for intermediary result 2

r3 = [(w3(x), w3(y)) for x, y in r2]  # final result
print(r3)

For more complicated processing pipelines, however, this could become quite cluttered and impractical. For the purpose of this question, let's assume that I really want to keep intermediate results 2 for branches "a" and "b" separate.

My best bad idea so far is to work with threads and queues, since all of this also implicitly raises the question of execution order. In example 3, ra3 will be evaluated completely before rb3 even gets touched, implying that all intermediate results of rs must be kept around until rb3 can be evaluated. Realistically, ra3 and rb3 must therefore be evaluated in parallel or in alternation if I do not want to keep all of rs in memory at the same time. I am wondering if there is a better, more clever way to get this done - it smells a lot like some async magic could make sense here.
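For reference, the threads-and-queues idea could be sketched roughly like this - `split` is a hypothetical helper, not a library function, and the bounded queues are what keep the per-branch buffer small:

```python
import threading
import queue

def split(source, n=2, maxsize=8):
    """Fan one iterator out to n independent iterators (hypothetical helper).

    A background thread pulls from `source` and puts each item on every
    queue. The queues are bounded, so at most ~maxsize items per branch
    sit in memory; a full queue blocks the producer until that branch
    catches up. Consumers must therefore run concurrently (e.g. in
    separate threads), otherwise a full queue deadlocks the producer.
    """
    sentinel = object()  # marks exhaustion of the source
    queues = [queue.Queue(maxsize=maxsize) for _ in range(n)]

    def producer():
        for item in source:
            for q in queues:
                q.put(item)  # blocks while this branch's buffer is full
        for q in queues:
            q.put(sentinel)

    threading.Thread(target=producer, daemon=True).start()

    def consumer(q):
        while (item := q.get()) is not sentinel:
            yield item

    return [consumer(q) for q in queues]
```

Each branch is then an ordinary iterator, e.g. `branch_a, branch_b = split(r1)`, consumed from two worker threads. Note that if one branch is abandoned, the producer eventually blocks forever on its full queue.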

s-m-e

1 Answer


You could use `itertools.tee`.

Note that this does the caching you alluded to. So if you evaluate one branch entirely before the other, you gain nothing, since the `itertools.tee` object has to buffer the already-seen elements in memory anyway.

Also important to note, from the docs:

Once a tee() has been created, the original iterable should not be used anywhere else; otherwise, the iterable could get advanced without the tee objects being informed.

Note that the docs I linked to provide a sample implementation that approximates the actual implementation (which is in C). You'll notice that it basically uses a bunch of queues.
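Condensed, that pure-Python recipe looks roughly like this - one deque per branch buffers the items that branch hasn't consumed yet, which makes the memory behavior explicit:

```python
from collections import deque

def tee_sketch(iterable, n=2):
    # Condensed sketch of the pure-Python tee recipe from the itertools
    # docs: each branch gets its own deque buffering items that have been
    # pulled from the source but not yet consumed by that branch.
    it = iter(iterable)
    deques = [deque() for _ in range(n)]

    def gen(mydeque):
        while True:
            if not mydeque:                # this branch's buffer is empty
                try:
                    newval = next(it)      # pull one fresh value ...
                except StopIteration:
                    return
                for d in deques:           # ... and append it to every buffer
                    d.append(newval)
            yield mydeque.popleft()

    return tuple(gen(d) for d in deques)
```

If one branch runs far ahead of the other, the lagging branch's deque grows accordingly - which is exactly the buffering caveat above.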

So, more concretely:

import itertools

w1 = lambda v: v ** 2  # placeholder for expensive operation
ws = lambda v: (v - 1, v + 1)  # placeholder for expensive operation (SPLIT)
w2 = lambda v: v - 3  # placeholder for expensive operation
w3 = lambda v: v / 7  # placeholder for expensive operation

d = [10, 11, 12, 13]  # input data, could be "large"

r1 = (w1(x) for x in d)  # generator for intermediary result 1
rs1, rs2 = itertools.tee(ws(x) for x in r1)  # second argument defaults to 2
ra2 = (w2(x) for x, _ in rs1)  # generator for intermediary result 2a
rb2 = (w2(x) for _, x in rs2)  # generator for intermediary result 2b

for x, y in zip(map(w3, ra2), map(w3, rb2)):
    print(x, y)
juanpa.arrivillaga
  • It means that I have to manually take care of the alternating execution order of the rest of the code. That's an option, thanks. Note from the docs: "`tee` iterators are not threadsafe." – s-m-e Apr 07 '23 at 20:20
  • 2
    @s-m-e: you also need to know that `itertools.tee` [doesn't work](https://stackoverflow.com/q/53785260/5997596) with nested iterables (iterables of iterables and so on) – Azat Ibrakov Apr 07 '23 at 20:33
  • @AzatIbrakov Can you elaborate on what this means for my example (of generator comprehensions)? – s-m-e Apr 07 '23 at 20:38
  • 2
    @s-m-e imagine something like `(map(f, xs) for f, xs in zip(fs, xss))`. The inner `map` iterators will be empty if you try to go over them more than once. – juanpa.arrivillaga Apr 07 '23 at 20:39
  • @AzatIbrakov a great question/answer you linked to, btw – juanpa.arrivillaga Apr 07 '23 at 20:40
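The nested-iterables caveat from the comments can be demonstrated in a few lines: `tee` hands the *same* inner iterator objects to both branches, so whichever branch exhausts them first leaves them empty for the other.

```python
import itertools

# A generator yielding iterators (the problematic nested case)
inner = (iter(chunk) for chunk in ([1, 2], [3, 4]))
a, b = itertools.tee(inner)

# Branch "a" exhausts the inner iterators ...
print([list(it) for it in a])  # [[1, 2], [3, 4]]
# ... so branch "b" receives the same, now-empty, iterator objects
print([list(it) for it in b])  # [[], []]
```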