35

Using Reactive Extensions, I want to ignore messages coming from my event stream that occur while my Subscribe method is running. I.e. it sometimes takes me longer to process a message than the time between message, so I want to drop the messages I don't have time to process.

However, when my Subscribe method completes, if any messages did come through I want to process the last one. So I always process the most recent message.

So, if I have some code which does:

messages.OnNext(100);
messages.OnNext(1);
messages.OnNext(2);

and if we assume the '100' takes a long time to process. Then I want the '2' to be processed when the '100' completes. The '1' should be ignored because it was superseded by the '2' while the '100' was still being processed.

Here's an example of the result I want using a background task and Latest()

var messages = Observable.Interval(TimeSpan.FromMilliseconds(100));

Task.Factory.StartNew(() =>
{
    foreach(var n in messages.Latest())
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(250));
        Console.WriteLine(n);
    }
});

However, Latest() is a blocking call and I'd prefer not to have a thread sitting waiting for the next value like this (there will sometimes be very long gaps between messages).

I can also get the result I want by using a BroadcastBlock from TPL Dataflow, like this:

var buffer = new BroadcastBlock<long>(n => n);
Observable.Interval(TimeSpan.FromMilliseconds(100)).Subscribe(n => buffer.Post(n));

buffer.AsObservable()
    .Subscribe(n =>
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(250));
        Console.WriteLine(n);
    });

but this feels like it should be possible directly in Rx. What's the best way to go about doing it?

Wilka
  • 28,701
  • 14
  • 75
  • 97

9 Answers9

9

Here is a method that is similar to Dave's but uses Sample instead (which is more appropriate than buffer). I've included a similar extension method to the one I added to Dave's answer.

The extension:

public static IDisposable SubscribeWithoutOverlap<T>(this IObservable<T> source, Action<T> action)
{
    var sampler = new Subject<Unit>();

    var sub = source.
        Sample(sampler).
        ObserveOn(Scheduler.ThreadPool).
        Subscribe(l =>
        {
            action(l);
            sampler.OnNext(Unit.Default);
        });

    // start sampling when we have a first value
    source.Take(1).Subscribe(_ => sampler.OnNext(Unit.Default));

    return sub;
}

Note that it's simpler, and there is no 'empty' buffer that's fired. The first element that is sent to the action actually comes from the stream itself.

Usage is straightforward:

messages.SubscribeWithoutOverlap(n =>
{
    Console.WriteLine("start: " + n);
    Thread.Sleep(500);
    Console.WriteLine("end: " + n);
});

messages.Subscribe(x => Console.WriteLine("source: " + x)); // for testing

And results:

source: 0
start: 0
source: 1
source: 2
source: 3
source: 4
source: 5
end: 0
start: 5
source: 6
source: 7
source: 8
source: 9
source: 10
end: 5
start: 10
source: 11
source: 12
source: 13
source: 14
source: 15
end: 10
Martin Liversage
  • 104,481
  • 22
  • 209
  • 256
yamen
  • 15,390
  • 3
  • 42
  • 52
  • 4
    This has a problem in that if the source has not put anything in the sample buffer at the point that sampler.OnNext is called then the system goes into a state where it will not generate any more values. I did a variation on this one using Switch instead of sample http://stackoverflow.com/a/15876519/158285 – bradgonesurfing Apr 08 '13 at 10:26
  • shouldn't the returned *IDisposable* also take care of disposing the inner *Subject* ? – superjos Aug 22 '15 at 14:03
4

Thanks to Lee Campbell (of Intro To Rx fame), I now have a working solution using this extension method:

public static IObservable<T> ObserveLatestOn<T>(this IObservable<T> source, IScheduler scheduler)
{
    return Observable.Create<T>(observer =>
    {
        Notification<T> outsideNotification = null;
        var gate = new object();
        bool active = false;
        var cancelable = new MultipleAssignmentDisposable();
        var disposable = source.Materialize().Subscribe(thisNotification =>
        {
            bool alreadyActive;
            lock (gate)
            {
                alreadyActive = active;
                active = true;
                outsideNotification = thisNotification;
            }

            if (!alreadyActive)
            {
                cancelable.Disposable = scheduler.Schedule(self =>
                {
                    Notification<T> localNotification = null;
                    lock (gate)
                    {
                        localNotification = outsideNotification;
                        outsideNotification = null;
                    }
                    localNotification.Accept(observer);
                    bool hasPendingNotification = false;
                    lock (gate)
                    {
                        hasPendingNotification = active = (outsideNotification != null);
                    }
                    if (hasPendingNotification)
                    {
                        self();
                    }
                });
            }
        });
        return new CompositeDisposable(disposable, cancelable);
    });
}
Wilka
  • 28,701
  • 14
  • 75
  • 97
  • What is the purpose of `Materializing` and using the `Notifications` versus simply storing the value itself? From my testing it seems that it works as expected to keep track of the value alone - but I am perhaps missing some fundamentals. – Andrew Hanlon Sep 06 '15 at 13:50
  • 2
    @AndrewHanlon using the Notification instead of just the value is for dealing with exceptions, otherwise they won't be passed down the OnError channel properly. – Wilka Sep 06 '15 at 14:11
  • Ah, that makes sense! Thank you. – Andrew Hanlon Sep 07 '15 at 04:00
3

Here is an attempt using "just" Rx. The timer and the subscriber are kept independent by observing on the threadpool and I've used a subject to provide feedback on completing the task.

I don't think this is a simple solution, but I hope it might give you ideas for improvement.

messages.
    Buffer(() => feedback).
    Select(l => l.LastOrDefault()).
    ObserveOn(Scheduler.ThreadPool).
    Subscribe(n =>
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(250));
        Console.WriteLine(n);
        feedback.OnNext(Unit.Default);
    });

feedback.OnNext(Unit.Default);

There is one slight problem -- the buffer is first closed when it's empty so it generates the default value. You could probably solve it by doing the feedback after the first message.


Here it is as an extension function:

public static IDisposable SubscribeWithoutOverlap<T>(this IObservable<T> source, Action<T> action)
{
    var feedback = new Subject<Unit>();

    var sub = source.
        Buffer(() => feedback).
        ObserveOn(Scheduler.ThreadPool).
        Subscribe(l =>
        {
            action(l.LastOrDefault());
            feedback.OnNext(Unit.Default);
        });

    feedback.OnNext(Unit.Default);

    return sub;
}

And usage:

    messages.SubscribeWithoutOverlap(n =>
    {
        Thread.Sleep(1000);
        Console.WriteLine(n);
    });
Dave Hillier
  • 18,105
  • 9
  • 43
  • 87
3

An example using Observable.Switch. It also handles the case when you complete the task but there is nothing in the queue.

using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;

namespace System.Reactive
{
    public static class RXX
    {
        public static IDisposable SubscribeWithoutOverlap<T>
        ( this IObservable<T> source
        , Action<T> action
        , IScheduler scheduler = null)
        {
            var sampler = new Subject<Unit>();
            scheduler = scheduler ?? Scheduler.Default;
            var p = source.Publish();
            var connection = p.Connect();

            var subscription = sampler.Select(x=>p.Take(1))
                .Switch()
                .ObserveOn(scheduler)
                .Subscribe(l =>
                {
                    action(l);
                    sampler.OnNext(Unit.Default);
                });

            sampler.OnNext(Unit.Default);

            return new CompositeDisposable(connection, subscription);
        }
    }
}
bradgonesurfing
  • 30,949
  • 17
  • 114
  • 217
  • I've just noticed this has can miss values. I.e. it doesn't always process the most recent value lands in the queue when it's already doing something. e.g. https://gist.github.com/WilkaH/5403360 only prints "Done 100", not "Done 2" afterwards (the 1 should be dropped because it's superseded) – Wilka Apr 17 '13 at 10:50
  • It should ignore items that land in the queue when it is currently processing. I'm not sure what you mean. – bradgonesurfing Apr 17 '13 at 11:32
  • In which case I didn't make it clear in my original question. I always want the newest item to be processed, so if it comes in while something else is processing, then that item should be processed when the current one completes (instead of being missed). – Wilka Apr 17 '13 at 12:08
  • Should only require a small change to the above code to apply a buffer of one element. Exercise for the reader perhapps? :) – bradgonesurfing Apr 17 '13 at 13:26
  • I'm clearly being a numpty today, I tried buffer (and a few other things) but I couldn't figure out where I wanted to be buffering to get it to work. – Wilka Apr 17 '13 at 13:42
3

I've written a blog post about this with a solution that uses CAS instead of locks and avoids recursion. The code is below, but you can find a complete explanation here: http://www.zerobugbuild.com/?p=192

public static IObservable<TSource> ObserveLatestOn<TSource>(
    this IObservable<TSource> source,
    IScheduler scheduler)
{
    return Observable.Create<TSource>(observer =>
    {
        Notification<TSource> pendingNotification = null;
        var cancelable = new MultipleAssignmentDisposable();

        var sourceSubscription = source.Materialize()
            .Subscribe(notification =>
            {
                var previousNotification = Interlocked.Exchange(
                    ref pendingNotification, notification);

                if (previousNotification != null) return;

                cancelable.Disposable = scheduler.Schedule(() =>
                    {
                        var notificationToSend = Interlocked.Exchange(
                            ref pendingNotification, null);
                        notificationToSend.Accept(observer);
                    });
            });
            return new CompositeDisposable(sourceSubscription, cancelable);
    });
}
James World
  • 29,019
  • 9
  • 86
  • 120
1

Here's a Task based implementation, with cancellation semantics, which doesn't use a subject. Calling dispose allows the subscribed action to cancel processing, if so desired.

    public static IDisposable SampleSubscribe<T>(this IObservable<T> observable, Action<T, CancellationToken> action)
    {
        var cancellation = new CancellationDisposable();
        var token = cancellation.Token;
        Task task = null;

        return new CompositeDisposable(
            cancellation,
            observable.Subscribe(value =>
            {
                if (task == null || task.IsCompleted)
                    task = Task.Factory.StartNew(() => action(value, token), token);
            })
        );
    }

Here's a simple test:

Observable.Interval(TimeSpan.FromMilliseconds(150))
                      .SampleSubscribe((v, ct) =>
                      {   
                          //cbeck for cancellation, do work
                          for (int i = 0; i < 10 && !ct.IsCancellationRequested; i++)
                              Thread.Sleep(100);

                          Console.WriteLine(v);
                      });

The output:

0
7
14
21
28
35
Asti
  • 12,447
  • 29
  • 38
1

With Rx 2.0 RC you can use Chunkify to get an IEnumerable of lists, each containing what was observed since the last MoveNext.

You can then use ToObservable to convert that back to an IObservable and only pay attention to the last entry in each non-empty list.

var messages = Observable.Interval(TimeSpan.FromMilliseconds(100));

messages.Chunkify()
        .ToObservable(Scheduler.TaskPool)
        .Where(list => list.Any())
        .Select(list => list.Last())
        .Subscribe(n =>
        {
          Thread.Sleep(TimeSpan.FromMilliseconds(250));
          Console.WriteLine(n);
        });
marklam
  • 5,346
  • 1
  • 24
  • 26
  • 2
    This does work, but it leaves a thread spinning to pull stuff from the observable (so one of my CPUs gets maxed out) – Wilka Apr 17 '13 at 12:18
  • And it's building up a List full of values you're potentially going to ignore. The ObserveLatestOn extension avoids this - no list, no allocation from growing the list, no references keeping the old notifications alive. – Niall Connaughton May 24 '13 at 05:23
1

Just finished (and already completely revised) my own solution to the problem, which I plan to use in production.

Unless the scheduler uses the current thread, calls to OnNext, OnCompleted, OnError from the source should return immediately; if the observer is busy with previous notifications, they go into a queue with a specifiable maximum size, from where they'll be notified whenever the previous notification has been processed. If the queue fills up, least recent items are discarded. So, a maximum queue size of 0 ignores all items coming in while the observer is busy; a size of 1 will always let observe the latest item; a size up to int.MaxValue keeps the consumer busy until it catches up with the producer.

If the scheduler supports long running (ie gives you a thread of your own), I schedule a loop to notify the observer; otherwise I use recursive scheduling.

Here's the code. Any comments are appreciated.

partial class MoreObservables
{
    /// <summary>
    /// Avoids backpressure by enqueuing items when the <paramref name="source"/> produces them more rapidly than the observer can process.
    /// </summary>
    /// <param name="source">The source sequence.</param>
    /// <param name="maxQueueSize">Maximum queue size. If the queue gets full, less recent items are discarded from the queue.</param>
    /// <param name="scheduler">Optional, default: <see cref="Scheduler.Default"/>: <see cref="IScheduler"/> on which to observe notifications.</param>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxQueueSize"/> is negative.</exception>
    /// <remarks>
    /// A <paramref name="maxQueueSize"/> of 0 observes items only if the subscriber is ready.
    /// A <paramref name="maxQueueSize"/> of 1 guarantees to observe the last item in the sequence, if any.
    /// To observe the whole source sequence, specify <see cref="int.MaxValue"/>.
    /// </remarks>
    public static IObservable<TSource> Latest<TSource>(this IObservable<TSource> source, int maxQueueSize, IScheduler scheduler = null)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (maxQueueSize < 0) throw new ArgumentOutOfRangeException(nameof(maxQueueSize));
        if (scheduler == null) scheduler = Scheduler.Default;

        return Observable.Create<TSource>(observer => LatestImpl<TSource>.Subscribe(source, maxQueueSize, scheduler, observer));
    }

    private static class LatestImpl<TSource>
    {
        public static IDisposable Subscribe(IObservable<TSource> source, int maxQueueSize, IScheduler scheduler, IObserver<TSource> observer)
        {
            if (observer == null) throw new ArgumentNullException(nameof(observer));

            var longrunningScheduler = scheduler.AsLongRunning();
            if (longrunningScheduler != null)
                return new LoopSubscription(source, maxQueueSize, longrunningScheduler, observer);

            return new RecursiveSubscription(source, maxQueueSize, scheduler, observer);
        }

        #region Subscriptions

        /// <summary>
        /// Represents a subscription to <see cref="Latest{TSource}(IObservable{TSource}, int, IScheduler)"/> which notifies in a loop.
        /// </summary>
        private sealed class LoopSubscription : IDisposable
        {
            private enum State
            {
                Idle, // nothing to notify
                Head, // next notification is in _head
                Queue, // next notifications are in _queue, followed by _completion
                Disposed, // disposed
            }

            private readonly SingleAssignmentDisposable _subscription = new SingleAssignmentDisposable();
            private readonly IObserver<TSource> _observer;
            private State _state;
            private TSource _head; // item in front of the queue
            private IQueue _queue; // queued items
            private Notification<TSource> _completion; // completion notification

            public LoopSubscription(IObservable<TSource> source, int maxQueueSize, ISchedulerLongRunning scheduler, IObserver<TSource> observer)
            {
                _observer = observer;
                _queue = Queue.Create(maxQueueSize);
                scheduler.ScheduleLongRunning(_ => Loop());
                _subscription.Disposable = source.Subscribe(
                    OnNext,
                    error => OnCompletion(Notification.CreateOnError<TSource>(error)),
                    () => OnCompletion(Notification.CreateOnCompleted<TSource>()));
            }

            private void OnNext(TSource value)
            {
                lock (_subscription)
                {
                    switch (_state)
                    {
                        case State.Idle:
                            _head = value;
                            _state = State.Head;
                            Monitor.Pulse(_subscription);
                            break;
                        case State.Head:
                        case State.Queue:
                            if (_completion != null) return;
                            try { _queue.Enqueue(value); }
                            catch (Exception error) // probably OutOfMemoryException
                            {
                                _completion = Notification.CreateOnError<TSource>(error);
                                _subscription.Dispose();
                            }
                            break;
                    }
                }
            }

            private void OnCompletion(Notification<TSource> completion)
            {
                lock (_subscription)
                {
                    switch (_state)
                    {
                        case State.Idle:
                            _completion = completion;
                            _state = State.Queue;
                            Monitor.Pulse(_subscription);
                            _subscription.Dispose();
                            break;
                        case State.Head:
                        case State.Queue:
                            if (_completion != null) return;
                            _completion = completion;
                            _subscription.Dispose();
                            break;
                    }
                }
            }

            public void Dispose()
            {
                lock (_subscription)
                {
                    if (_state == State.Disposed) return;

                    _head = default(TSource);
                    _queue = null;
                    _completion = null;
                    _state = State.Disposed;
                    Monitor.Pulse(_subscription);
                    _subscription.Dispose();
                }
            }

            private void Loop()
            {
                try
                {
                    while (true) // overall loop for all notifications
                    {
                        // next notification to emit
                        Notification<TSource> completion;
                        TSource next; // iff completion == null

                        lock (_subscription)
                        {
                            while (true)
                            {
                                while (_state == State.Idle)
                                    Monitor.Wait(_subscription);

                                if (_state == State.Head)
                                {
                                    completion = null;
                                    next = _head;
                                    _head = default(TSource);
                                    _state = State.Queue;
                                    break;
                                }
                                if (_state == State.Queue)
                                {
                                    if (!_queue.IsEmpty)
                                    {
                                        completion = null;
                                        next = _queue.Dequeue(); // assumption: this never throws
                                        break;
                                    }
                                    if (_completion != null)
                                    {
                                        completion = _completion;
                                        next = default(TSource);
                                        break;
                                    }
                                    _state = State.Idle;
                                    continue;
                                }
                                Debug.Assert(_state == State.Disposed);
                                return;
                            }
                        }

                        if (completion != null)
                        {
                            completion.Accept(_observer);
                            return;
                        }
                        _observer.OnNext(next);
                    }
                }
                finally { Dispose(); }
            }
        }

        /// <summary>
        /// Represents a subscription to <see cref="Latest{TSource}(IObservable{TSource}, int, IScheduler)"/> which notifies recursively.
        /// </summary>
        private sealed class RecursiveSubscription : IDisposable
        {
            private enum State
            {
                Idle, // nothing to notify
                Scheduled, // emitter scheduled or executing
                Disposed, // disposed
            }

            private readonly SingleAssignmentDisposable _subscription = new SingleAssignmentDisposable();
            private readonly MultipleAssignmentDisposable _emitter = new MultipleAssignmentDisposable(); // scheduled emit action
            private readonly IScheduler _scheduler;
            private readonly IObserver<TSource> _observer;
            private State _state;
            private IQueue _queue; // queued items
            private Notification<TSource> _completion; // completion notification

            public RecursiveSubscription(IObservable<TSource> source, int maxQueueSize, IScheduler scheduler, IObserver<TSource> observer)
            {
                _scheduler = scheduler;
                _observer = observer;
                _queue = Queue.Create(maxQueueSize);
                _subscription.Disposable = source.Subscribe(
                    OnNext,
                    error => OnCompletion(Notification.CreateOnError<TSource>(error)),
                    () => OnCompletion(Notification.CreateOnCompleted<TSource>()));
            }

            private void OnNext(TSource value)
            {
                lock (_subscription)
                {
                    switch (_state)
                    {
                        case State.Idle:
                            _emitter.Disposable = _scheduler.Schedule(value, EmitNext);
                            _state = State.Scheduled;
                            break;
                        case State.Scheduled:
                            if (_completion != null) return;
                            try { _queue.Enqueue(value); }
                            catch (Exception error) // probably OutOfMemoryException
                            {
                                _completion = Notification.CreateOnError<TSource>(error);
                                _subscription.Dispose();
                            }
                            break;
                    }
                }
            }

            private void OnCompletion(Notification<TSource> completion)
            {
                lock (_subscription)
                {
                    switch (_state)
                    {
                        case State.Idle:
                            _completion = completion;
                            _emitter.Disposable = _scheduler.Schedule(() => EmitCompletion(completion));
                            _state = State.Scheduled;
                            _subscription.Dispose();
                            break;
                        case State.Scheduled:
                            if (_completion != null) return;
                            _completion = completion;
                            _subscription.Dispose();
                            break;
                    }
                }
            }

            public void Dispose()
            {
                lock (_subscription)
                {
                    if (_state == State.Disposed) return;

                    _emitter.Dispose();
                    _queue = null;
                    _completion = null;
                    _state = State.Disposed;
                    _subscription.Dispose();
                }
            }

            private void EmitNext(TSource value, Action<TSource> self)
            {
                try { _observer.OnNext(value); }
                catch { Dispose(); return; }

                lock (_subscription)
                {
                    if (_state == State.Disposed) return;
                    Debug.Assert(_state == State.Scheduled);
                    if (!_queue.IsEmpty)
                        self(_queue.Dequeue());
                    else if (_completion != null)
                        _emitter.Disposable = _scheduler.Schedule(() => EmitCompletion(_completion));
                    else
                        _state = State.Idle;
                }
            }

            private void EmitCompletion(Notification<TSource> completion)
            {
                try { completion.Accept(_observer); }
                finally { Dispose(); }
            }
        }

        #endregion

        #region IQueue

        /// <summary>
        /// FIFO queue that discards least recent items if size limit is reached.
        /// </summary>
        private interface IQueue
        {
            bool IsEmpty { get; }
            void Enqueue(TSource item);
            TSource Dequeue();
        }

        /// <summary>
        /// <see cref="IQueue"/> implementations.
        /// </summary>
        private static class Queue
        {
            public static IQueue Create(int maxSize)
            {
                switch (maxSize)
                {
                    case 0: return Zero.Instance;
                    case 1: return new One();
                    default: return new Many(maxSize);
                }
            }

            private sealed class Zero : IQueue
            {
                // ReSharper disable once StaticMemberInGenericType
                public static Zero Instance { get; } = new Zero();
                private Zero() { }

                public bool IsEmpty => true;
                public void Enqueue(TSource item) { }
                public TSource Dequeue() { throw new InvalidOperationException(); }
            }

            private sealed class One : IQueue
            {
                private TSource _item;

                public bool IsEmpty { get; private set; } = true;

                public void Enqueue(TSource item)
                {
                    _item = item;
                    IsEmpty = false;
                }

                public TSource Dequeue()
                {
                    if (IsEmpty) throw new InvalidOperationException();

                    var item = _item;
                    _item = default(TSource);
                    IsEmpty = true;
                    return item;
                }
            }

            private sealed class Many : IQueue
            {
                private readonly int _maxSize, _initialSize;
                private int _deq, _enq; // indices of deque and enqueu positions
                private TSource[] _buffer;

                public Many(int maxSize)
                {
                    if (maxSize < 2) throw new ArgumentOutOfRangeException(nameof(maxSize));

                    _maxSize = maxSize;
                    if (maxSize == int.MaxValue)
                        _initialSize = 4;
                    else
                    {
                        // choose an initial size that won't get us too close to maxSize when doubling
                        _initialSize = maxSize;
                        while (_initialSize >= 7)
                            _initialSize = (_initialSize + 1) / 2;
                    }
                }

                public bool IsEmpty { get; private set; } = true;

                public void Enqueue(TSource item)
                {
                    if (IsEmpty)
                    {
                        if (_buffer == null) _buffer = new TSource[_initialSize];
                        _buffer[0] = item;
                        _deq = 0;
                        _enq = 1;
                        IsEmpty = false;
                        return;
                    }
                    if (_deq == _enq) // full
                    {
                        if (_buffer.Length == _maxSize) // overwrite least recent
                        {
                            _buffer[_enq] = item;
                            if (++_enq == _buffer.Length) _enq = 0;
                            _deq = _enq;
                            return;
                        }

                        // increse buffer size
                        var newSize = _buffer.Length >= _maxSize / 2 ? _maxSize : 2 * _buffer.Length;
                        var newBuffer = new TSource[newSize];
                        var count = _buffer.Length - _deq;
                        Array.Copy(_buffer, _deq, newBuffer, 0, count);
                        Array.Copy(_buffer, 0, newBuffer, count, _deq);
                        _deq = 0;
                        _enq = _buffer.Length;
                        _buffer = newBuffer;
                    }
                    _buffer[_enq] = item;
                    if (++_enq == _buffer.Length) _enq = 0;
                }

                public TSource Dequeue()
                {
                    if (IsEmpty) throw new InvalidOperationException();

                    var result = ReadAndClear(ref _buffer[_deq]);
                    if (++_deq == _buffer.Length) _deq = 0;
                    if (_deq == _enq)
                    {
                        IsEmpty = true;
                        if (_buffer.Length > _initialSize) _buffer = null;
                    }
                    return result;
                }

                private static TSource ReadAndClear(ref TSource item)
                {
                    var result = item;
                    item = default(TSource);
                    return result;
                }
            }
        }

        #endregion
    }
}
tinudu
  • 1,139
  • 1
  • 10
  • 20
  • 1
    Wow, that's a lot of well-documented stuff. Although I'm just reviewing and not interested in the topic, I think I have to thank you for bringing a nice piece of work here. – YakovL Oct 15 '16 at 12:30
  • You're welcome. Though I didn't do it out of pure altruism, I hope someone will find it useful as well (and helps me get the 50 reputation so I'll at least could comment on posts) – tinudu Oct 16 '16 at 19:18
0

Yet another solution.

This is not pretty, because it mixes Task and Observable, so it's not really testable using ReactiveTest (though to be honest, I'm not sure how I'd implement a 'slow' subscriber with ReactiveTest either).

public static IObservable<T> ShedLoad<T>(this IObservable<T> source)
{
    return Observable.Create<T>(observer =>
    {
        Task task = Task.FromResult(0);
        return source.Subscribe(t =>
        {
            if(task.IsCompleted)
                task = Task.Run(() => observer.OnNext(t));
            else
                Debug.WriteLine("Skip, task not finished");
        }, observer.OnError, observer.OnCompleted);
    });
}

I'm guessing there might be a race condition in there, but to my mind, if we're at the stage where we're ditching stuff because it's going too fast, I don't mind ditching one too many or too few. Oh, and each OnNext is called (potentially) on a different thread (I guess I could put a Synchronize on the back of the Create).

I admit I couldn't get the Materialize extension to work properly (I hooked it up to a FromEventPattern(MouseMove) and then subscribed with a deliberately slow Subscribe, and weirdly it would let bursts of events through, rather than one at at time)

Benjol
  • 63,995
  • 54
  • 186
  • 268