
TL;DR: I'm looking for help implementing the marble diagram below. The intention is to sort the unsorted values as much as possible without adding any waiting time between scan executions.

I'm not asking for a full implementation; any guidance will be welcome.

[marble diagram: "not consumed" / "min"]

I have an asynchronous, deliberately slow (forced for testing purposes) scan of an infinite hot observable. This is the relevant code:

thread_1_scheduler = ThreadPoolScheduler(1)
thread = ExternalDummyService()
external_obs = thread.subject.publish()

external_obs \
    .flat_map(lambda msg: Observable.just(msg).subscribe_on(thread_1_scheduler)) \
    .scan(seed=State(0, None), accumulator=slow_scan_msg) \
    .subscribe(log, print, lambda: print("SLOW FINISHED"))

external_obs.connect()
thread.start()

def slow_scan_msg(state, msg):
    sleep(0.4)  # simulate a slow accumulator step (sleep comes from the time module)
    return state \
        ._replace(count = state.count + 1) \
        ._replace(last_msg = msg)
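
State is not defined in the snippet; based on the seed State(0, None) and the _replace calls, it is presumably a namedtuple along these lines (this definition is an assumption for readability, not copied from the linked fiddle):

from collections import namedtuple

# Assumed accumulator state: a running count plus the last message folded in
# by slow_scan_msg. The real definition lives in the linked fiddle.
State = namedtuple('State', ['count', 'last_msg'])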

This is the full version: https://pyfiddle.io/fiddle/781a9b29-c541-4cd2-88ba-ef90610f5dbd

And this is the current output (values are randomly generated):

emitting Msg(count=0, timestamp=14.139175415039062)
emitting Msg(count=1, timestamp=6.937265396118164)
emitting Msg(count=2, timestamp=11.461257934570312)
emitting Msg(count=3, timestamp=13.222932815551758)
emitting Msg(count=4, timestamp=5.713462829589844)
SLOW st.count=0 last_msg.counter=0 ts=14.14
SLOW st.count=1 last_msg.counter=1 ts=6.94
SLOW st.count=2 last_msg.counter=2 ts=11.46
SLOW st.count=3 last_msg.counter=3 ts=13.22
SLOW st.count=4 last_msg.counter=4 ts=5.71
SLOW FINISHED

I would like to sort the pending messages between scan executions. So the first emitted message will always be the first one consumed, but the next message consumed would be the min(value) of the messages emitted and not yet consumed up to that point (all of them in the current version, because emission is instant), and so on and so forth. With the output above, that would mean consuming ts=14.14 first, then 5.71, 6.94, 11.46 and 13.22. I think the marble diagram explains it better than my words.

Note that the scan is not waiting for the complete event; the only reason it doesn't start until after the last message is emitted is the sleep. Here is another version (https://pyfiddle.io/fiddle/3425dde3-7702-4571-b678-18ce93af9242/) in which the sleep has been removed from scan and put in ExternalDummyService. There you can see that the values are consumed the moment they are emitted. This is also shown in the marble diagram.

I tried with to_sorted_list, the only sorting method I found in RxPy, but I couldn't make it work.

What I'm looking for is something like this:

external_obs \
    .flat_map(lambda msg: Observable.just(msg).subscribe_on(thread_1_scheduler)) \
############ buffered_sort() does not exist
    .buffered_sort(lambda msg: msg.timestamp) \
############
    .scan(seed=State("SLOW", 0, None), accumulator=slow_scan_msg) \
    .subscribe(log, print, lambda: print("SLOW FINISHED"))
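
To make the intended behaviour of that (non-existent) buffered_sort more concrete, here is a minimal, Rx-free sketch of the selection policy. The function name consumption_order is made up, and it assumes each message exposes a timestamp attribute:

import heapq

def consumption_order(first_msg, later_msgs, key=lambda msg: msg.timestamp):
    """Illustration only: the first emitted message is consumed as-is; after
    that, the next message consumed is always the pending one with the
    smallest key."""
    yield first_msg
    heap = [(key(m), i, m) for i, m in enumerate(later_msgs)]  # i breaks ties
    heapq.heapify(heap)
    while heap:
        yield heapq.heappop(heap)[-1]

A real operator would of course have to accept messages as they arrive instead of receiving them all up front; that buffering-while-busy part is exactly what buffered_sort is meant to hide.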

Thanks

raul.vila
  • By the way, shouldn't the sequence on the blue line of the diagram be [2 1 4 5 3]? – Jacques Gaudin Apr 18 '18 at 14:34
  • @JacquesGaudin Good question, when the 3rd item is consumed, the non-consumed items are `[3, 5, 4]` and we're taking the `min` of that list. So the 3rd item should be `3`. Is it clear that way? – raul.vila Apr 18 '18 at 14:36
  • Ha, I got you, you are taking `min(counter)` not `min(time difference)` – Jacques Gaudin Apr 18 '18 at 14:39
  • Sorry for the confusion; the marble diagram is intended to be agnostic of the data structure. Just think of the yellow values as the `message.timestamp`. – raul.vila Apr 18 '18 at 14:40
  • But the timestamps are not ordered in time?! – Jacques Gaudin Apr 18 '18 at 14:41
  • @JacquesGaudin No, because the `fake_timestamp` includes a `random()` value. I think I should not have included the timestamp property. It would be clearer if I had used only integers :( – raul.vila Apr 18 '18 at 14:44
  • The intention is to order the unordered values as much as possible without waiting for any time between the scan executions. – raul.vila Apr 18 '18 at 14:51

1 Answer


If you want to use to_sorted_list you need to flatten the single sorted list it emits back into individual elements. Changing your main function to:

def main():
    thread_1_scheduler = ThreadPoolScheduler(1)

    thread = ExternalDummyService()
    external_obs = thread.subject.publish()

    external_obs \
        .flat_map(lambda msg: Observable.just(msg).subscribe_on(thread_1_scheduler)) \
        .to_sorted_list(key_selector=lambda msg: msg.timestamp) \
        .flat_map(lambda msglist: Observable.from_iterable(msglist)) \
        .scan(seed=State(0, None), accumulator=slow_scan_msg) \
        .subscribe(log, print, lambda: print("SLOW FINISHED"))

    external_obs.connect()

    thread.start()

gives:

emitting Msg(count=0, timestamp=18.924474716186523)
emitting Msg(count=1, timestamp=4.669189453125)
emitting Msg(count=2, timestamp=18.633127212524414)
emitting Msg(count=3, timestamp=15.151262283325195)
emitting Msg(count=4, timestamp=14.705896377563477)
SLOW st.count=0 last_msg.counter=1 ts=4.67
SLOW st.count=1 last_msg.counter=4 ts=14.71
SLOW st.count=2 last_msg.counter=3 ts=15.15
SLOW st.count=3 last_msg.counter=2 ts=18.63
SLOW st.count=4 last_msg.counter=0 ts=18.92
SLOW FINISHED

Note that the to_sorted_list method will wait for the end of the subject stream before the scanning starts, so you cannot use it to implement your marble diagram as shown in the question.

To implement it properly, I think you would need something like onBackpressureBuffer, which is implemented in RxJava but not in RxPy.

This wouldn't entirely solve the issue, as the buffer is FIFO (first in, first out) and you want a custom way of choosing which message goes out first. This may require a tweak to how requests to the buffer are handled.

You may find a better way to progress towards a solution with an extension of RxPy called rxbackpressure, particularly its dequeuablebuffer.py, which you may be able to adapt to suit your needs.
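
As a very rough sketch of the idea, outside of Rx entirely (the class name and API below are made up for illustration and are not part of RxPy, RxJava, or rxbackpressure): the fast source pushes messages into an unbounded priority buffer, and the slow consumer blocks on pop() and always receives the pending message with the smallest timestamp.

import heapq
import threading

class SortingBackpressureBuffer:
    """Illustrative unbounded priority buffer: producers push at any rate,
    the consumer always pops the pending item with the smallest key.
    Being unbounded, it can grow without limit if messages keep stacking up."""

    def __init__(self, key):
        self._key = key
        self._heap = []
        self._seq = 0                      # tie-breaker so heap entries stay comparable
        self._cond = threading.Condition()

    def push(self, msg):
        with self._cond:
            heapq.heappush(self._heap, (self._key(msg), self._seq, msg))
            self._seq += 1
            self._cond.notify()

    def pop(self):
        with self._cond:
            while not self._heap:
                self._cond.wait()
            return heapq.heappop(self._heap)[-1]

Feeding push from a subscription to the hot observable and draining pop() in a loop on the slow scheduler (then forwarding the results into scan, e.g. through a Subject) is the wiring where something like dequeuablebuffer could serve as a starting point.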

Jacques Gaudin
  • Thanks a lot for the info, it looks promising (I'll take a deeper look and update with whatever I get). Just one nuance: I don't want either FIFO or LIFO... I want a `min()` of some value. – raul.vila Apr 18 '18 at 13:17
  • 1
    Ok I probably misunderstood what you want to achieve. I hope the above will help. It may be worth editing the question to clarify. Good luck :-) – Jacques Gaudin Apr 18 '18 at 13:22
  • I looked again at the marble diagram and I see it can be understood as a LIFO. I'll change it to avoid confusion. Thanks for pointing it out. – raul.vila Apr 18 '18 at 13:29
  • I'm testing your code and it seems that the scan doesn't start processing until the source is completed. That's a major issue; scan should start the moment the first value is emitted. I think that requirement is well defined in the marble diagram (let me know if it's confusing, thanks). – raul.vila Apr 18 '18 at 13:42
  • I've updated the marble diagram to clarify, and also specified it's an *infinite* hot observable, so I can't wait for the complete event. Thanks again, I think the most promising clue is the onBackpressureBuffer. I'll look at implementations in other languages and see if I can migrate them to Python. I'll wait a bit before marking your answer as the solution (if there is no better one). – raul.vila Apr 18 '18 at 13:47
  • 1
    Yes the scan doesn't start until the source is complete like in your original code. That's the reason why you need a buffer. Backpressure is a tool to handle this kind of situation where you have a fast source and a slow observer. But you have a risk of overflowing the memory if messages keep stacking up. Adapting to python seem to be a lot of work. You may want to check out https://github.com/MichaelSchneeberger/rxbackpressure – Jacques Gaudin Apr 18 '18 at 14:08
  • That library looks AWESOME! Thanks a lot again. Just another clarification: <... like in your original code> This is not true, the only reason the scan doesn't start after the last message is emitted is because the `sleep`. [Here you have another version](https://pyfiddle.io/fiddle/3425dde3-7702-4571-b678-18ce93af9242/) in which the `sleep` has been removed from `scan` and put in `ExternalDummyService`. You can see that the values are consumed in the moment they are emitted. – raul.vila Apr 18 '18 at 14:13
  • Also, in [rxbackpressure](https://github.com/MichaelSchneeberger/rxbackpressure) I saw a reference to [ControlledSubject](https://github.com/ReactiveX/RxPY/blob/6beb2e2cbb108fdf924561f9431eb4aa610c6ad9/rx/backpressure/controlledsubject.py), which also looks very interesting. – raul.vila Apr 18 '18 at 14:19
  • Point taken. I had another version here: https://pyfiddle.io/fiddle/c830f5a3-40ca-4189-979e-cfbdfc75a61f/?i=true. If you have control of the source then `ControlledSubject` could be helpful. – Jacques Gaudin Apr 18 '18 at 14:25
  • After changing the Python version and adding the rx package in your PyFiddle... I see no difference, just a little refactor on how `dummy_msg` is generated, but the main pipeline is the same O_o, sorry if I'm missing something. – raul.vila Apr 18 '18 at 14:33
  • I've just seen your link to [dequeuablebuffer](https://github.com/MichaelSchneeberger/rxbackpressure/blob/master/rxbackpressure/buffers/dequeuablebuffer.py) and it looks easy to refactor into what I want... thanks! – raul.vila Apr 18 '18 at 14:49