Here is a relatively simple custom DroppingDo operator that probably does what you want. It is somewhat similar to the built-in Do operator, with two differences: it invokes the action on the ThreadPool (by default) instead of the current thread, and it ignores items that are received while a previous action is running. The latest item is preserved though.
/// <summary>
/// Invokes an action sequentially for each element in the observable sequence,
/// on the specified scheduler, skipping and dropping elements that are received
/// during the execution of a previous action, except for the latest element.
/// </summary>
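public static IObservable<TSource> DroppingDo<TSource>(
    this IObservable<TSource> source,
    Action<TSource> action,
    IScheduler scheduler = null)
{
    // Arguments validation omitted
    scheduler ??= Scheduler.Default;
    return Observable.Defer(() =>
    {
        // Holds the most recent unprocessed item. The Tuple wrapper lets
        // null mean "no pending item", even when TSource is a value type.
        Tuple<TSource> latest = null;
        return source
            .Select(item =>
            {
                // Store the new item. If another item was still pending, it is
                // overwritten (dropped), and the work already queued for it
                // will pick up this newer item instead.
                var previous = Interlocked.Exchange(ref latest, Tuple.Create(item));
                if (previous != null) return Observable.Empty<TSource>();
                return Observable.Defer(() =>
                {
                    // Runs only when Concat subscribes, which is after the
                    // previous action has completed.
                    var current = Interlocked.Exchange(ref latest, null);
                    Debug.Assert(current != null);
                    var unboxed = current.Item1;
                    return Observable.Start(
                        () => { action(unboxed); return unboxed; }, scheduler);
                });
            })
            .Concat(); // Subscribes to the inner sequences one at a time
    });
}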
Usage example. Just replace your code that probably looks like this:
someObservable
    .Subscribe(x => Process(x), ex => HandleError(ex));
With this:
someObservable
    .DroppingDo(x => Process(x))
    .Subscribe(_ => { }, ex => HandleError(ex));
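To see the dropping behavior, here is a minimal sketch of a console demo. It assumes the System.Reactive NuGet package is referenced and that the DroppingDo extension method above lives in an accessible static class. The names DroppingDoDemo, started, release and done are made up for this illustration. The semaphores make the timing deterministic: item 1 is held "in progress" while items 2 and 3 arrive.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

// Hypothetical demo class; assumes DroppingDo (defined above) is in scope.
public static class DroppingDoDemo
{
    public static void Main()
    {
        var source = new Subject<int>();
        var started = new SemaphoreSlim(0);       // signaled when an action begins
        var release = new SemaphoreSlim(0);       // lets a blocked action finish
        var done = new ManualResetEventSlim();    // set when the sequence completes

        using var subscription = source
            .DroppingDo(x =>
            {
                Console.WriteLine($"Processing {x}");
                started.Release();
                release.Wait(); // simulate slow work
            })
            .Subscribe(_ => { }, ex => Console.WriteLine(ex), () => done.Set());

        source.OnNext(1);
        started.Wait();       // the action for 1 is now running
        source.OnNext(2);     // arrives while busy; later overwritten by 3
        source.OnNext(3);     // latest item, so it is kept
        source.OnCompleted();

        release.Release(2);   // allow both pending actions to finish
        done.Wait();
    }
}
```

With the operator as written above, this should print "Processing 1" followed by "Processing 3". Item 2 is skipped, because it was replaced by item 3 before the first action finished.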