0

I’ve got new question about one (or two) Reactive methods. In my scenario I needed an observable sequence capable of suppressing other emitted Tasks while the first Task wasn’t completed, and ended up with something like this:

Observable.Interval(TimeSpan.FromMilliseconds(200))
.Select(x => Observable.FromAsync(async () =>
{
    await Task.Delay(1000);
    // Simulating long running task
    Console.WriteLine(x);
}))
.Publish(x => x.FirstAsync().SelectMany(c => c).Repeat())
.Subscribe();

I tried to Google but I really can’t explain few things:

  1. First of all, how that works ?
  2. What is exactly on that Reactive sequence that blocks the observable from reaching the subscription? What exactly Replay does in that? Isn’t Replay supposed to replay the Task in this case? Or I don’t know. Can anyone explain detailed every step in that Reactive query? What does Publish with that kind of selector. How Replay is playing in that query? And why do I need to call SelectMany on FirstAsync if only one element will be emitted anyway.
Vincent P
  • 33
  • 6
  • It would help if your code was valid. – NetMage Feb 24 '22 at 22:06
  • Regarding your initial quest of preventing new tasks from starting until a previously started task has completed, you might be looking for the [`ExhaustMap`](https://stackoverflow.com/questions/64353907/how-can-i-implement-an-exhaustmap-handler-in-rx-net) operator. – Theodor Zoulias Feb 25 '22 at 01:06

1 Answers1

1
  1. The .SelectMany(c => c) is an idiomatic way to flatten/merge a nested sequence. You can replace it with .Merge(), and the behavior of the query will be identical.

  2. The Publish operator, when used with a Func<IObservable<TSource>, IObservable<TResult>> parameter, subscribes to the query on which it is chained, and then remains subscribed until the sequence produced by the lambda completes. So in your case, by wrapping the inner sequence x.FirstAsync().SelectMany(c => c).Replay() in a Publish, you delay the unsubscription from the chained sequence (the Interval+Select+FromAsync) until the inner sequence completes. The inner sequence never completes, so the chained sequence keeps forever producing one cold IObservable<Unit> subsequence every second. You can observe this happening, by intercepting a Do operator before the Publish:

.Do(x => Console.WriteLine($"New subsequence: {x.GetType().Name}"))
  1. The Replay operator is similar to the Publish, with the difference that the Replay has memory of past notifications, whilst the Publish has no memories whatsoever. I guess that your intention was to attach the Repeat instead of the Replay. The Replay without parameter produces a "connectable" observable, that doesn't subscribe automatically to the chained sequence. You have either to Connect it manually, or to attach the RefCount operator to it. In your case you are doing neither, so the resulting sequence never emits anything and never completes. It's a nasty dead-lock situation.
Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104
  • First of all thanks. That was really the answer I was looking for. But about that, there’s no deadlock in that sequence. You can try it yourself. It will produce what I needed. It will produce just one value every time the first observable is completed, or something like that. – Vincent P Feb 25 '22 at 05:47
  • What I needed is that: Imagine having something producing many values but I must handle only one value and discard others while analyzing that value. That analysis takes long, so that all produced values behind should not be emitted at all. Ideally I would need to disable the observable while the long task is running, and once the task is completed I can enable the observable again. – Vincent P Feb 25 '22 at 05:56
  • I still don’t understand what the Publish does. According to your answer seems some sort of TakeUntilComplete()? – Vincent P Feb 25 '22 at 06:05
  • 1
    Update. My mistake. I corrected the question, since you had right about Repeat in stead of Replay in fact in my code I had Repeat() but couldn't check when added this question. – Vincent P Feb 25 '22 at 07:55
  • @VincentP disabling the observable while the long task is running, is not something generally doable. Observables are supposed to be passively observed, not actively controlled by the observer! Two possibilities come to my mind: (1) unsubscribing from the observable every time it produces a value and resubscribing after the completion of the long task and (2) blocking the thread that emits the notification about the new value. None of them guarantees that the observable will be stopped, but you might give it a try if you are desperate and you lack other options. – Theodor Zoulias Feb 25 '22 at 12:20
  • @VincentP regarding the [`Publish`](https://learn.microsoft.com/en-us/previous-versions/dotnet/reactive-extensions/hh229288(v=vs.103)) operator, its main purpose is to allow subscribing multiple times to the same sequence, while the underlying sequence is subscribed only once. In your case you need this operator because otherwise the `Repeat` operator would cause repeated subscriptions to the source sequence, which is something that we generally want to avoid (because it can have undesirable side-effects). – Theodor Zoulias Feb 25 '22 at 12:34
  • Thanks but I'm still very confused about `.Publish(x => x.FirstAsync().SelectMany(c => c).Repeat())` and how it's able to Process the Task only once disabling next ones. – Vincent P Feb 25 '22 at 13:27
  • @VincentP are you the author of this code, or you found it from someone else? If you are the author, you shouldn't really write code that you don't understand how it works! If you are not the author, my suggestion is to try to understand each individual operator first, by experimenting with simple queries containing a single unknown operator each time. Your current query is too complex to be digested all at once! – Theodor Zoulias Feb 25 '22 at 13:36
  • @VincentP be aware that the `.Publish(x => x.FirstAsync().SelectMany(c => c).Repeat())` has the potential of creating a tight loop in case the source sequence completes. In your example the source sequence is an `Observable.Interval` that never completes (it is projected, but not filtered), so this is not an issue. But I guess that your real code might be different. I've posted a recent question about those unusual situations [here](https://stackoverflow.com/questions/71283104/how-to-repeat-an-observable-sequence-until-its-empty). – Theodor Zoulias Feb 27 '22 at 09:37