I have the following code in my BackgroundService:

public async Task ExecuteAsync(CancellationToken cancellationToken)
{
    await GetElements(cancellationToken)
        .ToObservable()
        .Buffer(TimeSpan.FromMinutes(1), 100)
        .Select(elements => Observable.FromAsync(() => ProcessElementsAsync(
            elements, cancellationToken)))
        .Concat()
        .ToTask(cancellationToken);
}

private async IAsyncEnumerable<Element> GetElements(
    [EnumeratorCancellation] CancellationToken cancellationToken)
{
    while (!cancellationToken.IsCancellationRequested)  // <-- Breakpoint is here
    {
        yield return await _queue.DequeueElementAsync(cancellationToken);
    }
}

When I run the app, the breakpoint is never hit. It looks as if `GetElements` is never actually executed.

When I replace ExecuteAsync with this code, the breakpoint in GetElements does get hit:

public async Task ExecuteAsync(CancellationToken cancellationToken)
{
    GetElements(cancellationToken)
        .ToObservable()
        .Subscribe(val =>
        {
            _ = 1;
        });

    await Task.Delay(Timeout.InfiniteTimeSpan, cancellationToken);
}

What's wrong with the first implementation?

Theodor Zoulias
mnj
  • Related question for context: [How to buffer a queue with a timeout?](https://stackoverflow.com/questions/75873568/how-to-buffer-a-queue-with-a-timeout) – Theodor Zoulias Mar 31 '23 at 07:47
  • I haven't used `Buffer` much, but I wouldn't be surprised if it is what keeps you from reaching the breakpoint. However, I think it is more likely that your `Select` statement is the problem. Can you use `SelectMany(this IObservable, Func>` instead? – Stroniax Apr 27 '23 at 18:21
