I am using IAsyncEnumerable to return paginated results from an API as they come in, as follows:

public async IAsyncEnumerable<ApiObject> GetResults(){
  int startIndex = 0;
  PaginatedResponse<ApiObject> response;
  do {
    response = await _client.GetResults(_endpoint, startIndex, _pageSize);
    foreach (var obj in response.Items){
      yield return obj;
    }
    startIndex = response.StartIndex + response.Count;
  } while (response.StartIndex + response.Count < response.Total);
}

What I want is to send requests for multiple pages in parallel, and return each page to the IAsyncEnumerable as it comes in. Order does not matter.

Is what I'm attempting to do possible? Can I have multiple asynchronous Tasks writing to the same IAsyncEnumerable that my method is returning? The best I can come up with is to have the tasks write to a common IList, then after all tasks complete, iterate over that IList and return all elements:

public async IAsyncEnumerable<ApiObject> GetResults(){
  int totalPages = _client.MagicGetTotalPagesMethod();
  var results = new List<ApiObject>();
  var tasks = Enumerable.Range(0,totalPages).Select(async x => {
    // my goal is to somehow return from here, rather than aggregating into the list and returning after all tasks are done
    var response = await _client.GetResults(_endpoint, x*_pageSize, _pageSize);
    // List<T> is not thread-safe, so guard concurrent additions
    lock (results) { results.AddRange(response.Items); }
  });
  await Task.WhenAll(tasks);
  foreach (var result in results){
    yield return result;
  }
}

This partially solves the problem by having multiple requests out at once, but we still have to wait for all of them to return before the consumer of this GetResults can use the results.

ewok
  • What does "write to an enumerable" mean? – Caius Jard Nov 18 '21 at 16:58
  • @Servy my goal isn't to be synchronous. It's to be asynchronous. I send a batch of requests to the API all at the same time, then return each to the `IAsyncEnumerable` as it comes back from the API. The order of the results does not matter. – ewok Nov 18 '21 at 17:03
  • @CaiusJard perhaps my choice of words isn't the best, but I think my goal is clear from the question. What are you unclear about? – ewok Nov 18 '21 at 17:03
  • By the way it makes no sense to use `Select` to run a method that doesn't return a meaningful value and has a side effect of adding an item to a list. The whole point of `Select` is to return the actual value that you are projecting to. If you just want to loop through the items then use a foreach loop. If you want to use a `Select`, have the lambda just return the actual value you care about. – Servy Nov 18 '21 at 17:04
  • @CaiusJard Took me a while to figure that out too. I think it means "Have multiple child tasks running in parallel, and having all their results be the results of the IAsyncEnumerable." You can do this by running your tasks in parallel via WhenAny + MoveNextAsync. Whichever task finishes moving, you yield that task's result. – Raymond Chen Nov 18 '21 at 17:04
  • @Servy you're right. my mistake. updated – ewok Nov 18 '21 at 17:05
  • @RaymondChen that sounds like what I want. can you write that up as an answer so I can better understand the code? – ewok Nov 18 '21 at 17:06
  • @Servy as for using Select, this is shorthand to create a set of parallel tasks. The actual codebase is more complex than this. As you mentioned, I had erroneously indicated that the tasks were not asynchronous, but I have fixed that now – ewok Nov 18 '21 at 17:07
  • I was just sketching it. It's not a full answer. You call MoveNextAsync on each child enumerator and then WhenAny on all of them. When one completes, you yield its result, and then call MoveNextAsync again. If MoveNextAsync says "no more", then remove it from your list. Stop when the list is empty. – Raymond Chen Nov 18 '21 at 17:08
  • @ewok There is no reason for a select that adds items to a list though. It's just unnecessary complexity. Just have the lambda return the value in question; the IEnumerable returned by Select is your sequence of values – Servy Nov 18 '21 at 17:08
  • Related: [How to implement an efficient WhenEach that streams an IAsyncEnumerable of task results?](https://stackoverflow.com/questions/58194212/how-to-implement-an-efficient-wheneach-that-streams-an-iasyncenumerable-of-task) – Theodor Zoulias Nov 18 '21 at 17:29
  • @TheodorZoulias I think that solved _exactly_ my problem. Thanks!!! – ewok Nov 18 '21 at 17:37
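
The WhenAny approach sketched in the comments above might look roughly like this. It is a minimal sketch, not code from the thread: `PageStreaming`, `StreamPagesAsync`, and the `fetchPage` delegate are hypothetical names standing in for the `_client.GetResults` call, and it assumes the total page count is known up front.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class PageStreaming
{
    // Hypothetical helper: starts every page request up front, then yields
    // the items of each page as soon as that page's task completes.
    public static async IAsyncEnumerable<T> StreamPagesAsync<T>(
        int totalPages,
        Func<int, Task<IReadOnlyList<T>>> fetchPage)
    {
        // ToList() forces the lazy Select to run, so all requests start now.
        var pending = Enumerable.Range(0, totalPages)
                                .Select(fetchPage)
                                .ToList();

        while (pending.Count > 0)
        {
            // WhenAny completes with whichever pending task finishes first.
            Task<IReadOnlyList<T>> finished = await Task.WhenAny(pending);
            pending.Remove(finished);

            // Awaiting the finished task rethrows its exception, if it failed.
            foreach (var item in await finished)
                yield return item;
        }
    }
}
```

A consumer would read the sequence with `await foreach`. Because `Task.WhenAny` re-registers on every pending task in each iteration, the loop does quadratic work in the number of pages; that is usually acceptable for a handful of requests, and the linked WhenEach question covers more efficient variants.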

1 Answer

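If you can order a sequence of tasks by completion, such a method becomes quite simple. Note that `Order` below is assumed to be a custom extension method that yields the tasks in the order they complete; it is not the built-in LINQ operator.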

var tasks = Enumerable.Range(0, totalPages).Select(x =>
    _client.GetResults(_endpoint, x * _pageSize, _pageSize));

foreach(var task in tasks.Order())
    foreach(var item in await task)
        yield return item;
Servy
  • Will `tasks.Order()` not block on all tasks being completed? How can a collection of Tasks be ordered by completion time if all tasks have not yet been completed? The goal is to not block the parent method on all tasks being complete, but rather return each element from one task as soon as it completes, while simultaneously waiting on the rest – ewok Nov 18 '21 at 17:18
  • @ewok You can look at the code to see how it works. It's asynchronous, not synchronous, and therefore does not need to block on any of the tasks. – Servy Nov 18 '21 at 17:20
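
On the question of how tasks can be ordered by completion before they have completed: one common technique is to hand back placeholder tasks up front and complete them, one after another, as the real tasks finish. The following is a minimal sketch under that assumption. `TaskOrdering` and `OrderByCompletion` are hypothetical names; this is not a framework method and not necessarily the implementation the answer refers to.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class TaskOrdering
{
    // Hypothetical stand-in for the Order() call in the answer.
    // Returns immediately; the i-th returned task completes when the
    // i-th input task (in completion order) finishes.
    public static IEnumerable<Task<T>> OrderByCompletion<T>(
        this IEnumerable<Task<T>> source)
    {
        // Materialize the sequence so lazily created tasks start exactly once.
        var inputs = source.ToList();

        // One placeholder per input; these are what the caller awaits.
        var slots = inputs.Select(_ => new TaskCompletionSource<T>()).ToList();
        int next = -1;

        foreach (var input in inputs)
        {
            input.ContinueWith(completed =>
            {
                // Claim the next free placeholder atomically.
                var slot = slots[Interlocked.Increment(ref next)];

                // Copy the outcome (fault, cancellation, or result) across.
                if (completed.IsFaulted)
                    slot.TrySetException(completed.Exception.InnerExceptions);
                else if (completed.IsCanceled)
                    slot.TrySetCanceled();
                else
                    slot.TrySetResult(completed.Result);
            },
            CancellationToken.None,
            TaskContinuationOptions.ExecuteSynchronously,
            TaskScheduler.Default);
        }

        return slots.Select(s => s.Task);
    }
}
```

With this in place, the loop in the answer would read `foreach (var task in tasks.OrderByCompletion())`. Nothing blocks: the `await` on the first placeholder resumes as soon as any one request finishes, while the remaining requests are still in flight.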