I am new to Reactive programming and need to handle the following situation: my consumer logic is relatively long-running, so several messages may be produced while an earlier message is still being processed.

I need to skip those messages and avoid calling the consumer logic for them. Here is an example (in C#):

IObservable<int> stream = ...
stream.SubscribeOn(TaskPoolScheduler.Default).Subscribe(ProcessMessage, cancellationToken);

Imagine that a message is pushed to the stream every 5 milliseconds: (1, 2, 3, 4, 5)

The ProcessMessage method may take about 10-15 ms to complete, so after it receives message 1 it will still be working while 2 and 3 are produced.

I need to skip calling ProcessMessage for 2 and 3 and catch up with 4 directly, or whatever the next unprocessed message will be.

Is there any built-in construct in Reactive Extensions that allows me to handle this particular case?

Ivaylo Slavov
  • Check out the [`ExhaustMap`](https://stackoverflow.com/questions/64353907/how-can-i-implement-an-exhaustmap-handler-in-rx-net) operator. It may be exactly what you need. – Theodor Zoulias Nov 19 '20 at 19:20
  • @TheodorZoulias, indeed this looks like what I am after. Thank you! – Ivaylo Slavov Nov 19 '20 at 22:03
  • I will try to close this question as a duplicate of https://stackoverflow.com/questions/64353907/how-can-i-implement-an-exhaustmap-handler-in-rx-net – Ivaylo Slavov Nov 19 '20 at 22:04
  • yep, that's a very elegant solution – Ivaylo Slavov Nov 19 '20 at 22:09
  • AFAIK it should be possible to close your own question as a duplicate single-handedly. I have closed a couple of my own questions this way. – Theodor Zoulias Nov 19 '20 at 22:09
  • I thought so as well, but I guess that is no longer the case. The only way I could close it was by pressing the `close question` link below, which just presented the close-voting dialog :|. Maybe I need a bit more rep to be able to close my own question directly, or maybe the moderation flow has changed. It was a long time ago that I closed one of my own questions, and I barely remember how it worked then – Ivaylo Slavov Nov 19 '20 at 22:15
  • Yeah, maybe it's a rep thing. It may be an intermediate privilege between being able to vote to open/close any question and being able to open/close any question single-handedly (a privilege that I see abused quite often). – Theodor Zoulias Nov 19 '20 at 22:26
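
For readers who want a rough idea of what an `ExhaustMap`-style operator could look like in Rx.NET, here is a minimal sketch. It is an assumption-laden illustration, not a built-in operator and not necessarily the implementation from the linked question: the class name `ExhaustMapExtensions`, the method signature, and the flag-based approach are all hypothetical. The idea is to let an item through only when no earlier call is still running, and to drop everything that arrives in the meantime.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ExhaustMapExtensions
{
    // Hypothetical helper, not part of Rx.NET.
    // Starts the async selector for an item only when no previous call
    // is still running; items that arrive in the meantime are dropped.
    public static IObservable<TResult> ExhaustMap<TSource, TResult>(
        this IObservable<TSource> source,
        Func<TSource, Task<TResult>> selector)
    {
        // Defer gives every subscription its own "busy" flag.
        return Observable.Defer(() =>
        {
            int busy = 0; // 0 = idle, 1 = a selector call is in flight

            return source
                // Atomically claim the flag; skip the item if it is already taken.
                .Where(_ => Interlocked.CompareExchange(ref busy, 1, 0) == 0)
                .Select(item => Observable
                    .FromAsync(() => selector(item))
                    // Release the flag when the call completes, fails, or is unsubscribed.
                    .Finally(() => Interlocked.Exchange(ref busy, 0)))
                // At most one inner sequence is active at a time, so Merge
                // simply forwards its result.
                .Merge();
        });
    }
}
```

With the `ProcessMessage` method from the question, a call might look like `stream.ExhaustMap(m => Task.Run(() => { ProcessMessage(m); return m; })).Subscribe(_ => { }, cancellationToken);`. Wrapping the synchronous method in `Task.Run` is itself an assumption here; it keeps the source free to push (and have dropped) new messages while the work runs on the thread pool.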
