TL;DR: I'm looking for help implementing the behavior shown in the marble diagram below. The intention is to sort the not-yet-consumed values as far as possible, without adding any waiting time between scan executions.
I'm not asking for a full implementation. Any guidance will be welcome.
I have an asynchronous, slow (deliberately, for testing purposes) scan over an infinite hot observable. This is the relevant code:
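from time import sleep

from rx import Observable
from rx.concurrency import ThreadPoolScheduler

# ExternalDummyService, State and log are defined in the full version linked below.

def slow_scan_msg(state, msg):
    sleep(0.4)  # deliberately slow accumulator, for testing
    return state._replace(count=state.count + 1, last_msg=msg)

thread_1_scheduler = ThreadPoolScheduler(1)
thread = ExternalDummyService()
external_obs = thread.subject.publish()
external_obs \
    .flat_map(lambda msg: Observable.just(msg).subscribe_on(thread_1_scheduler)) \
    .scan(seed=State(0, None), accumulator=slow_scan_msg) \
    .subscribe(log, print, lambda: print("SLOW FINISHED"))
external_obs.connect()  # start multicasting the hot observable
thread.start()  # start the producer thread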
This is the full version: https://pyfiddle.io/fiddle/781a9b29-c541-4cd2-88ba-ef90610f5dbd
And this is the current output (the values are randomly generated):
emitting Msg(count=0, timestamp=14.139175415039062)
emitting Msg(count=1, timestamp=6.937265396118164)
emitting Msg(count=2, timestamp=11.461257934570312)
emitting Msg(count=3, timestamp=13.222932815551758)
emitting Msg(count=4, timestamp=5.713462829589844)
SLOW st.count=0 last_msg.counter=0 ts=14.14
SLOW st.count=1 last_msg.counter=1 ts=6.94
SLOW st.count=2 last_msg.counter=2 ts=11.46
SLOW st.count=3 last_msg.counter=3 ts=13.22
SLOW st.count=4 last_msg.counter=4 ts=5.71
SLOW FINISHED
I would like to sort the pending messages between scan executions. The first emitted message would always be the first one consumed, but the next consumed message would be the one with the minimum value among the messages emitted but not yet consumed at that point (in the current version that is all of them, because they are emitted instantly). And so on. I think the marble diagram explains it better than I can.
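To make the ordering rule above concrete, here is a minimal pure-Python simulation (no RxPY involved). It assumes a single consumer, a fixed processing time per message (standing in for the `sleep(0.4)` in the scan), and emissions given as `(emit_time, value)` pairs; `buffered_sort_order` is a hypothetical helper name, not an existing API. When the consumer is idle, the next message is handled as soon as it arrives; when it is busy, arriving messages go into a min-heap, and the smallest one is taken each time the consumer frees up.

```python
import heapq
from typing import List, Tuple


def buffered_sort_order(emissions: List[Tuple[float, float]],
                        process_time: float) -> List[float]:
    """Return the order in which one slow consumer would handle the values.

    emissions: (emit_time, value) pairs, in emission order (hypothetical input format).
    process_time: how long the consumer is busy per message.
    """
    pending: List[Tuple[float, int]] = []  # min-heap of (value, arrival index)
    order: List[float] = []
    free_at = float("-inf")  # time at which the consumer becomes idle
    i = 0
    while i < len(emissions) or pending:
        # Buffer every message emitted while the consumer was still busy.
        while i < len(emissions) and emissions[i][0] <= free_at:
            heapq.heappush(pending, (emissions[i][1], i))
            i += 1
        if pending:
            # Consumer is free and has a backlog: take the smallest pending value.
            value, _ = heapq.heappop(pending)
            order.append(value)
            free_at += process_time
        else:
            # Consumer is idle: the next message is handled as soon as it arrives.
            emit_time, value = emissions[i]
            order.append(value)
            free_at = emit_time + process_time
            i += 1
    return order
```

With the (rounded) sample values from the output above, all emitted at time 0 and 0.4 s per step, `buffered_sort_order([(0, 14.14), (0, 6.94), (0, 11.46), (0, 13.22), (0, 5.71)], 0.4)` gives `[14.14, 5.71, 6.94, 11.46, 13.22]`. If the messages are instead spaced further apart than the processing time, nothing is ever buffered and the emission order is preserved.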
Note that the scan does not wait for the completed event; the only reason it appears to start after the last message has been emitted is the sleep. Here you have another version in which the sleep has been removed from the scan and moved into ExternalDummyService. You can see that the values are consumed as soon as they are emitted. This is also shown in the marble diagram.
I tried to_sorted_list, the only sorting operator I found in RxPY, but I couldn't make it work.
What I'm looking for is something like this:
(external_obs
    .flat_map(lambda msg: Observable.just(msg).subscribe_on(thread_1_scheduler))
    # buffered_sort() does not exist; it is the operator I'm looking for
    .buffered_sort(lambda msg: msg.timestamp)
    .scan(seed=State("SLOW", 0, None), accumulator=slow_scan_msg)
    .subscribe(log, print, lambda: print("SLOW FINISHED")))
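One possible direction, sketched outside RxPY rather than as a real operator: put a thread-safe priority buffer between the fast producer and the slow consumer. `SortingBuffer` and `consume` below are hypothetical names. The assumed wiring is to subscribe with `buffer.push` as the on_next callback and `buffer.close` as the on_completed callback, and to run the slow accumulator logic in a dedicated consumer thread instead of inside `.scan`. Caveat: when several messages arrive at the same instant, whether the very first one is handled first depends on thread timing (the consumer may only wake up after all of them have been pushed), so this only approximates the desired rule. It also uses `None` as the end-of-stream signal, so `None` cannot be a valid item.

```python
import heapq
import itertools
import threading
from typing import Any, Callable, List, Optional


class SortingBuffer:
    """Hypothetical helper: thread-safe buffer that always hands the consumer
    the smallest pending item according to `key`."""

    def __init__(self, key: Callable[[Any], Any]) -> None:
        self._key = key
        self._heap: List[tuple] = []
        self._seq = itertools.count()  # tie-breaker: equal keys keep arrival order
        self._cond = threading.Condition()
        self._closed = False

    def push(self, item: Any) -> None:
        """Producer side (e.g. an on_next callback): buffer the item."""
        with self._cond:
            heapq.heappush(self._heap, (self._key(item), next(self._seq), item))
            self._cond.notify()

    def close(self) -> None:
        """Producer side (e.g. an on_completed callback): no more items."""
        with self._cond:
            self._closed = True
            self._cond.notify_all()

    def pop(self) -> Optional[Any]:
        """Consumer side: block until an item is pending and return the smallest;
        return None once the buffer is closed and fully drained."""
        with self._cond:
            while not self._heap and not self._closed:
                self._cond.wait()
            if not self._heap:
                return None
            return heapq.heappop(self._heap)[2]


def consume(buffer: SortingBuffer, handle: Callable[[Any], None]) -> None:
    """Slow consumer loop: handle one item at a time, smallest pending first."""
    while True:
        item = buffer.pop()
        if item is None:
            break
        handle(item)


if __name__ == "__main__":
    import time

    buffer = SortingBuffer(key=lambda ts: ts)
    consumed: List[float] = []

    def slow_handle(ts: float) -> None:
        time.sleep(0.1)  # stands in for the slow scan step
        consumed.append(ts)

    worker = threading.Thread(target=consume, args=(buffer, slow_handle))
    worker.start()
    for ts in [14.14, 6.94, 11.46, 13.22, 5.71]:  # emitted all at once
        buffer.push(ts)
    buffer.close()
    worker.join()
    print(consumed)  # exact order of the first item depends on thread timing
```

The heap keeps each push and pop at O(log n), and the sequence counter ensures items with equal keys are never compared directly.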
Thanks