5

In a C# console application using System.Reactive.Linq, I'm trying to build an observable in which each item is the string result of some processing by another observable. I've created a simple repro using strings and characters. Warning: this example is completely CONTRIVED; the point is that the nested .Wait() hangs.

using System;
using System.Reactive.Linq;

class Program
{
    static void Main(string[] args)
    {
        string[] fileNames = { "file1.doxc", "file2.xlsx", "file3.pptx" };
        IObservable<string> files = fileNames.ToObservable();
        string[] extensions = files.Select(fn =>
        {
            // Inner observable over the characters of the file name.
            var extension = fn.ToObservable()
                .TakeLast(4)
                .ToArray()
                .Wait(); // <<<<<<<<<<<<< HANG HERE
            return new string(extension);
        })
        .ToArray()
        .Wait();
    }
}

Again, this is not how I would find the suffix of many filenames. The question is how I can produce an Observable of strings, where the strings are computed from a completed observable.

If I pull out this code and run it alone, it completes fine.

    var extension = fn.ToObservable()
        .TakeLast(4)
        .ToArray()
        .Wait();

There is something about the nested Wait() on the async methods, which I don't understand.

How can I code the nested async observables, so I can produce a simple array of string?

Thanks

-John

JohnKoz
  • Your methods are **not** asynchronous: Wait() is a blocking operation... See https://stackoverflow.com/questions/37801699/what-does-the-wait-operator-in-rx-net-do – Johan Donne Sep 02 '17 at 17:48
  • Can you explain why you are using `Wait()` in the first place? – Federico Dipuma Sep 02 '17 at 17:57
  • Somewhat related: [Why does repeated Enumerable to Observable conversion block](https://stackoverflow.com/questions/61041484/why-does-repeated-enumerable-to-observable-conversion-block) – Theodor Zoulias Dec 21 '20 at 00:18

3 Answers

6

Your code blocks because you are using ToObservable() without specifying a scheduler. In that case it uses the CurrentThreadScheduler.

So the files observable issues its first OnNext() [A] (sending "file1.doxc") on the current thread, and it can't continue iterating until that OnNext() returns. However, the inner fn observable also uses ToObservable(), and the Wait() blocks until fn completes. The inner observable queues its first OnNext() (sending "f") on the current thread scheduler, but that queued work can never run, because the scheduler will not start it until OnNext() [A] returns, and [A] is itself blocked inside Wait().
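
The queuing behaviour described above can be seen in isolation, without any observables. This is a minimal sketch (the `TrampolineDemo` class and `Run` method are names made up for illustration): work scheduled on the CurrentThreadScheduler from inside an action that is already running on it is queued, and only runs once the outer action returns.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;

// Hypothetical demo class, not part of the original question or answer.
public static class TrampolineDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();

        // The outer action starts immediately on the calling thread.
        CurrentThreadScheduler.Instance.Schedule(() =>
        {
            log.Add("outer start");

            // Scheduled from inside a running action: this is queued,
            // not run inline, until the outer action has returned.
            CurrentThreadScheduler.Instance.Schedule(() => log.Add("inner"));

            log.Add("outer end");
        });

        return log;
    }
}
```

If the outer action blocked while waiting for the inner one to finish, as the nested Wait() does, the inner action would never get its turn, which is the hang in the question.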

Two simple fixes:

Either change the files observable like this:

// NewThreadScheduler lives in the System.Reactive.Concurrency namespace.
IObservable<string> files = fileNames.ToObservable(NewThreadScheduler.Default);
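
For reference, here is a sketch of the question's code with only that one change applied, wrapped in a helper so it can be called directly. The `SchedulerFix` class and `GetExtensions` method are illustrative names, not anything from the question.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

// Hypothetical wrapper around the question's code.
public static class SchedulerFix
{
    public static string[] GetExtensions(string[] fileNames)
    {
        // The outer sequence is iterated on a new thread, so the inner
        // ToObservable() is not queued behind a blocked OnNext().
        IObservable<string> files = fileNames.ToObservable(NewThreadScheduler.Default);

        return files.Select(fn =>
        {
            // The nested blocking Wait() is kept on purpose to show it now completes.
            char[] extension = fn.ToObservable()
                .TakeLast(4)
                .ToArray()
                .Wait();
            return new string(extension);
        })
        .ToArray()
        .Wait();
    }
}
```

Calling `SchedulerFix.GetExtensions(new[] { "file1.doxc", "file2.xlsx", "file3.pptx" })` should return the last four characters of each name instead of hanging.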

Or, avoid the use of the inner Wait() with a SelectMany (which is definitely more idiomatic Rx):

string[] extensions = files.SelectMany(fn =>
{
    return fn.ToObservable()
             .TakeLast(4)
             .ToArray()
             .Select(x => new string(x));
})
.ToArray()
.Wait();

// display results etc.

Each approach will have quite different execution semantics - the first will run much like a nested loop, with each inner observable completing before the next outer iteration. The second will be much more interleaved since the blocking behaviour of the Wait() is removed. If you use the Spy method I wrote and attach it after both ToObservable() calls, you'll see this behaviour quite clearly.
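
If you don't have the Spy method to hand, a rough substitute is the built-in Do operator. The sketch below is an assumption-laden stand-in, not the Spy method itself: the `InterleavingDemo` class and the log format are made up, and it relies on everything running on the calling thread (both ToObservable() calls use the default scheduler), so a plain List is used for the log.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical helper that records the order of outer and inner notifications.
public static class InterleavingDemo
{
    public static (string[] Extensions, List<string> Log) Run(string[] fileNames)
    {
        var log = new List<string>();

        string[] extensions = fileNames.ToObservable()
            .Do(fn => log.Add($"outer OnNext: {fn}"))
            .SelectMany(fn => fn.ToObservable()
                .Do(c => log.Add($"  inner OnNext: {c} (from {fn})"))
                .TakeLast(4)
                .ToArray()
                .Select(chars => new string(chars)))
            .ToArray()
            .Wait();

        return (extensions, log);
    }
}
```

Printing the returned log line by line shows how the outer and inner notifications are ordered for the SelectMany version.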

James World
  • Thank you James, great answer, to the point and informative. I didn't have a good reason for not using SelectMany, in fact that's my ultimate solution now. I tried adding the NewThreadScheduler.Default just for a test, and that worked as well of course. I get the concept that the observables are effectively deadlocked on the OnNext, but I think I'll play around with Spy a bit and try to make this clearer for myself. Thanks again! – JohnKoz Sep 05 '17 at 01:58
1

Wait is a blocking call, which doesn't mix well with Rx. I'm not sure why the nested one is failing.

Assuming an async function, this works:

IObservable<string> files = fileNames.ToObservable();
string[] extensions = await files.SelectMany(async fn =>
{
    var extension = await fn.ToObservable()
        .TakeLast(4)
        .ToArray();
    return new string(extension);
})
.ToArray();
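
To make the "assuming an async function" part concrete, here is a self-contained sketch of the same idea as a Task-returning method. The `AsyncExtensions` class and `GetExtensionsAsync` method are hypothetical names; the ordering of the results is not asserted here.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

// Hypothetical wrapper: awaits the observables instead of blocking with Wait().
public static class AsyncExtensions
{
    public static async Task<string[]> GetExtensionsAsync(string[] fileNames)
    {
        return await fileNames.ToObservable()
            .SelectMany(async fn =>
            {
                // Awaiting an observable yields its last element (here, the char[]).
                char[] extension = await fn.ToObservable()
                    .TakeLast(4)
                    .ToArray();
                return new string(extension);
            })
            .ToArray();
    }
}
```

From an async method you would call it as `string[] extensions = await AsyncExtensions.GetExtensionsAsync(fileNames);`.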
Shlomo
1

James has nailed the issue, but I would suggest that your code boils down to just this:

    string[] fileNames = { "file1.doxc", "file2.xlsx", "file3.pptx" };
    string[] extensions =
    (
        from fn in fileNames.ToObservable()
        from extension in fn.ToObservable().TakeLast(4).ToArray()
        select new string(extension)
    )
        .ToArray()
        .Wait();

Now, that still has a .Wait() in it. Ideally you'd do something like this:

    IDisposable subscription =
    (
        from fn in fileNames.ToObservable()
        from extension in fn.ToObservable().TakeLast(4).ToArray()
        select new string(extension)
    )
        .ToArray()
        .Subscribe(extensions =>
        {
            /* Do something with the `extensions` */
        });

You should avoid all waiting.

Enigmativity