I am new to all of the Reactive programming concepts and I need to deal with the following situation -- I have a consumer logic that is relatively long-running, in a way that a couple of messages may be produced while a preceding message is still being processed.
I need to skip those messages and prevent calling the relevant consume logic for those. Here is an example (in C#)
IObservable<int> stream = ...
stream.SubscribeOn(TaskPoolScheduler.Default).Subsribe(ProcessMessage, cancellationToken);
Imagine that a message is pushed to the stream every 5 milliseconds: (1, 2, 3, 4, 5)
The ProcessMessage
method may take about 10-15ms to complete, meaning when it received message 1
, it will still be working when 2
and 3
are being produced.
I need to skip calling ProcessMessage
for 2
and 3
and catch up with 4
directly, or whatever the next unprocessed message will be.
Is there any built-in construct in Reactive extensions that allows me to handle this particular case?