I have a list of objects that I need to iterate over in parallel. This is what I need to do:

foreach (var r in results)
{
    r.SomeList = await apiHelper.Get(r.Id);
}

Since I want to parallelize it, I tried Parallel.ForEach(), but it doesn't wait until everything is actually complete, because apiHelper.Get() awaits internally.

Parallel.ForEach(
                results,
                async (r) =>
                {
                    r.SomeList = await apiHelper.Get(r.Id);
                });

So I searched online and found this: Nesting await in Parallel.ForEach

Now I am very new to TPL (20 mins old) and I may be missing something obvious. How do I go ahead?

        var getBlock = new TransformBlock<string, List<Something>>(
            async i =>
            {
                var c = await apiHelper.Get(i);
                return c;
            }, new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
            });

        foreach (var r in results)
        {
            r.SomeList = getBlock.Post(r.Id);  // ERROR: Can't convert boolean to list.
        }

        getBlock.Complete();
spender
90abyss

3 Answers

Perhaps consider using Microsoft's Reactive Framework instead.

Here's the code:

var query =
    from r in results.ToObservable()
    from l in Observable.FromAsync(() => apiHelper.Get(r.Id))
    select new { r, l };

query
    .Subscribe(x => x.r.SomeList = x.l);

Done. Parallel and async.

Just NuGet "System.Reactive" and add a using System.Reactive.Linq; directive.

Enigmativity
  • I have been meaning to learn me some reactive, every time i go to the site the page puts me off though, +1 for learning me something – TheGeneral May 24 '18 at 00:29

Here is an example using the ActionBlock class from the TPL Dataflow library.

It basically gives you parallel and async execution, and it's fairly easy to understand.

Dataflow example

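// Requires: using System.Threading.Tasks.Dataflow;
// Not static, because MyMethodAsync below is an instance method.
public async Task DoWorkLoads(List<Something> results)
{
   var options = new ExecutionDataflowBlockOptions
   {
      // Upper bound on concurrent calls to MyMethodAsync
      MaxDegreeOfParallelism = 50
   };

   var block = new ActionBlock<Something>(MyMethodAsync, options);

   foreach (var result in results)
      block.Post(result);

   // Signal that no more items are coming, then wait for queued work to finish
   block.Complete();
   await block.Completion;
}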

...

public async Task MyMethodAsync(Something result)
{       
   result.SomeList = await apiHelper.Get(result.Id);
}

Obviously, you will need error checking; add salt and pepper to taste.

Also, this assumes apiHelper is thread-safe.
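
As a rough illustration of the error checking mentioned above, here is a minimal, self-contained sketch. Something and FakeApiHelper are hypothetical stand-ins for the types in the question (the real apiHelper is not shown anywhere in this thread), and the choice to collect per-item failures in a dictionary rather than let the block fault is just one possible design, not necessarily what you would want in production.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical stand-in for the question's result type.
public class Something
{
    public string Id { get; set; } = "";
    public List<string> SomeList { get; set; } = new List<string>();
}

// Hypothetical stand-in for apiHelper; fails for the id "bad" to exercise the error path.
public class FakeApiHelper
{
    public async Task<List<string>> Get(string id)
    {
        await Task.Delay(50); // simulate network latency
        if (id == "bad") throw new InvalidOperationException("lookup failed for " + id);
        return new List<string> { id + "-a", id + "-b" };
    }
}

public static class DataflowSketch
{
    // Fills SomeList on every item and returns the failures, keyed by item id.
    public static async Task<IReadOnlyDictionary<string, Exception>> FillAllAsync(
        IEnumerable<Something> results, FakeApiHelper apiHelper, int maxParallel = 4)
    {
        var errors = new ConcurrentDictionary<string, Exception>();

        var block = new ActionBlock<Something>(async item =>
        {
            try
            {
                item.SomeList = await apiHelper.Get(item.Id);
            }
            catch (Exception ex)
            {
                // Record the failure here; an exception escaping this delegate
                // would fault the whole block and stop it from processing further items.
                errors[item.Id] = ex;
            }
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxParallel });

        foreach (var item in results)
            await block.SendAsync(item);

        block.Complete();        // no more input
        await block.Completion;  // wait for all queued items to be processed
        return errors;
    }
}
```

A caller would then do something like var errors = await DataflowSketch.FillAllAsync(results, apiHelper); and inspect errors to decide whether to retry or log the items that failed.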

TheGeneral

To call an API asynchronously and in parallel you don't need Reactive or Dataflow. The only complication with what you have is that you're mutating the object r by setting its property to the result of the API call. Still, what you want is pretty simple:

This:

foreach (var r in results)
{
    r.SomeList = await apiHelper.Get(r.Id);
}

Becomes:

var tasks = results.Select(async r => { r.SomeList = await apiHelper.Get(r.Id); });
await Task.WhenAll(tasks);

Assuming apiHelper.Get is in fact non-blocking and async, each item in results gets its own API call, and all of those calls are in flight concurrently.
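
One caveat with the plain Task.WhenAll version is that it starts every call at once. If the API cannot take that, a SemaphoreSlim can cap the number of calls in flight. The helper below is a sketch of that idea, not part of the original answer; the names WhenAllSketch and ForEachThrottledAsync are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class WhenAllSketch
{
    // Runs body for every item, with at most maxConcurrency invocations in flight.
    public static async Task ForEachThrottledAsync<T>(
        IEnumerable<T> source, int maxConcurrency, Func<T, Task> body)
    {
        using (var gate = new SemaphoreSlim(maxConcurrency))
        {
            // ToList() forces the lazy Select to run now, so each task is created exactly once.
            var tasks = source.Select(async item =>
            {
                await gate.WaitAsync();      // wait for a free slot
                try
                {
                    await body(item);
                }
                finally
                {
                    gate.Release();          // free the slot even if body throws
                }
            }).ToList();

            await Task.WhenAll(tasks);
        }
    }
}
```

With the question's types, usage would look roughly like await WhenAllSketch.ForEachThrottledAsync(results, 10, async r => { r.SomeList = await apiHelper.Get(r.Id); }); where 10 is an arbitrary limit chosen for the example.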

JSteward