11

I need to implement the following algorithm in Rx.NET:

  1. Take the latest item from the stream; if there are no new items, wait for one without blocking. Only the latest item matters; the others can be dropped.
  2. Pass the item to SlowFunction and print the output.
  3. Repeat from step 1.

The naive solution is:

let PrintLatestData (stream: IObservable<_>) =
    stream.Select(SlowFunction).Subscribe(printfn "%A")

However, this solution does not work because, on average, the stream emits items faster than SlowFunction can consume them. Select does not drop items; it processes every item in order from oldest to newest, so the delay between an item being emitted and printed grows without bound as the program runs. Only the most recent item should be taken from the stream to avoid this infinitely growing backpressure.
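To make the growing delay concrete, here is a small model in plain Java (no Rx involved). The class name `QueueLagModel`, the method `lagMillis`, and the numbers (an item every 10 ms, 30 ms of work per item) are assumptions chosen for illustration only; they do not come from the question.

```java
/** Hypothetical model: items arrive every periodMs and are processed one at a time, in order. */
final class QueueLagModel {

    /** Returns the lag in ms between emission of item `index` and the end of its processing. */
    static long lagMillis(int index, long periodMs, long workMs) {
        long finish = 0; // time at which the previous item finished processing
        long lag = 0;
        for (int k = 0; k <= index; k++) {
            long arrival = k * periodMs;             // item k is emitted at k * periodMs
            long start = Math.max(arrival, finish);  // it must wait for the previous item
            finish = start + workMs;
            lag = finish - arrival;
        }
        return lag;
    }

    public static void main(String[] args) {
        // Assumed numbers: one item every 10 ms, 30 ms of work per item.
        for (int k : new int[] {0, 1, 10, 100}) {
            System.out.println("item " + k + ": lag " + lagMillis(k, 10, 30) + " ms");
        }
    }
}
```

With these assumed numbers the lag is 30 ms for the first item and grows by 20 ms with every further item, so it never settles.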

I searched the documentation and found a method called onBackpressureLatest in RxJava, which, as far as I understand, does what I described above. However, the method does not exist in Rx.NET. How can I implement this in Rx.NET?

Steve
  • 1
    What's the problem with `SlowFunction` being slower than the stream? – Fyodor Soikin Feb 14 '17 at 22:18
  • @FyodorSoikin If `SlowFunction` is slower than the stream, new items are being emitted faster than they can be processed and printed. So as the program runs, the delay/lag between a new item being emitted and `SlowFunction` output for said item being printed grows towards infinity. This is unacceptable since I need to monitor data in real time. I only care about the latest item. – Steve Feb 14 '17 at 22:59
  • 1
    Is `SlowFunction` synchronous? Or Observable/Async? – Shlomo Feb 14 '17 at 23:03
  • @Shlomo SlowFunction is synchronous. – Steve Feb 14 '17 at 23:03
  • You could try the TPL Dataflow: https://msdn.microsoft.com/en-us/library/hh228603(v=vs.110).aspx – N_A Feb 15 '17 at 04:20
  • 1
    Also check out this thread for ideas http://stackoverflow.com/questions/11010602/with-rx-how-do-i-ignore-all-except-the-latest-value-when-my-subscribe-method-is – marklam Feb 15 '17 at 09:43

4 Answers

11

I think you want to use something like ObserveLatestOn. It effectively replaces the queue of incoming events with a single value and a flag.

James World has blogged about it here http://www.zerobugbuild.com/?p=192

The concept is used heavily in GUI applications that can't trust how fast the server may push data at them.

You can also see an implementation in Reactive Trader https://github.com/AdaptiveConsulting/ReactiveTrader/blob/83a6b7f312b9ba9d70327f03d8d326934b379211/src/Adaptive.ReactiveTrader.Shared/Extensions/ObservableExtensions.cs#L64 and the supporting presentations explaining ReactiveTrader https://leecampbell.com/presentations/#ReactConfLondon2014

To be clear, this is a load-shedding algorithm, not a backpressure algorithm.
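The "single value and a flag" idea can be sketched in plain Java without any Rx types. This is only an illustration of the load-shedding mechanism, not the ObserveLatestOn implementation from the linked blog post or from Reactive Trader. The class name `LatestSlot` and its methods are hypothetical, and a `null` slot plays the role of the flag, so `null` items are assumed not to occur.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical single-slot mailbox: a newer value overwrites an unconsumed older one. */
final class LatestSlot<T> {
    // null means "nothing waiting"; this doubles as the flag.
    private final AtomicReference<T> slot = new AtomicReference<>();

    /** Producer side: store the value. Returns true if an unconsumed older value was dropped. */
    boolean offer(T value) {
        return slot.getAndSet(value) != null;
    }

    /** Consumer side: take the waiting value, or null if nothing new has arrived. */
    T poll() {
        return slot.getAndSet(null);
    }

    public static void main(String[] args) {
        LatestSlot<Integer> slot = new LatestSlot<>();
        slot.offer(1);
        slot.offer(2); // overwrites 1, which is shed
        slot.offer(3); // overwrites 2, which is shed
        System.out.println(slot.poll()); // the consumer only ever sees the newest value
        System.out.println(slot.poll()); // nothing new has arrived since
    }
}
```

A real operator would also need to wake the consumer when a value arrives and forward completion and errors; the sketch leaves that out.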

Lee Campbell
  • `ObserveLatestOn` was just what I needed, thank you! – Steve Feb 15 '17 at 18:03
  • 1
    Whilst this may theoretically answer the question, [it would be preferable](//meta.stackoverflow.com/q/8259) to include the essential parts of the answer here, and provide the link for reference. – Draken Feb 17 '17 at 12:38
  • Or point out that it could be a duplicate of http://stackoverflow.com/questions/6384312/how-can-i-observe-values-in-a-non-blocking-way-using-rx and http://stackoverflow.com/questions/11010602/with-rx-how-do-i-ignore-all-except-the-latest-value-when-my-subscribe-method-is/. The answer, I suppose, was probing: I wasn't sure if the OP wanted backpressure (as per the title) or load shedding (as per the description). – Lee Campbell Feb 17 '17 at 23:46
1

The sync/async suggestion may help slightly. However, given that SlowFunction is always slower than the stream of events, making it async would only let you parallelise the handling (by observing on a thread pool), at the cost of eventually running out of threads or adding latency through context switching. It doesn't sound like a solution to me.

I suggest you look at the open source Rxx 'Introspective' operators written by Dave Sexton. These can vary the buffer/throttle period from which you take the latest item as the queue backs up due to a slow consumer. If SlowFunction suddenly gets faster, the operator won't buffer at all; if it gets slower, it will buffer more. You'll have to check whether there is a 'latest from' variant, or modify an existing operator to suit your needs, e.g. use the buffer and take only the last item in it, or go further and store only the latest item internally. Google 'Rxx' and you'll find it on GitHub.

A simpler approach, if the run time of SlowFunction is fairly predictable, is to throttle your stream by an interval that exceeds this time. Obviously I don't mean the standard Rx 'Throttle', but one that lets a more recent update through instead of an old one. There are plenty of solutions to this sort of problem readily available on here.

H_Andr
1

The same question occurred to me some time ago, and I didn't find a built-in operator that does exactly this. So I wrote my own, which I called Latest. It is not trivial to implement, but I have found it very useful in my current project.

It works like this: while the observer is busy processing a previous notification (on its own thread, of course), the operator queues up to the last n notifications (n >= 0) and calls OnNext on the observer as soon as it becomes idle. So:

  • Latest(0): only observe items arriving while observer is idle
  • Latest(1): always observe the latest
  • Latest(1000) (e.g.): Usually process all items, but if something gets stuck down the line, rather miss some than get an OutOfMemoryException
  • Latest(int.MaxValue): Never miss an item, but load balance between the producer and consumer.

Your code would thus be:

stream.Latest(1).Select(SlowFunction).Subscribe(printfn "%A")

The signature looks like this:

/// <summary>
/// Avoids backpressure by enqueuing items when the <paramref name="source"/> produces them more rapidly than the observer can process.
/// </summary>
/// <param name="source">The source sequence.</param>
/// <param name="maxQueueSize">Maximum queue size. If the queue gets full, less recent items are discarded from the queue.</param>
/// <param name="scheduler">Optional, default: <see cref="Scheduler.Default"/>: <see cref="IScheduler"/> on which to observe notifications.</param>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="maxQueueSize"/> is negative.</exception>
/// <remarks>
/// A <paramref name="maxQueueSize"/> of 0 observes items only if the subscriber is ready.
/// A <paramref name="maxQueueSize"/> of 1 guarantees to observe the last item in the sequence, if any.
/// To observe the whole source sequence, specify <see cref="int.MaxValue"/>.
/// </remarks>
public static IObservable<TSource> Latest<TSource>(this IObservable<TSource> source, int maxQueueSize, IScheduler scheduler = null)

Implementation is too big to post here, but if someone is interested, I'd happily share it. Let me know.
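As a rough illustration of the queue policy described above (this is not the author's implementation, which was not posted), the "keep at most n, discard the less recent" rule can be sketched in plain Java. The class `DropOldestBuffer` and its method names are hypothetical. The sketch models only what happens to items that arrive while the consumer is busy; scheduling and the hand-off to an idle observer are left out.

```java
import java.util.ArrayDeque;

/** Hypothetical bounded buffer that discards the oldest item when full. */
final class DropOldestBuffer<T> {
    private final int maxQueueSize;
    private final ArrayDeque<T> queue = new ArrayDeque<>();

    DropOldestBuffer(int maxQueueSize) {
        if (maxQueueSize < 0) {
            throw new IllegalArgumentException("maxQueueSize must not be negative");
        }
        this.maxQueueSize = maxQueueSize;
    }

    /** Called for an item that arrives while the consumer is busy. */
    synchronized void enqueue(T item) {
        if (maxQueueSize == 0) {
            return; // size 0: items arriving while the consumer is busy are not kept
        }
        if (queue.size() == maxQueueSize) {
            queue.pollFirst(); // full: discard the least recent item
        }
        queue.addLast(item);
    }

    /** Called when the consumer becomes idle; returns null if nothing is waiting. */
    synchronized T dequeue() {
        return queue.pollFirst();
    }

    synchronized int size() {
        return queue.size();
    }
}
```

With a maximum size of 1 the buffer always holds just the most recent item, which corresponds to the Latest(1) case in the list above.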

tinudu
1

You could sample the stream at an interval you know SlowFunction can handle. Here's an example in Java:

TestScheduler ts = new TestScheduler();

Observable<Long> stream = Observable.interval(1, TimeUnit.MILLISECONDS, ts).take(500);
stream.sample(100, TimeUnit.MILLISECONDS, ts).subscribe(System.out::println);

ts.advanceTimeBy(1000, TimeUnit.MILLISECONDS);

Output:

98
198
298
398
498
499

The sample operator doesn't cause backpressure and always grabs the latest value in the stream, so it meets your requirement. Additionally, sample will not send the same value through twice (which can be seen above, as 499 is only printed once).

I think this would be a valid C#/F# solution:

// C#
static IDisposable PrintLatestData<T>(IObservable<T> stream) {
    return stream.Sample(TimeSpan.FromMilliseconds(100))
        .Select(SlowFunction)
        .Subscribe(Console.WriteLine);
}

// F#
let PrintLatestData (stream: IObservable<_>) =
    stream.Sample(TimeSpan.FromMilliseconds(100))
        .Select(SlowFunction)
        .Subscribe(printfn "%A")

flakes
  • 1
    This is a very simple solution to the problem, but in my particular use case the run time of SlowFunction varies too much for this approach to work. Thanks for sharing. – Steve Feb 15 '17 at 18:10