
I have an infinite stream of objects. My requirement is that items from the observable stream with the same key must be processed sequentially (one after another), while items with different keys can, and ideally should, be processed in parallel. The easiest way to do this (as mentioned in most places) is the GroupByUntil operator:

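var results = observableStream
    // Each key's group completes after 30 seconds with no new items for that key
    .GroupByUntil(item => item.Id, group =>
        group.Throttle(TimeSpan.FromSeconds(30), scheduler))
    // Items within one group are handled one at a time; different groups can run concurrently
    .SelectMany(group =>
        group
            .ObserveOn(scheduler)
            .Select(item => ProcessItem(item)));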

var disposable = results.Subscribe(result => SaveResults(result));

The code works well as long as I can guarantee that ProcessItem(item) finishes in less than 30 seconds. Otherwise group.Throttle(TimeSpan.FromSeconds(30), scheduler) closes the group's stream while an item is still being processed, and there's a very high probability that a new item with the same key arrives, opens a new group, and starts processing on another thread in parallel with the first one.
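To make that failure mode concrete, here is a deliberately simplified model of two items with the same key, written as a hypothetical helper in plain C# (no Rx, times in seconds, scheduling details ignored). The group closes once the quiet period passes without a new item, so a second item arriving after that point, while the first is still being processed, lands in a fresh group and runs concurrently.

```csharp
// Hypothetical, simplified model of the question's pipeline for two items
// with the same key. Times are in seconds; Rx scheduling details are ignored.
public static class ThrottleWindowSketch
{
    // The group stays open until 'quietPeriod' seconds pass without a new item.
    // Returns true when the second item arrives after the group has closed (so it
    // starts a new group) while the first item is still being processed.
    public static bool RunsInParallel(
        double firstArrival, double secondArrival,
        double processingTime, double quietPeriod)
    {
        bool groupClosed = secondArrival >= firstArrival + quietPeriod;
        bool firstStillRunning = secondArrival < firstArrival + processingTime;
        return groupClosed && firstStillRunning;
    }
}
```

For example, with a 60-second ProcessItem and a second item arriving at t = 40 s, RunsInParallel(0, 40, 60, 30) returns true; if the second item arrives at t = 20 s it joins the still-open group and waits its turn.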

So basically I need some way to know that my thread has finished processing all the items with a specific key, and to pass that signal to the durationSelector parameter of the GroupByUntil operator.
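The missing signal is essentially a per-key count of items that have arrived but not finished. A minimal sketch of that bookkeeping, assuming .NET 5 or later (for the conditional `ConcurrentDictionary.TryRemove(KeyValuePair)` overload); `KeyedInFlightTracker` and its methods are hypothetical names, and wiring it into GroupByUntil's durationSelector is not shown.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical helper (not part of Rx): counts, per key, the items that have
// arrived but have not finished processing yet.
public sealed class KeyedInFlightTracker<TKey> where TKey : notnull
{
    private readonly ConcurrentDictionary<TKey, int> _pending = new();

    // Call when an item with this key arrives; returns the new pending count.
    public int Enter(TKey key) => _pending.AddOrUpdate(key, 1, (_, n) => n + 1);

    // Call when an item with this key has been fully processed. Returns true if
    // this call left the key with no pending items, which is the moment a
    // duration selector could start its quiet-period timer.
    public bool Exit(TKey key)
    {
        int remaining = _pending.AddOrUpdate(key, 0, (_, n) => n - 1);
        // Remove the entry only if it is still 0, so a concurrent Enter is not
        // lost and idle keys do not accumulate in memory.
        return remaining == 0
            && _pending.TryRemove(new KeyValuePair<TKey, int>(key, 0));
    }

    // Current number of unfinished items for the key (0 if unknown).
    public int Pending(TKey key) => _pending.TryGetValue(key, out int n) ? n : 0;
}
```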

Any ideas on how to achieve this? Thanks in advance.

Azat
    How do you know you have received the last one of a particular key? – NetMage Aug 03 '17 at 20:30
  • @NetMage Actually, I won't know. What I am trying to achieve is to start throttling (debouncing) only once the thread that processes a specific group has finished its job and there's nothing left in the queue. – Azat Aug 03 '17 at 20:43
  • Is `ProcessItem` synchronous? Is it `async`? Does it return `IObservable`? – Shlomo Aug 04 '17 at 13:10
  • @Shlomo It's not async, but it will return observable. – Azat Aug 04 '17 at 13:52

2 Answers


This is very similar to this question: A way to push buffered events in even intervals.

The answer to that question provides a Drain operator:

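using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ObservableDrainExtensions
{
    // Projects each value to an observable, starting the next projection
    // only after the previous one has completed.
    public static IObservable<TOut> Drain<TSource, TOut>(this IObservable<TSource> source, 
        Func<TSource, IObservable<TOut>> selector)
    {
        return Observable.Defer(() =>
        {
            // Holds one "ready" token, so the first source value can proceed at once.
            BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(Unit.Default);

            return source
                // Pair each value with a token: a value waits until a token is available
                .Zip(queue, (v, q) => v)
                .SelectMany(v => selector(v)
                    // Emit a new token once this value's inner observable completes
                    .Do(_ => { }, () => queue.OnNext(Unit.Default))
                );
        });
    }
}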
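The gating idea in Drain (a single token held by the BehaviorSubject, taken by Zip for each value and handed back when the inner observable completes) behaves like a one-permit gate. As a rough, Rx-free analogue, the hypothetical sketch below swaps in `SemaphoreSlim(1, 1)` to show that only one item runs at a time. Unlike Zip, SemaphoreSlim does not promise first-in, first-out ordering of waiters, so this illustrates the mutual exclusion only, not Drain's ordering.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical, Rx-free analogue of Drain's token gate.
public static class DrainGateSketch
{
    // Starts 'itemCount' work items at once and returns the highest number of
    // items that were ever running at the same time.
    public static async Task<int> PeakConcurrencyAsync(int itemCount)
    {
        using var gate = new SemaphoreSlim(1, 1); // one token, like the BehaviorSubject's initial Unit
        int running = 0, peak = 0;

        async Task ProcessAsync()
        {
            await gate.WaitAsync(); // wait for the token (like Zip waiting on the queue)
            try
            {
                running++;
                peak = Math.Max(peak, running);
                await Task.Delay(10); // stand-in for the real work
                running--;
            }
            finally
            {
                gate.Release(); // hand the token back (like queue.OnNext on completion)
            }
        }

        await Task.WhenAll(Enumerable.Range(0, itemCount).Select(_ => ProcessAsync()));
        return peak;
    }
}
```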

Given the Drain operator, your problem becomes very simple:

var results = observableStream
    .GroupBy(item => item.Id)
    .SelectMany(group =>
        group
            .ObserveOn(scheduler)
            .Drain(item => ProcessItem(item)));

var disposable = results.Subscribe(result => SaveResults(result));

Given a stream that looks like A1, A2, B1, A3, B2, C1, B3, C2, GroupBy splits it into one sub-stream per ID:

A: A1, A2, A3
B: B1, B2, B3
C: C1, C2

...and Drain makes sure that the items within a given sub-stream run serially, not in parallel.
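The partitioning step can be checked with LINQ to Objects as a pull-based stand-in for Rx's GroupBy (an illustration only, not the Rx operator): `Enumerable.GroupBy` keeps items in their original order within each group, which matches the A/B/C sub-streams listed above. Using the first character of each item as the key is an assumption standing in for `item.Id`.

```csharp
using System.Collections.Generic;
using System.Linq;

// Illustration with LINQ to Objects instead of Rx: Enumerable.GroupBy keeps
// the original order of items within each group.
public static class GroupByPartitionDemo
{
    // Key = first character of the item ("A1" -> 'A'), standing in for item.Id.
    public static Dictionary<char, List<string>> Partition(IEnumerable<string> stream) =>
        stream.GroupBy(item => item[0])
              .ToDictionary(group => group.Key, group => group.ToList());
}
```

For the sample stream, `Partition(new[] { "A1", "A2", "B1", "A3", "B2", "C1", "B3", "C2" })['B']` contains B1, B2, B3 in that order.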

Shlomo
  • Nice solution, but with plain `GroupBy` the groups are never released, and I might run out of memory if there is a huge number of unique keys. – Azat Aug 04 '17 at 17:12

It seems that you need a variant of the RxJS exhaustMap operator:

Projects each source value to an Observable which is merged in the output Observable only if the previous projected Observable has completed.

An Rx implementation of this operator (ExhaustMap) can be found here. In your case you just need to apply the same logic for each grouped subsequence of the observable sequence:

using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading;

// Extension methods must live in a non-generic static class; this class name is arbitrary.
public static class ObservableExhaustMapExtensions
{
    /// <summary>Projects each element to an observable sequence, which is merged
    /// in the output observable sequence only if the previous projected observable
    /// sequence that has the same key has completed.</summary>
    public static IObservable<TResult> ExhaustMapPerKey<TSource, TKey, TResult>(
        this IObservable<TSource> source,
        Func<TSource, TKey> keySelector,
        Func<TSource, TKey, IObservable<TResult>> function,
        IEqualityComparer<TKey> keyComparer = default)
    {
        // Arguments validation omitted
        keyComparer ??= EqualityComparer<TKey>.Default;
        return source
            .GroupBy(keySelector, keyComparer)
            .SelectMany(group =>
            {
                int localMutex = 0; // 0: not acquired, 1: acquired
                return group.SelectMany(item =>
                {
                    // Start this item only if no other item with the same key is in flight
                    if (Interlocked.CompareExchange(ref localMutex, 1, 0) == 0)
                        return function(item, group.Key)
                            // Release the per-key mutex when the projection terminates
                            .Finally(() => Volatile.Write(ref localMutex, 0));
                    // Otherwise the item is ignored
                    return Observable.Empty<TResult>();
                });
            });
    }
}

Usage example:

var results = observableStream
    .ExhaustMapPerKey(item => item.Id, (item, key) =>
        Observable.Start(() => ProcessItem(item), scheduler));
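Note that, as with RxJS exhaustMap, ExhaustMapPerKey ignores an item that arrives while another item with the same key is still in flight; it does not queue it. The single-threaded walk-through below replays the `localMutex` logic for one key without Rx. `ExhaustGateSketch` and its script format are hypothetical: a letter means "an item arrives" and `.` means "the in-flight item completes".

```csharp
using System.Text;
using System.Threading;

// Hypothetical, Rx-free replay of the localMutex logic in ExhaustMapPerKey
// for a single key.
public static class ExhaustGateSketch
{
    // Returns the items that were actually started, in order.
    public static string Simulate(string script)
    {
        int mutex = 0; // 0: not acquired, 1: acquired
        var started = new StringBuilder();
        foreach (char c in script)
        {
            if (c == '.')
                Volatile.Write(ref mutex, 0); // like the Finally callback
            else if (Interlocked.CompareExchange(ref mutex, 1, 0) == 0)
                started.Append(c); // gate was free: this item is projected
            // else: gate busy, item ignored (Observable.Empty in the original)
        }
        return started.ToString();
    }
}
```

For example, `Simulate("AB.CD.")` returns `"AC"`: B and D arrive while A and C are still running, so they are never processed.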
Theodor Zoulias