I have an IObservable that generates a value every second, followed by a select that runs code that may take some time:

var events = Observable.Interval(TimeSpan.FromSeconds(1));
ssoInfoObservable = events
    .Select(async e =>
    {
        Console.Out.WriteLine("Select   : " + e);
        await Task.Delay(4000);
        return e;
    })
    .SelectMany(t => t.ToObservable())
    .Subscribe(l => Console.WriteLine("Subscribe: " + l));

The long-running operation takes 4 seconds in my example. While the code inside Select is running, I do not want another value from the Interval to be generated. How do I accomplish this? Is this possible? Maybe use a specific IScheduler implementation?

Note that if there's no async code, everything works as expected as described here.

This question is very similar to one I asked earlier, except for the async/await.

  • The `.ObserveOn(...)` doesn't make a difference. The `.Interval(...)` operator times the gap (i.e. interval) between each set of handlers and not between each start time. – Enigmativity Jul 18 '15 at 11:47
  • I see that I have made my example code too simple, I have updated my question to reflect my actual code. – Ronald Wildenberg Jul 19 '15 at 09:53
  • This is still limited by the number of threads that actually run. I only ever got four selects ahead and then it just stayed there. I'll have a think, but can you describe why this is your actual code? What problem do you think it solves for you? – Enigmativity Jul 19 '15 at 12:27
  • The most important requirement is that the `Select` doesn't run in parallel. The operation that's awaited can be long-running (waiting for a result to become available in a db) and produces the result for the rest of the chain. The simple solution would of course be to synchronously wait for the `Result` of the awaited task. Actual code is quite similar, except I await something useful. – Ronald Wildenberg Jul 20 '15 at 06:33
  • Why are you mixing Observables and `await` like this? Observables achieve the purpose of pushing your work to a background thread, thus freeing up the UI (or main thread) to be responsive for other code. Mixing in `await` is kind of like "double-dipping". – Enigmativity Jul 20 '15 at 07:29
  • Rx naturally ensures the `Select` will run in series if you don't introduce concurrency. – Enigmativity Jul 20 '15 at 07:30
  • I guess you're right :) Was wondering the same thing after giving it some more thought. The reason for using the `await` is simply that the API I'm calling only offers async methods. So I naturally started with `await`, discovered the undesired behavior and tried to find a solution. I'm now going to wait for the result synchronously. It's more an academic exercise now to try and find a solution that allows the use of `await` (without using global state because that would be too easy :) – Ronald Wildenberg Jul 20 '15 at 08:45
  • I think you need to introduce some sort of state. Rx does a nice job of **not** multi-threading unless you construct your query to introduce it. But once you have, it is much harder to remove the concurrency. – Enigmativity Jul 20 '15 at 09:07

1 Answer

See this sample on creating an async generate function. Yours would be slightly different in that you need a time offset and you don't need the iterate, so it would look more like this:

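    // Requires: System, System.Reactive.Concurrency, System.Reactive.Linq, System.Threading.Tasks
    // Place inside a static class (the usage below assumes it is named Extensions).
    public static IObservable<T> GenerateAsync<T>(TimeSpan span, 
                                                  Func<int, Task<T>> generator, 
                                                  IScheduler scheduler = null)
    {
        scheduler = scheduler ?? Scheduler.Default;

        return Observable.Create<T>(obs =>
        {
            // The async lambda is converted to an Action (async void), so
            // exceptions are caught here and forwarded to the observer.
            return scheduler.Schedule(0, span, async (idx, recurse) =>
            {
                try
                {
                    obs.OnNext(await generator(idx));
                    // Schedule the next run only after the current task has finished.
                    recurse(idx + 1, span);
                }
                catch (Exception ex)
                {
                    obs.OnError(ex);
                }
            });
        });
    }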

Usage:

  Extensions.GenerateAsync(TimeSpan.FromSeconds(1), idx => /*Async work, return a task*/, scheduler);

As a possible second option you could look to port the implementation of switchFirst into C#. SwitchFirst will subscribe to the first Observable it receives and ignore subsequent ones until its current subscription completes.

If you took that approach you could have something like:

Observable.Interval(TimeSpan.FromSeconds(1))
          .Select(e => Observable.FromAsync(() => /*Do Async stuff*/))
          .SwitchFirst()
          .Subscribe();
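
Rx.NET does not ship a `SwitchFirst` operator as far as I know, so the snippet above needs one to be written. The following is a minimal sketch of what such a port might look like, not a tested implementation. The class name `SwitchFirstExtensions` and all local variable names are my own, chosen only for illustration. It assumes the behavior described above: subscribe to an inner sequence only when no earlier one is still running, and drop any that arrive in the meantime.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class SwitchFirstExtensions
{
    // Hypothetical port of switchFirst: an inner sequence is subscribed to only
    // if the previously accepted inner sequence has already completed.
    public static IObservable<T> SwitchFirst<T>(this IObservable<IObservable<T>> sources)
    {
        return Observable.Create<T>(observer =>
        {
            var gate = new object();            // guards the flags and serializes observer calls
            var innerActive = false;            // true while an inner sequence is running
            var outerDone = false;              // true once the outer sequence has completed
            var innerSubscription = new SerialDisposable();

            var outerSubscription = sources.Subscribe(
                inner =>
                {
                    lock (gate)
                    {
                        if (innerActive) return;    // busy: drop this inner sequence
                        innerActive = true;
                    }

                    var d = new SingleAssignmentDisposable();
                    innerSubscription.Disposable = d;
                    d.Disposable = inner.Subscribe(
                        value => { lock (gate) { observer.OnNext(value); } },
                        error => { lock (gate) { observer.OnError(error); } },
                        () =>
                        {
                            lock (gate)
                            {
                                innerActive = false;
                                if (outerDone) observer.OnCompleted();
                            }
                        });
                },
                error => { lock (gate) { observer.OnError(error); } },
                () =>
                {
                    lock (gate)
                    {
                        outerDone = true;
                        // Complete now only if no inner sequence is still running.
                        if (!innerActive) observer.OnCompleted();
                    }
                });

            return new CompositeDisposable(outerSubscription, innerSubscription);
        });
    }
}
```

Note that with this approach `Interval` keeps ticking while the async work runs. Ticks that arrive during that time are discarded rather than delayed, so the values passed to `Select` will have gaps.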