0

I want write function which will return items asynchronously:

public static async Task<IEnumerable<T>> GetItems()
{
    while (await ShouldLoopAsync())
    {
        yield return await GetItemAsync();
    }
}

Of course it will not work. In 1 they propose next solution:

return Observable.Create<T>(
    observer => Task.Run(async () =>
        {
            while (await ShouldLoopAsync())
            {
                observer.OnNext(await getAsync());
            }
            observer.OnCompleted();
        }
    )
);

I don't like accepted solution from 1 because it uses Task.Run method inside Observable.Create. I want some solution with reactive extensions which will run on the current TaskScheduler without using other threads except existing one. I can write such solution in declarative way using .net events but I can't with Rx.

UPDATE

I tried to remove Task.Run and write next test:

[TestClass]
public class ReactiveWithTaskIEnumerable
{
    [TestMethod]
    public void ReactibeShouldProcessValuesWithoutnakoplenie()
    {
        Trace.WriteLine($"Start Thread {Thread.CurrentThread.ManagedThreadId}");
        var i = GetIntsAsync()
            .Subscribe(
            onNext: p =>
            {
                Trace.WriteLine($"onNext Thread {Thread.CurrentThread.ManagedThreadId} {p}");
            },
            onCompleted: () =>
            {
                Trace.WriteLine($"onCompleted Thread {Thread.CurrentThread.ManagedThreadId}");
            },
            onError: p =>
            {
                Trace.WriteLine($"onError Thread {Thread.CurrentThread.ManagedThreadId} {p}");
                throw p;
            }
            );
        Trace.WriteLine($"Finish Thread {Thread.CurrentThread.ManagedThreadId}");

    }

    private IObservable<int> GetIntsAsync()
    {
        return Observable.Create<int>(
            observer => async () =>
            {
                Trace.WriteLine($"Produce {Thread.CurrentThread.ManagedThreadId}");
                try
                {
                    foreach (var i in Enumerable.Range(1, 10))
                    {
                        await Task.Delay(1000);
                        observer.OnNext(1);
                    }
                }
                catch (Exception e)
                {
                    observer.OnError(e);
                }
                observer.OnCompleted();
            });
    }
}

And the output is:

Start Thread 3
Finish Thread 3

So internal task doesn't start at all.

After that I changed GetIntsAsync to this:

    private IObservable<int> GetIntsAsync()
    {
        return Observable.Create<int>(
            observer =>
            {
                async Task Lambda()
                {
                    Trace.WriteLine($"Produce {Thread.CurrentThread.ManagedThreadId}");
                    try
                    {
                        foreach (var i in Enumerable.Range(1, 10))
                        {
                            await Task.Delay(1000);
                            observer.OnNext(1);
                        }
                    }
                    catch (Exception e)
                    {
                        observer.OnError(e);
                    }
                    observer.OnCompleted();
                }

                return Lambda();
            });
    }

And this gives next output:

Debug Trace:
Start Thread 3
Produce 3
Finish Thread 3

It is close to output with Task.Run:

Debug Trace: Start Thread 3 Produce 9 Finish Thread 3

Now I need to wait until observable comes completion

Radik Kurbanov
  • 300
  • 2
  • 6
  • 2
    So remove the `Task.Run` that you don't want to be there. What's the problem? – Servy Oct 11 '18 at 15:35
  • 2
    your problem with your unit test where you remove Task.Run() is that you are not making your unit test wait for the async process to complete. Function runs, queues up async work, then exits completing the test before the async work can run! Change your unit test to be async and return a Task which only completes when the subscription OnCompleted or OnError callback is finished, – Brandon Oct 11 '18 at 20:39
  • You can also specify a Scheduler when calling Observable.Create – yan yankelevich Oct 24 '18 at 08:32
  • @yanyankelevich I can't find an `Observable.Create` overload that accepts an `IScheduler`, in the package System.Reactive 4.4.1. – Theodor Zoulias Nov 29 '20 at 11:53

1 Answers1

2

Have a go at NuGetting "System.Interactive" and then try this:

public static IEnumerable<int> GetItems()
{
    return EnumerableEx.Create<int>(async y =>
    {
        while (await ShouldLoopAsync())
        {
            await y.Return(GetItemAsync());
        }
        await y.Break();
    });
}
Enigmativity
  • 113,464
  • 11
  • 89
  • 172