I have an infinite stream of objects. My requirement is that all items from the observable stream with the same key are processed sequentially (one at a time), while items with different keys may, and ideally should, be processed in parallel. The easiest way to do this (as mentioned in most places) is to use the GroupByUntil operator:
var results = observableStream
    .GroupByUntil(
        item => item.Id,
        group => group.Throttle(TimeSpan.FromSeconds(30), scheduler))
    .SelectMany(group =>
        group
            .ObserveOn(scheduler)
            .Select(item => ProcessItem(item)));

var disposable = results.Subscribe(result => SaveResults(result));
The code works well as long as I can guarantee that ProcessItem(item) takes less than 30 seconds. Otherwise group.Throttle(TimeSpan.FromSeconds(30), scheduler) closes the group's stream while an item is still being processed, and there's a very high probability that a new item with the same key then arrives, opens a new group, and starts processing on a different thread in parallel.
So basically I need to know when my thread has finished processing all the items with a specific key, and I need to signal that from within the durationSelector parameter of the GroupByUntil operator.
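To make the requirement concrete, here is a minimal sketch of the behaviour I'm after, written with plain tasks instead of GroupByUntil. It is not an Rx operator and not a verified solution: KeyedSequentialRunner, Enqueue and the Demo class are hypothetical names made up for illustration, and the Thread.Sleep call only stands in for a slow ProcessItem. Work for the same key is chained so it runs one item at a time, work for different keys can overlap, and a key is forgotten only once its last queued item has finished, no matter how long that takes.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not part of Rx): serializes work per key,
// while work for different keys may run in parallel.
public sealed class KeyedSequentialRunner<TKey>
{
    private readonly object _gate = new object();
    // Last task queued for each key that still has work pending.
    private readonly Dictionary<TKey, Task> _tails = new Dictionary<TKey, Task>();

    public Task<TResult> Enqueue<TResult>(TKey key, Func<TResult> work)
    {
        lock (_gate)
        {
            // Chain the new work after the previous task for this key, if any.
            _tails.TryGetValue(key, out var previous);
            Task<TResult> next = previous == null
                ? Task.Run(work)
                : previous.ContinueWith(_ => work(), TaskScheduler.Default);
            _tails[key] = next;

            // Once this task completes, drop the key if nothing newer was queued.
            // This is the "all items for this key are done" signal.
            next.ContinueWith(finished =>
            {
                lock (_gate)
                {
                    if (_tails.TryGetValue(key, out var tail) && ReferenceEquals(tail, finished))
                    {
                        _tails.Remove(key);
                    }
                }
            }, TaskScheduler.Default);

            return next;
        }
    }
}

public static class Demo
{
    public static void Main()
    {
        var runner = new KeyedSequentialRunner<int>();
        var tasks = new List<Task<string>>();

        // Items 1:a and 1:c share a key and run one after another; 2:b may overlap with them.
        foreach (var (id, payload) in new[] { (1, "a"), (2, "b"), (1, "c") })
        {
            tasks.Add(runner.Enqueue(id, () =>
            {
                Thread.Sleep(100); // stands in for a slow ProcessItem call
                return $"{id}:{payload}";
            }));
        }

        Task.WaitAll(tasks.ToArray());
        foreach (var task in tasks)
        {
            Console.WriteLine(task.Result);
        }
    }
}
```

As far as I can tell, something like this could be plugged into the stream through the SelectMany overload that accepts a Task-returning selector, e.g. observableStream.SelectMany(item => runner.Enqueue(item.Id, () => ProcessItem(item))), but I would prefer a solution that stays within GroupByUntil if one exists.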
Any ideas on how to achieve this? Thanks in advance.