0

I'm trying to notify listeners who subscribed to Subject _sub from another observable and after that log some message in Do handler. I'm calling OnNext and everything would work fine if _sub wasn't asynchronous. The problem here is that there is no OnNextAsync function which I would await in the first observable. What is the best way to do this?

 class Program
        {
            private static Subject<int> _sub = new Subject<int>();

            static void Main(string[] args)
            {
                _sub.SelectMany(async _ =>
                {
                    Console.WriteLine("SUB START: " + _);
                    await Task.Delay(3000);
                    Console.WriteLine("SUB END: " + _);
                    return 1;
                }).Subscribe();


                Start();
            }

            public static void Start()
            {
                int count = 0;
                Observable.Interval(TimeSpan.FromSeconds(5)).Select(x =>
                {
                    Console.WriteLine("START INTERVAL");
                    _sub.OnNext(count++); //onNext is not awaitable
                    Console.WriteLine("END INTERVAL");
                    return 1;
                })
                .Do(_ => Console.WriteLine("ALL FINISHED"))
                .Subscribe();

                Console.WriteLine("READLINE");
                Console.ReadLine();
            }

        }

Result:

READLINE
START INTERVAL
SUB START: 0
END INTERVAL
ALL FINISHED
SUB END: 0

Expected result:

READLINE
START INTERVAL
SUB START: 0
SUB END: 0
END INTERVAL
ALL FINISHED
MistyK
  • 6,055
  • 2
  • 42
  • 76
  • An observable should not rely on the behavior of its observers. – Paulo Morgado Oct 09 '18 at 22:33
  • @PauloMorgado what do you suggest? – MistyK Oct 10 '18 at 06:13
  • You may be interested to a `ForEachAsync` method for observables, that executes an asynchronous action for each value of the sequence, and can be awaited for the completion of the sequence itself and all the asynchronous actions. You can find an implementation [here](https://stackoverflow.com/questions/45382799/how-to-use-rx-nex-extension-foreachasync-with-async-action/64936628#64936628). – Theodor Zoulias Nov 29 '20 at 13:34

2 Answers2

0

An observable should not rely on the behavior of its observers.

I suggest you rethink the whole thing. What you're doing looks more interactive than reactive.

Paulo Morgado
  • 14,111
  • 3
  • 31
  • 59
  • I don't agree. It's just a simple scenario where I want to log message before and after I notified my observers. The only difference is that the observer may be asynchronous. Anyway it should be a comment not an answer. – MistyK Oct 10 '18 at 12:59
  • (This was meant to be a comment but I still think it's the answer) – Paulo Morgado Oct 10 '18 at 18:42
  • But you're logging before and after you notified your observers. It's just that the observer started doing something after being done being notified. – Paulo Morgado Oct 10 '18 at 18:45
0

I'm going to echo Paulo for clarity:

  • Observables shouldn't care about Observers. While an Observable will wait on its synchronous Observers, it's best to think of this as an accident of implementation. An observable doesn't wait at all on an asynchronous Observer. Either way, the waiting shouldn't be relied upon.

  • You should really re-think how you're doing this. You're using a reactive library to write interactive code. Probably either the wrong choice of tool or a misuse of the tool.

  • Your code is littered with Rx code-smells. Think of reactive code as a flowchart. A flowchart of your code would look like spaghetti. It should look more like a binary tree.

This sounds like an XY problem: I would suggest re-phrasing your question with what you're trying to accomplish.

Rajan Mishra
  • 1,178
  • 2
  • 14
  • 30
Shlomo
  • 14,102
  • 3
  • 28
  • 43
  • It's not a XY Problem. I have a real scenario, the above version is just simplified. Why do you say it's interactive code? The whole idea is that I have multiple observables (e.g data has changed) that can notify the second observable (e.g. for each data change check if process can start) – MistyK Oct 11 '18 at 06:44