19

I am trying to reproduce the reactive extensions "shared" observable concept with Python generators.

Say I have an API that gives me an infinite stream that I can use like this:

def my_generator():
    for elem in the_infinite_stream():
        yield elem

I could use this generator multiple times like so:

stream1 = my_generator()
stream2 = my_generator()

And the_infinite_stream() will be called twice (once for each generator).

Now say that the_infinite_stream() is an expensive operation. Is there a way to "share" the generator between multiple clients? It seems like tee would do that, but I have to know in advance how many independent generators I want.

The idea is that in other languages (Java, Swift) using the reactive extensions (RxJava, RxSwift) "shared" streams, I can conveniently duplicate the stream on the client side. I am wondering how to do that in Python.

Note: I am using asyncio

Reblochon Masque
JonasVautherin
  • By "share", do you mean to cache the values for use in the next generator? – Quelklef Apr 19 '19 at 12:54
  • Also, are these "real" streams (with .seek, .close, etc), or just general iterables? And would `stream1`, `stream2`, etc. be used in parallel, or only one-after-the-other? – Quelklef Apr 19 '19 at 12:58
  • You need to store the whole stream between the first and the last consumer - which becomes apparent if there is one consumer polling the infinite stream all the time, and one consumer lagging. And if you want any new consumer to start at the beginning, you need to cache it all. If that's okay with you, it's easy to implement. – liborm Apr 19 '19 at 12:59
  • If you don't want to replay the whole history to the new consumer, you're rather looking for some pub-sub scheme. – liborm Apr 19 '19 at 13:03
  • By "share", I mean the pub-sub scheme suggested by liborm: if an event A arrives from `infinite_stream()`, I want all the subscribers to receive it. I don't want to replay the whole history, but just the last value (say `infinite_stream()` emits once per minute, when a subscriber joins, he doesn't want to wait 1 minute to receive the first value). – JonasVautherin Apr 19 '19 at 13:14
  • This is really "built in" reactive extensions, and I am wondering if such a mechanism exists in python (possibly in asyncio). Implementing it myself does not sound super straightforward and that's prone to error, therefore I'd rather use something standard (just like I use RxJava or RxSwift) =). – JonasVautherin Apr 19 '19 at 13:16

5 Answers

8

I took the tee implementation and modified it so that you can create any number of generators from infinite_stream:

import collections

def generators_factory(iterable):
    it = iter(iterable)
    deques = []
    already_gone = []

    def new_generator():
        new_deque = collections.deque()
        new_deque.extend(already_gone)
        deques.append(new_deque)

        def gen(mydeque):
            while True:
                if not mydeque:             # when the local deque is empty
                    newval = next(it)       # fetch a new value and
                    already_gone.append(newval)
                    for d in deques:        # load it to all the deques
                        d.append(newval)
                yield mydeque.popleft()

        return gen(new_deque)

    return new_generator

# test it:
infinite_stream = [1, 2, 3, 4, 5]
factory = generators_factory(infinite_stream)
gen1 = factory()
gen2 = factory()
print(next(gen1)) # 1
print(next(gen2)) # 1 even after it was produced by gen1
print(list(gen1)) # [2, 3, 4, 5] # the rest after 1

To cache only a limited number of values, change already_gone = [] to already_gone = collections.deque(maxlen=size) and add a size=None parameter to generators_factory.
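For concreteness, a sketch of that size-limited variant: it is the same factory as above with the two described changes applied, so with size=1 a late subscriber replays only the most recent value instead of the whole history.

```python
import collections

def generators_factory(iterable, size=None):
    it = iter(iterable)
    deques = []
    # Bounded history: deque(maxlen=None) keeps everything, maxlen=1 only the last value
    already_gone = collections.deque(maxlen=size)

    def new_generator():
        new_deque = collections.deque()
        new_deque.extend(already_gone)  # replay whatever history is still cached
        deques.append(new_deque)

        def gen(mydeque):
            while True:
                if not mydeque:             # when the local deque is empty
                    newval = next(it)       # fetch a new value and
                    already_gone.append(newval)
                    for d in deques:        # load it to all the deques
                        d.append(newval)
                yield mydeque.popleft()

        return gen(new_deque)

    return new_generator

factory = generators_factory([1, 2, 3, 4, 5], size=1)
gen1 = factory()
print(next(gen1), next(gen1))  # 1 2
gen2 = factory()               # late subscriber: replays only the last value
print(next(gen2))              # 2
```

This matches the "emits once per minute" scenario from the question comments: a new subscriber immediately gets the latest value rather than the full history or nothing.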

sanyassh
  • That's interesting. And it could be extended such that `already_gone` has a `max_size` (say, 1), so that it caches only one value. Correct? – JonasVautherin Apr 19 '19 at 13:26
  • Does it mean that if `gen1` and `gen2` are used in parallel, the order may not be respected? Mainly out of curiosity... I am trying (currently failing) to port that to asyncio and it may be different there. – JonasVautherin Apr 24 '19 at 14:30
  • Order of produced elements? Neither for tee nor for my implementation. – sanyassh Apr 24 '19 at 16:36
  • Maybe it is better to ask new question about your asyncio failed attempts with a reference to this one. – sanyassh Apr 24 '19 at 16:39
  • Good to know for tee! I got something working with asyncio, I still need to figure out how to remove consumers that would cancel their subscription. It may end up on codereview once I get it to work! – JonasVautherin Apr 24 '19 at 22:14
3

Consider simple class attributes.

Given

def infinite_stream():
    """Yield a number from a (semi-)infinite iterator."""
    # Alternatively, `yield from itertools.count()`
    yield from iter(range(100000000))


# Helper
def get_data(iterable):
    """Print the state of `data` per stream."""
    return ", ".join([f"{x.__name__}: {x.data}" for x in iterable])

Code

class SharedIterator:
    """Share the state of an iterator with subclasses."""
    _gen = infinite_stream()
    data = None

    @staticmethod
    def modify():
        """Advance the shared iterator + assign new data."""
        cls = SharedIterator
        cls.data = next(cls._gen)

Demo

Given a tuple of client streams (A, B and C),

# Streams
class A(SharedIterator): pass
class B(SharedIterator): pass
class C(SharedIterator): pass


streams = A, B, C

let us modify and print the state of one iterator shared between them:

# Observe changed state in subclasses    
A.modify()
print("1st access:", get_data(streams))
B.modify()
print("2nd access:", get_data(streams))
C.modify()
print("3rd access:", get_data(streams))

Output

1st access: A: 0, B: 0, C: 0
2nd access: A: 1, B: 1, C: 1
3rd access: A: 2, B: 2, C: 2

Although any stream can modify the iterator, the class attribute is shared between sub-classes.

See Also

  • Docs on asyncio.Queue - an async alternative to shared container
  • Post on the Observer Pattern + asyncio
pylang
  • But then it seems to me that when A consumes (`modify()`) an event, it is consumed for B and C at the same time. So if I do `next(A); next(B); next(C);` they don't all get the same event, do they? – JonasVautherin Apr 23 '19 at 20:16
  • I believe that Sanyash answer does exactly what I want. But your solution doesn't behave the same way, does it? Maybe I miss something there =/. – JonasVautherin Apr 23 '19 at 20:18
  • 1
    That's correct according to your comment "if an event A arrives from `infinite_stream()`, I want all the subscribers to receive it." This may be another interpretation of what you actually want. – pylang Apr 23 '19 at 20:48
3

You can call "tee" repeatedly to create multiple iterators as needed.

import itertools
import random

it = iter([random.random() for i in range(100)])
base, it_cp = itertools.tee(it)
_, it_cp2 = itertools.tee(base)
_, it_cp3 = itertools.tee(base)

Sample: http://tpcg.io/ZGc6l5.
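A quick deterministic check of the same pattern (a small finite iterator standing in for the random stream): each copy sees the full sequence independently, while base buffers the items for the later tees.

```python
import itertools

it = iter(range(5))
base, it_cp = itertools.tee(it)   # first tee off the source
_, it_cp2 = itertools.tee(base)   # second tee off the same base

print(list(it_cp))   # [0, 1, 2, 3, 4]
print(list(it_cp2))  # [0, 1, 2, 3, 4] -- same elements, consumed independently
```

As the comments below note, tee keeps an item buffered as long as any derived iterator still needs it, so teeing everything off an unconsumed base effectively caches the whole stream.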

tarkmeper
  • Actually, you don't need to regenerate base. `_, it_cp2 = itertools.tee(base)` works just as well after the first time it is created. – tarkmeper Apr 23 '19 at 04:22
  • 1
    I should have and have edited it now. Last night I couldn't figure out why this worked and thought I might need to delete the comment. This morning I took a look at the C code - and when you call tee with N > 1 I think this is effectively what it is doing. – tarkmeper Apr 23 '19 at 11:26
  • Interesting, I didn't realize I could call `tee` repeatedly. However, that is caching all the events of the infinite stream, isn't it? I haven't found a way to set a `max_size` in `tee` (which is possibly why @Sanyash reimplemented it?). – JonasVautherin Apr 23 '19 at 20:03
  • 1
    Based on the C code I think it caches only the items where at least 1 iterator still needs the data. So yes, in the answer above it will cache all of them. On the other hand, if instead of basing them all on `base` you tee off an iterator that has already been moved, it will cache from that point onwards, but it doesn't have the flexibility of @Sanyash's solution to look backwards with an X (or unlimited) element cache. – tarkmeper Apr 24 '19 at 07:47
3

You can use single generator and "subscriber generators":

subscribed_generators = []


def my_generator():
    while True:
        elem = yield
        do_something(elem)  # or yield do_something(elem) depending on your actual use

def publishing_generator():
    for elem in the_infinite_stream():
        for generator in subscribed_generators:
            generator.send(elem)
        yield elem

for gen in (my_generator(), my_generator()):
    next(gen)  # prime each generator so it can receive .send()
    subscribed_generators.append(gen)

# Next is just an example that forces iteration over `the_infinite_stream`
for elem in publishing_generator():
    pass

Instead of a generator function you may also create a class with the methods __next__, __iter__, send, throw. That way you can modify the MyGenerator.__init__ method to automatically add new instances of it to subscribed_generators.

This is somewhat similar to an event-based approach with a "dumb implementation":

  • for elem in the_infinite_stream is similar to emitting an event
  • for generator ...: generator.send is similar to sending the event to each subscriber.

So one way to implement a "more complex but structured solution" would be to use an event-based approach:

  • For example you can use asyncio.Event
  • Or some third-party solution like aiopubsub
  • For any of those approaches you should emit an event for each element of the_infinite_stream, and your instances of my_generator should be subscribed to those events.
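Since the question notes asyncio, here is a minimal sketch of this fan-out idea with asyncio.Queue: one task drains the source and pushes each item into a per-subscriber queue. Broadcaster and the stand-in source are illustrative names, not a library API; a real version would run the producer as a background task.

```python
import asyncio

# Stand-in for the expensive source; finite so the demo terminates.
async def the_infinite_stream():
    for i in range(5):
        await asyncio.sleep(0)  # pretend each item is expensive to produce
        yield i

class Broadcaster:
    def __init__(self):
        self._queues = []

    def subscribe(self):
        # Each subscriber gets its own queue and consumes at its own pace
        q = asyncio.Queue()
        self._queues.append(q)
        return q

    async def run(self):
        # Pull each item from the source exactly once and fan it out
        async for item in the_infinite_stream():
            for q in self._queues:
                q.put_nowait(item)

async def main():
    b = Broadcaster()
    q1, q2 = b.subscribe(), b.subscribe()
    await b.run()
    print(q1.get_nowait(), q2.get_nowait())  # both subscribers see the first item

asyncio.run(main())
```

The source is consumed once no matter how many subscribers exist, which is the "shared" behavior the question asks for; unsubscribing would just remove the queue from the list.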

Other approaches can also be used, and the best choice depends on the details of your task and on how you are using the event loop in asyncio. For example:

  • You can implement the_infinite_stream (or a wrapper for it) as a class with "cursors" (objects that track the current position in the stream for different subscribers); then each my_generator registers a new cursor and uses it to get the next item in the infinite stream. In this approach the event loop will not automatically revisit my_generator instances, which might be required if those instances "are not equal" (for example have some "priority balancing")

  • An intermediate generator calling all the instances of my_generator (as described earlier). In this approach each instance of my_generator is automatically revisited by the event loop. Most likely this approach is thread-safe.

  • Event-based approaches:

    • using asyncio.Event. Similar to the use of an intermediate generator. Not thread-safe

    • aiopubsub.

    • something that uses the Observer pattern

  • Make the_infinite_generator (or a wrapper for it) a "Singleton" that "caches" the latest event. Some approaches were described in other answers. Other "caching" solutions can be used:

    • emit the same element once for each instance of the_infinite_generator (use a class with a custom __new__ method that tracks instances, or use the same class instance with a method returning a "shifted" iterator over the_infinite_stream) until someone calls a special method on an instance of the_infinite_generator (or on the class): infinite_gen.next_cycle. In this case there should always be some "last finalizing generator/processor" that, at the end of each event-loop cycle, calls the_infinite_generator().next_cycle()

    • Similar to the previous, but the same event is allowed to fire multiple times in the same my_generator instance (so they should watch for this case). In this approach the_infinite_generator().next_cycle() can be called "periodically" with loop.call_later or loop.call_at. This approach might be needed if "subscribers" should be able to handle/analyze delays, rate limits, timeouts between events, etc.

  • Many other solutions are possible. It's hard to propose something specific without looking at your current implementation and without knowing the desired behavior of the generators that use the_infinite_stream.

If I understand your description of "shared" streams correctly, you really need "one" the_infinite_stream generator and a "handler" for it. An example that tries to do this:


class StreamHandler:
    def __init__(self):
        self.__real_stream = the_infinite_stream()
        self.__sub_streams = []

    def get_stream(self):
        sub_stream = []  # or better use some Queue/deque object. Using list just to show base principle
        self.__sub_streams.append(sub_stream)
        while True:
            while sub_stream:
                yield sub_stream.pop(0)
            next(self)

    def __next__(self):
        next_item = next(self.__real_stream)
        for sub_stream in self.__sub_streams:
            sub_stream.append(next_item)

some_global_variable = StreamHandler()
# Or you can change StreamHandler.__new__ to make it singleton, or you can create an instance at the point of creation of event-loop

def my_generator():
    for elem in some_global_variable.get_stream():
        yield elem

But if all your my_generator objects are initialized at the same point of the infinite stream and "equally" iterated inside the loop, then this approach will introduce "unnecessary" memory overhead for each "sub_stream" (used as a queue). Unnecessary because those queues will always be the same (though that can be optimized: if there is an existing "empty" sub_stream, it can be re-used for new sub_streams with some changes to the "pop logic"). And many, many other implementations and nuances can be discussed.

imposeren
  • I see. So a consumer of that stream can implement its own `my_generator()` and has to append it manually to `subscribed_generators`, right? But then I don't get what the last `for` does :/ – JonasVautherin Apr 23 '19 at 19:58
  • 1
    @JonesV: You always have some "upper code" that creates those `my_generator()` calls: you can append it there. You are not restricted to some global variable `subscribed_generators`; the code just "explains a concept", and many other approaches are possible. See my answer: I've updated it with more details and with a list of some other potential approaches. – imposeren Apr 25 '19 at 12:01
  • Thanks a lot for the elaboration! I went for Sanyash implementation in my code, that I could quite easily adapt for asyncio, hence I validate that answer. However, the thoughts here are super nice, and I did not know `aiopubsub`, which seems very nice! – JonasVautherin Apr 25 '19 at 21:01
0

If you have a single generator, you can use one queue per "subscriber" and route events to each subscriber as the primary generator produces results.

This has the advantage of allowing the subscribers to move at their own pace, and it can be dropped into existing code with very few changes to the original source.

For example:

def my_gen():
  ...

m1 = Muxer(my_gen)
m2 = Muxer(my_gen)

consumer1(m1).start()
consumer2(m2).start()

As items are pulled from the primary generator they are inserted into queues for each listener. Listeners can subscribe any time by constructing a new Muxer():

import queue
from threading import Lock
from collections import namedtuple

class Muxer():
    Entry = namedtuple('Entry', 'genref listeners lock')

    already = {}
    top_lock = Lock()

    def __init__(self, func, restart=False):
        self.restart = restart
        self.func = func
        self.queue = queue.Queue()

        with self.top_lock:
            if func not in self.already:
                self.already[func] = self.Entry([func()], [], Lock())
            ent = self.already[func]

        self.genref = ent.genref
        self.lock = ent.lock
        self.listeners = ent.listeners

        self.listeners.append(self)

    def __iter__(self):
        return self

    def __next__(self):
        try:
            e = self.queue.get_nowait()
        except queue.Empty:
            with self.lock:
                try:
                    e = self.queue.get_nowait()
                except queue.Empty:
                    try:
                        e = next(self.genref[0])
                        for other in self.listeners:
                            if other is not self:
                                other.queue.put(e)
                    except StopIteration:
                        if self.restart:
                            self.genref[0] = self.func()
                        raise
        return e

Original source code, including test suite:

https://gist.github.com/earonesty/cafa4626a2def6766acf5098331157b3

The unit tests run many threads concurrently processing the same generated events in sequence. The code is order preserving, with a lock acquired during the single generator's access.

Caveats: the version here uses a singleton to gate access; otherwise it would be possible to accidentally evade its control over the contained generators. It also allows the contained generators to be "restartable", which was a useful feature for me at the time. There is no close() feature, simply because I didn't need it. This is an appropriate use case for __del__, however, since the last reference to a listener is the right time to clean up.
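For illustration, a simplified sketch of that __del__-based cleanup, under the assumption that each listener holds a reference to the shared listeners list and its lock. Listener and the field names are illustrative, simplified from the Muxer above, not its actual API.

```python
from threading import Lock

class Listener:
    """Toy subscriber that unregisters itself when closed or garbage-collected."""

    def __init__(self, listeners, lock):
        self._listeners = listeners
        self._lock = lock
        listeners.append(self)

    def close(self):
        # Idempotent: safe to call from both user code and __del__
        with self._lock:
            if self in self._listeners:
                self._listeners.remove(self)

    def __del__(self):
        # Last reference dropped: unsubscribe so the producer stops queueing for us
        self.close()

listeners, lock = [], Lock()
a = Listener(listeners, lock)
b = Listener(listeners, lock)
a.close()
print(len(listeners))  # 1: only b remains subscribed
```

Note that relying on __del__ alone is fragile (it may run late or, during interpreter shutdown, not at all), so an explicit close() like the one above is still worth exposing.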

Erik Aronesty