I want write function which will return items asynchronously:
public static async Task<IEnumerable<T>> GetItems()
{
while (await ShouldLoopAsync())
{
yield return await GetItemAsync();
}
}
Of course it will not work. In 1 they propose next solution:
return Observable.Create<T>(
observer => Task.Run(async () =>
{
while (await ShouldLoopAsync())
{
observer.OnNext(await getAsync());
}
observer.OnCompleted();
}
)
);
I don't like accepted solution from 1 because it uses Task.Run method inside Observable.Create. I want some solution with reactive extensions which will run on the current TaskScheduler without using other threads except existing one. I can write such solution in declarative way using .net events but I can't with Rx.
UPDATE
I tried to remove Task.Run
and write next test:
[TestClass]
public class ReactiveWithTaskIEnumerable
{
[TestMethod]
public void ReactibeShouldProcessValuesWithoutnakoplenie()
{
Trace.WriteLine($"Start Thread {Thread.CurrentThread.ManagedThreadId}");
var i = GetIntsAsync()
.Subscribe(
onNext: p =>
{
Trace.WriteLine($"onNext Thread {Thread.CurrentThread.ManagedThreadId} {p}");
},
onCompleted: () =>
{
Trace.WriteLine($"onCompleted Thread {Thread.CurrentThread.ManagedThreadId}");
},
onError: p =>
{
Trace.WriteLine($"onError Thread {Thread.CurrentThread.ManagedThreadId} {p}");
throw p;
}
);
Trace.WriteLine($"Finish Thread {Thread.CurrentThread.ManagedThreadId}");
}
private IObservable<int> GetIntsAsync()
{
return Observable.Create<int>(
observer => async () =>
{
Trace.WriteLine($"Produce {Thread.CurrentThread.ManagedThreadId}");
try
{
foreach (var i in Enumerable.Range(1, 10))
{
await Task.Delay(1000);
observer.OnNext(1);
}
}
catch (Exception e)
{
observer.OnError(e);
}
observer.OnCompleted();
});
}
}
And the output is:
Start Thread 3
Finish Thread 3
So internal task doesn't start at all.
After that I changed GetIntsAsync
to this:
private IObservable<int> GetIntsAsync()
{
return Observable.Create<int>(
observer =>
{
async Task Lambda()
{
Trace.WriteLine($"Produce {Thread.CurrentThread.ManagedThreadId}");
try
{
foreach (var i in Enumerable.Range(1, 10))
{
await Task.Delay(1000);
observer.OnNext(1);
}
}
catch (Exception e)
{
observer.OnError(e);
}
observer.OnCompleted();
}
return Lambda();
});
}
And this gives next output:
Debug Trace:
Start Thread 3
Produce 3
Finish Thread 3
It is close to output with Task.Run
:
Debug Trace: Start Thread 3 Produce 9 Finish Thread 3
Now I need to wait until observable comes completion