I have an observable that produces items over time. The intervals between items are more or less random. I subscribe to the observable and process each item; processing can take a long time, and by the time `OnNext` finishes its work, many new items have been produced. I need to take only the last of them and process it. In other words, every time a long `OnNext` operation completes, I need to run `OnNext` for the latest of the items that were produced during the previous `OnNext` run. Can I do this with Rx?

I tried `Window(1).Switch()`, but it seems to execute as soon as an item arrives, not when `OnNext` has completed.

Theodor Zoulias
nuclear sweet
  • *"I have some observer, that produces items over time."* <== The observers are not producers. The observables are the producers, and the observers are the consumers. Also if an observer blocks on the `OnNext` invocation by doing some long-running work, the observable that invoked this method will become blocked too, and it will not be able to issue any other `OnNext` during this time. It is mandatory, based on the [Rx contract](http://reactivex.io/documentation/contract.html), that the `OnNext` notifications must be serialized. So I am not sure how your question could be answered. – Theodor Zoulias Oct 06 '21 at 16:03
  • Maybe what you are searching for, is the [`ExhaustMap`](https://stackoverflow.com/questions/64353907/how-can-i-implement-an-exhaustmap-handler-in-rx-net) operator. – Theodor Zoulias Oct 06 '21 at 16:07
  • Yeah sorry, I meant observable, you're right, fixed it. `OnNext` is blocked, and I know it, but the observable still gets new items. And I don't need to process all of them, only the last item at the moment the previous `OnNext` finishes. – nuclear sweet Oct 06 '21 at 16:07
  • If I get it right, `ExhaustMap` will ignore all items, but I need the one item that came last. – nuclear sweet Oct 06 '21 at 16:13
  • Ah, OK, I got it now. Btw it's not a good idea to do lengthy work in the `OnNext` handler. The `OnNext`/`OnError`/`OnCompleted` handlers in Rx are synchronous, and block the caller while running. IMHO the correct place to run long-running code is as a side-effect of a derived observable, as shown [here](https://stackoverflow.com/questions/69070794/concurrent-subscriber-execution-in-system-reactive/69074298#69074298) for example. – Theodor Zoulias Oct 06 '21 at 16:20
  • Thanks, i will check it! – nuclear sweet Oct 06 '21 at 16:28
  • Check out this question: [How to merge a nested observable IObservable<IObservable<T>> with limited concurrency and limited buffer capacity?](https://stackoverflow.com/questions/65477010/how-to-merge-a-nested-observable-iobservableiobservablet-with-limited-concur) If you refactor your code to make use of the `Merge` operator, you could then replace the `Merge` with one of the `MergeBounded` implementations found in this question, with `maximumConcurrency: 1` and `boundedCapacity: 1`. – Theodor Zoulias Oct 06 '21 at 16:46
  • Seems like an OK solution, but it's a bit complex. I thought Rx had something built in for this. It's probably easier to just store the item in a variable, overwrite it every time a new one comes, and have a thread with an infinite loop process the value if it exists. – nuclear sweet Oct 06 '21 at 18:01
  • AFAIK Rx offers nothing out of the box with this functionality. If you are not restricted to using Rx, you could take a look at different technologies that implement the producer-consumer pattern, but don't expect to find something built-in there either. You might get some ideas from [this](https://stackoverflow.com/questions/15928642/blockingcollection-max-size) question, or [this](https://stackoverflow.com/questions/44873603/tpl-dataflow-process-n-latest-messages). – Theodor Zoulias Oct 06 '21 at 19:06
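
The simpler idea floated in the comments above (keep only the newest item in a variable and let a dedicated loop process it) could be sketched without Rx roughly as follows. This is only an illustration under assumptions: `LatestSlot<T>`, `Post` and `Take` are hypothetical names that do not come from the thread or from any library, and the sketch uses nothing beyond `System.Threading.Monitor`.

```csharp
using System;
using System.Threading;

// Hypothetical helper: a single-item mailbox that keeps only the newest value.
public sealed class LatestSlot<T>
{
    private readonly object _gate = new object();
    private bool _hasValue;
    private T _value;

    // Called by the producer. Overwrites any value that has not been taken yet.
    public void Post(T value)
    {
        lock (_gate)
        {
            _value = value;
            _hasValue = true;
            Monitor.Pulse(_gate); // wake a consumer waiting in Take()
        }
    }

    // Called by the consumer. Blocks until a value is available, then removes it.
    public T Take()
    {
        lock (_gate)
        {
            while (!_hasValue) Monitor.Wait(_gate);
            _hasValue = false;
            var value = _value;
            _value = default; // do not keep the taken item alive
            return value;
        }
    }
}
```

With such a helper, the subscription would become something like `someObservable.Subscribe(slot.Post)`, while a separate worker thread loops on `Process(slot.Take())`. Items that arrive while `Process` is running simply overwrite each other, so the worker always picks up the most recent one.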

1 Answer

Here is a relatively simple custom `DroppingDo` operator that probably does what you want. It is somewhat similar to the built-in `Do` operator, with the differences that it invokes the action on the specified scheduler (by default the `ThreadPool`) instead of the current thread, and that it ignores items that are received while a previous action is running. The latest item is preserved though.

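// Namespaces used: System, System.Diagnostics, System.Reactive.Concurrency,
// System.Reactive.Linq, System.Threading. The method must live in a static class.
/// <summary>
/// Invokes an action sequentially for each element in the observable sequence,
/// on the specified scheduler, skipping and dropping elements that are received
/// during the execution of a previous action, except for the latest element.
/// </summary>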
public static IObservable<TSource> DroppingDo<TSource>(
    this IObservable<TSource> source,
    Action<TSource> action,
    IScheduler scheduler = null)
{
    // Argument validation omitted
    scheduler ??= Scheduler.Default;
    return Observable.Defer(() =>
    {
        Tuple<TSource> latest = null;
        return source
            .Select(item =>
            {
                var previous = Interlocked.Exchange(ref latest, Tuple.Create(item));
                if (previous != null) return Observable.Empty<TSource>();
                return Observable.Defer(() =>
                {
                    var current = Interlocked.Exchange(ref latest, null);
                    Debug.Assert(current != null);
                    var unboxed = current.Item1;
                    return Observable.Start(
                        () => { action(unboxed); return unboxed; }, scheduler);
                });
            })
            .Concat();
    });
}

Usage example. Just replace your code that probably looks like this:

someObservable
    .Subscribe(x => Process(x), ex => HandleError(ex));

With this:

someObservable
    .DroppingDo(x => Process(x))
    .Subscribe(_ => { }, ex => HandleError(ex));
Theodor Zoulias
  • Caution, the `Concat` operator [behaves weirdly](https://github.com/dotnet/reactive/issues/1634) in the current version of the Rx library (5.0.0). My advice is to use the equivalent `Merge(1)` operator instead, until the issue with `Concat` is fixed. The `1` is the value of the `maxConcurrent` parameter. Setting this parameter to `1` means no concurrency. – Theodor Zoulias Nov 05 '21 at 19:44
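
For illustration, here is how the operator from the answer might look with the substitution suggested in the comment above applied, that is, with `Merge(1)` in place of `Concat()`. Treat it as a sketch rather than a definitive version: the container class name `DroppingDoExtensions` and the two null checks are additions made here for completeness, and everything else follows the answer's code.

```csharp
using System;
using System.Diagnostics;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

public static class DroppingDoExtensions // hypothetical class name
{
    public static IObservable<TSource> DroppingDo<TSource>(
        this IObservable<TSource> source,
        Action<TSource> action,
        IScheduler scheduler = null)
    {
        // Null checks added for this sketch; the answer omits validation.
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (action == null) throw new ArgumentNullException(nameof(action));
        scheduler ??= Scheduler.Default;
        return Observable.Defer(() =>
        {
            // Holds the most recent unprocessed item, boxed so that it can be
            // swapped atomically; null means "nothing pending".
            Tuple<TSource> latest = null;
            return source
                .Select(item =>
                {
                    var previous = Interlocked.Exchange(ref latest, Tuple.Create(item));
                    // An inner sequence is already queued and will pick up this item.
                    if (previous != null) return Observable.Empty<TSource>();
                    return Observable.Defer(() =>
                    {
                        // Runs when the previous action has finished: take whatever
                        // item is the latest at that moment.
                        var current = Interlocked.Exchange(ref latest, null);
                        Debug.Assert(current != null);
                        var unboxed = current.Item1;
                        return Observable.Start(
                            () => { action(unboxed); return unboxed; }, scheduler);
                    });
                })
                .Merge(1); // maxConcurrent: 1, so inner sequences run one at a time
        });
    }
}
```

The usage shown in the answer stays the same: `someObservable.DroppingDo(x => Process(x)).Subscribe(_ => { }, ex => HandleError(ex));`.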