
I have a method which loads some rows from a database, then performs an operation on every row.

Synchronously, it would look like this:

void DoSomething()
{
    using( IWebService web = new WebService( ... ) )
    using( IDatabaseService db = new DatabaseService( ... ) )
    {
        List<Entity> rows = db.GetRows( ... );

        foreach( Entity row in rows )
        {
            RelatedInfo info = web.GetRelatedInfo( row.Foo );
            web.MakeAnotherServiceCall( row.Bar );
            db.UpdateEntity( row.Id, info );
        }
    }
}

My async version looks like this (ConfigureAwait omitted for brevity):

async Task DoSomething()
{
    using( IWebService web = new WebService( ... ) )
    using( IDatabaseService db = new DatabaseService( ... ) )
    {
        List<Entity> rows = await db.GetRows( ... );

        List<Task> tasks = new List<Task>();
        foreach( Entity row in rows )
        {
            Entity localRow = row; // new alias to prevent capturing the foreach object in the closure
            Task task = new Task( async() =>
            {
                RelatedInfo info = await web.GetRelatedInfo( localRow.Foo );
                await web.MakeAnotherServiceCall( localRow.Bar );
                await db.UpdateEntity( localRow.Id, info );
            } );
            tasks.Add( task );
        }
        await Task.WhenAll( tasks );
    }
}

(For unrelated reasons I'm unable to test this right now, but I should be able to test it in a few days).

Notice how my per-row operation creates a new Task which is eventually awaited (inside Task.WhenAll).

Remember that I don't want to start any background/thread-pool threads (if I wanted that, I would just use Task.Run). Instead, I want all of these per-row work items to run asynchronously on the same thread... but in my example above, do they?

The examples I've seen on Stack Overflow so far, such as "Asynchronous foreach", use the same pattern as mine. The difference is that they use a separate class-level method (or they cop out by using Task.Run); they don't use a lambda inside an explicit Task constructor:

async Task DoSomething()
{
    using( IWebService web = new WebService( ... ) )
    using( IDatabaseService db = new DatabaseService( ... ) )
    {
        List<Entity> rows = await db.GetRows( ... );

        List<Task> tasks = new List<Task>();
        foreach( Entity row in rows )
        {
            Task t = DoRow( row, web, db );
            tasks.Add( t );
        }
        await Task.WhenAll( tasks );
    }
}

private static async Task DoRow(Entity row, IWebService web, IDatabaseService db)
{
    RelatedInfo info = await web.GetRelatedInfo( row.Foo );
    await web.MakeAnotherServiceCall( row.Bar );
    await db.UpdateEntity( row.Id, info );
}

My understanding so far of Task and lambdas is that my lambda-based code should have the same asynchronous semantics as the separate-method code above, but I'm uncertain.

Update:

Perhaps a better way to phrase my question is in terms of continuations, which await abstracts away.

This is what I want to achieve, but using the await keyword (so I don't have to call ContinueWith manually) and without using Task.Run:

List<Task> tasks = new List<Task>();
foreach( Entity row in rows )
{
    Entity localRow = row;
    Task t = web
        .GetRelatedInfo( localRow.Foo )
        .ContinueWith( t1 => new { RelatedInfo = t1.Result, WebCall2 = web.MakeAnotherServiceCall( localRow.Bar ) } )
        .ContinueWith( t2 => db.UpdateEntity( localRow.Id, t2.Result.RelatedInfo ) );
    tasks.Add( t );
}
await Task.WhenAll( tasks );
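The ContinueWith sketch above has a subtle problem. In the async versions, db.UpdateEntity returns a Task, so a continuation that calls it produces a Task<Task>. That outer task completes as soon as the delegate returns, not when the update finishes, so Task.WhenAll over such tasks would not wait for the updates. The minimal sketch below shows the difference and how TaskExtensions.Unwrap closes the gap. The class and method names are hypothetical, and a TaskCompletionSource stands in for the real service call:

```csharp
using System.Threading.Tasks;

// Hypothetical demo class; not part of the question's code.
public static class ContinuationDemo
{
    public static (bool outerDone, bool unwrappedDone, bool unwrappedDoneAfterInner) Run()
    {
        // Stands in for a pending db.UpdateEntity call.
        var inner = new TaskCompletionSource<bool>();

        // The continuation returns a Task, so ContinueWith yields Task<Task>.
        Task<Task> outer = Task.CompletedTask.ContinueWith(_ => (Task)inner.Task);

        // Unwrap gives a proxy that completes only when the inner task does.
        Task unwrapped = outer.Unwrap();

        outer.Wait();                         // finishes once the delegate has returned
        bool outerDone = outer.IsCompleted;   // true
        bool unwrappedDone = unwrapped.IsCompleted; // false: inner is still pending

        inner.SetResult(true);                // the "update" finishes
        unwrapped.Wait();
        return (outerDone, unwrappedDone, unwrapped.IsCompleted);
    }
}
```

An `await` inside an async method or lambda handles this case automatically, which is one reason to prefer it over hand-written continuations.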
Dai
    If you don't start the task, it will never execute. There's no reason to create cold tasks in the first place. The tasks returned from asynchronous methods are *already* "running" – Panagiotis Kanavos Jul 18 '17 at 08:29
  • @PanagiotisKanavos But in my last example (using a separate class-level method) I never explicitly start the task either - yet that code works. – Dai Jul 18 '17 at 08:31
  • Because you are using asynchronous methods, not starting or creating background tasks. These tasks represent the execution of that asynchronous method. They are *already* running when you get them from, e.g., `dbContext.ToListAsync()` or `httpClient.GetStringAsync()` – Panagiotis Kanavos Jul 18 '17 at 08:32
  • @RandRandom `Task.Run` schedules new work on a background thread from the thread-pool, which seems wasteful - but asynchronous code does not need to execute work on another thread ( https://stackoverflow.com/questions/37419572/if-async-await-doesnt-create-any-additional-threads-then-how-does-it-make-appl ) - I'm wanting to do that but in a loop. – Dai Jul 18 '17 at 08:35
  • btw the "async" version is far too convoluted. If you want to await some async methods, just collect all the returned tasks in an array/list. You don't need that `new Task()` at all. You don't even need the list. You can write `var tasks = rows.Select(async row=> { var info=await GetRelatedInfo(...);...}); await Task.WhenAll(tasks);` – Panagiotis Kanavos Jul 18 '17 at 08:35
  • you don't need to *create* any tasks to make a bunch of async web calls. `DoSomething` is already an async method. You can use `await` inside the loop. – Panagiotis Kanavos Jul 18 '17 at 08:37
  • BTW the remote service may not be able to handle so many concurrent requests. Nor does it mean that executing 100 concurrent requests is going to be faster. Too many concurrent requests can lead to blocking issues. Each call incurs a lot of networking overhead too. It would be faster if the service accepted an *array* of rows, eg 10-100 at a time – Panagiotis Kanavos Jul 18 '17 at 08:41
  • @PanagiotisKanavos You misunderstand my question. If you use `await` in a loop then it will cause the loop to *effectively* run synchronously. Please see my update using `ContinueWith`. – Dai Jul 18 '17 at 08:42
  • no it won't. You confuse sequentially with synchronously. `await` in a loop *releases* the current thread while *awaiting* that async operation to complete. Nothing blocks. You don't need `ContinueWith()` either. `await` was created to *replace* all those continuations. `await` *is* essentially a `ContinueWith` that runs on the original synchronization context – Panagiotis Kanavos Jul 18 '17 at 08:46
  • as for how you can send all requests at once, I already posted a LINQ query that does just that. `var tasks = rows.Select(async row=> { var info=await GetRelatedInfo(...);...}); await Task.WhenAll(tasks);`. Pull the loop's code into a separate function, or even a local function if you want to capture local variables, to simplify the call to `await Task.WhenAll(rows.Select(row=>MyAsyncFunc(row)));` or `await Task.WhenAll(rows.Select(MyAsyncFunc));` – Panagiotis Kanavos Jul 18 '17 at 08:48
  • @PanagiotisKanavos My apologies. I used the wrong terminology. I did indeed mean to say Sequentially, not Synchronously - however my point remains: I want the per-row operations to be executed asynchronously as coroutines. – Dai Jul 18 '17 at 08:49
  • @PanagiotisKanavos Your Linq example doesn't demonstrate how method calls that are dependent on previous tasks (for the same `row`) are made. – Dai Jul 18 '17 at 08:50
  • it does. The `Select(async row => {...})` lambda can contain multiple `await` statements, the same as if it were a separate method – Panagiotis Kanavos Jul 18 '17 at 09:00
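A small sketch makes the distinction in these comments concrete. It contrasts "sequential" (await inside the loop) with "concurrent" (Select plus Task.WhenAll). FakeCallAsync is a hypothetical stand-in for the per-row web calls. It records how many calls are in flight at once and awaits Task.Delay instead of doing real I/O:

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical demo; FakeCallAsync stands in for the per-row web calls.
public static class SequentialVsConcurrent
{
    private static readonly object Gate = new object();
    private static int _inFlight;
    private static int _peak;

    private static async Task FakeCallAsync()
    {
        lock (Gate)
        {
            _inFlight++;
            if (_inFlight > _peak) _peak = _inFlight; // highest overlap seen so far
        }
        await Task.Delay(100);                        // no thread is blocked while waiting
        lock (Gate) { _inFlight--; }
    }

    // await inside the loop: rows are handled one after another (sequentially),
    // yet the calling thread is released at every await.
    public static async Task<int> SequentialAsync(IEnumerable<int> rows)
    {
        _inFlight = 0; _peak = 0;
        foreach (int row in rows)
            await FakeCallAsync();
        return _peak;
    }

    // Select + Task.WhenAll: every call is started before any of them is awaited.
    public static async Task<int> ConcurrentAsync(IEnumerable<int> rows)
    {
        _inFlight = 0; _peak = 0;
        await Task.WhenAll(rows.Select(row => FakeCallAsync()));
        return _peak;
    }
}
```

Neither version blocks a thread while waiting. The only difference is whether the next call starts before the previous one has finished: five rows give a peak of 1 sequentially and 5 concurrently.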

2 Answers


If you want to execute multiple asynchronous requests based on some input data and await the results, you can use a LINQ expression:

var tasks = rows.Select(async row => {
                var info = await web.GetRelatedInfo( row.Foo );
                await web.MakeAnotherServiceCall( row.Bar );
                await db.UpdateEntity( row.Id, info );
            });

await Task.WhenAll(tasks);

This will execute the asynchronous lambda once for each row, concurrently. Select returns a lazily evaluated IEnumerable, which means the lambda isn't executed until the sequence is iterated. Iteration can be forced with Task.WhenAll or a call to ToList(), ToArray(), etc.
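This deferred execution is easy to trip over. Select only describes the work, and every enumeration of the resulting sequence invokes the async lambda again and starts a fresh set of tasks. Below is a hypothetical sketch, with a counter and Task.Yield in place of the web and database calls:

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical demo of deferred execution with an async Select lambda.
public static class DeferredSelectDemo
{
    public static async Task<(int afterSelect, int afterWhenAll, int afterSecondPass)> RunAsync()
    {
        int calls = 0;
        int[] rows = { 1, 2, 3 };

        // Select only describes the work; nothing runs here yet.
        IEnumerable<Task> tasks = rows.Select(async row =>
        {
            calls++;              // runs synchronously, before the first await
            await Task.Yield();   // stand-in for the real web/database calls
        });
        int afterSelect = calls;  // 0

        await Task.WhenAll(tasks);          // first enumeration: one lambda call per row
        int afterWhenAll = calls;           // 3

        await Task.WhenAll(tasks.ToList()); // second enumeration starts three new tasks
        return (afterSelect, afterWhenAll, calls); // calls is now 6
    }
}
```

To avoid issuing every request twice by accident, materialize the sequence once with ToList() and reuse that list.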

Unfortunately, this is likely to fail: the remote service may not be able to accept 100 requests at the same time. Worse, await db.UpdateEntity will probably fail if db is an EF DbContext.

A DbContext isn't thread safe, even though it provides asynchronous methods. SaveChangesAsync will send all cached modifications to the database at the time it's called. If multiple threads try to modify data and call SaveChanges(Async), the first call could try to persist half the changes being made by the second thread.

The solution is to separate the service requests from the database modifications. This can be done with the TPL Dataflow classes.

  • The first block/step can read all data from the database and post them to the next step.
  • The second step calls the web service for each row and sends the results to the next step.
  • The last step sends the results to the database using a different connection.

For example, the following pipeline uses a degree of parallelism (DOP) of 10 to execute up to 10 web requests in parallel. The service block's BoundedCapacity of 30 limits its input buffer to 30 rows. This keeps the reader block from flooding memory if the web requests are slow. If the input buffer fills, the reader block waits before propagating any more results.

//Service proxies are typically thread safe
var web = new WebService( ... );

var readerBlock = new TransformManyBlock<MyFilterDTO,Entity>(async filter =>
{
        using( IDatabaseService db = new DatabaseService( ... ) )
        {
            return await db.GetRows(filter.A,filter.B);
        }
});

var serviceOptions = new ExecutionDataflowBlockOptions
                     {
                        MaxDegreeOfParallelism = 10, 
                        BoundedCapacity=30
                     };

var serviceBlock = new TransformBlock<Entity,(int id,RelatedInfo info)>(async row=>
{
    var info = await web.GetRelatedInfo( row.Foo );
    await web.MakeAnotherServiceCall( row.Bar );
    return (row.Id,info);
},serviceOptions);

var updateBlock = new ActionBlock<(int id,RelatedInfo info)>(async result =>
{
    using( IDatabaseService db = new DatabaseService( ... ) )
    {
        await db.UpdateEntity( result.id, result.info );
    }

});

var linkOptions=new DataflowLinkOptions{PropagateCompletion=true};
readerBlock.LinkTo(serviceBlock,linkOptions);
serviceBlock.LinkTo(updateBlock,linkOptions);

//Start sending queries
readerBlock.Post(someFilterValue);
...
//We are finished, close down the pipeline
readerBlock.Complete();

try
{
    //Await until all blocks finish
    await updateBlock.Completion;
}
finally
{
    web.Dispose();
}
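The pipeline above relies on MaxDegreeOfParallelism and BoundedCapacity to cap both the simultaneous web requests and the buffered rows. The hypothetical sketch below isolates that behaviour with a single ActionBlock. Task.Delay replaces the web calls. In place of the reader block, it uses SendAsync, which asynchronously waits while the bounded buffer is full. It assumes the System.Threading.Tasks.Dataflow library is available to the project:

```csharp
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo; Task.Delay stands in for the web service calls.
public static class DopDemo
{
    public static async Task<(int peak, int processed)> RunAsync(int items, int dop)
    {
        int inFlight = 0, peak = 0, processed = 0;
        var gate = new object();

        var serviceBlock = new ActionBlock<int>(async item =>
        {
            lock (gate) { inFlight++; if (inFlight > peak) peak = inFlight; }
            await Task.Delay(50);                 // simulated web request
            lock (gate) { inFlight--; processed++; }
        },
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = dop,         // at most `dop` invocations at once
            BoundedCapacity = dop * 3             // same 10/30 ratio as the pipeline above
        });

        for (int i = 0; i < items; i++)
            await serviceBlock.SendAsync(i);      // waits while the buffer is full

        serviceBlock.Complete();
        await serviceBlock.Completion;
        return (peak, processed);
    }
}
```

With 30 items and a DOP of 10, all items are processed and the recorded peak should never exceed 10.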

The update block stores one entity at a time. An optimization would be to batch multiple results before sending them to the database. This can be done with a BatchBlock<T> (constructor BatchBlock<T>(int batchSize)), which converts individual messages into arrays, e.g.:

var batchBlock = new BatchBlock<(int id,RelatedInfo info)>(10);

var updateBlock = new ActionBlock<(int id,RelatedInfo info)[]>(async results =>
{
    using( IDatabaseService db = new DatabaseService( ... ) )
    {
        foreach(var result in results)
        {
            await db.UpdateEntity( result.id, result.info );
        }
    }
});

// Link through the batch block instead of directly to the update block
var linkOptions=new DataflowLinkOptions{PropagateCompletion=true};
readerBlock.LinkTo(serviceBlock,linkOptions);
serviceBlock.LinkTo(batchBlock,linkOptions);
batchBlock.LinkTo(updateBlock,linkOptions);
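BatchBlock groups messages into arrays of the requested size. When it is completed, it emits whatever is left as a final, smaller batch, so no results are lost at the end of the pipeline. Below is a hypothetical sketch with integers in place of the (id, info) tuples. It again assumes the Dataflow library is referenced:

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo; integers stand in for the (id, info) results.
public static class BatchDemo
{
    public static async Task<List<int>> BatchSizesAsync(int items, int batchSize)
    {
        var sizes = new List<int>();
        var batchBlock = new BatchBlock<int>(batchSize);

        // Default MaxDegreeOfParallelism is 1, so batches are handled one at a time.
        var updateBlock = new ActionBlock<int[]>(batch => sizes.Add(batch.Length));

        batchBlock.LinkTo(updateBlock, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 0; i < items; i++)
            batchBlock.Post(i);

        // Completing the batch block flushes the last, partially filled batch.
        batchBlock.Complete();
        await updateBlock.Completion;
        return sizes;
    }
}
```

For 25 items and a batch size of 10, the update block receives batches of 10, 10 and 5.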
Panagiotis Kanavos

Never use the Task constructor.
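A short, hypothetical sketch shows why. A TaskCompletionSource stands in for a slow web call. The Task constructor accepts an Action, so an async lambda passed to it compiles as an async void delegate. The resulting task is cold until Start is called. Once started, it reports completion as soon as the lambda reaches its first await, long before the awaited work is done:

```csharp
using System.Threading.Tasks;

// Hypothetical demo of the Task constructor used with an async lambda.
public static class TaskConstructorDemo
{
    public static (TaskStatus beforeStart, bool taskCompleted, bool workDoneWhenTaskCompleted) Run()
    {
        var slowCall = new TaskCompletionSource<bool>(); // stands in for a web call
        bool workDone = false;

        // Task(Action): the async lambda is effectively "async void",
        // so the Task cannot observe anything after the first await.
        var task = new Task(async () =>
        {
            await slowCall.Task;
            workDone = true;
        });

        TaskStatus beforeStart = task.Status; // Created: a cold task never runs by itself

        task.Start();   // schedules the delegate on the thread pool
        task.Wait();    // returns once the lambda hits its first await
        bool taskCompleted = task.IsCompleted;
        bool workDoneAtThatPoint = workDone;  // still false

        slowCall.SetResult(true);             // let the orphaned continuation finish
        return (beforeStart, taskCompleted, workDoneAtThatPoint);
    }
}
```

In the question's version the tasks are never started, so `await Task.WhenAll( tasks )` would never complete. Even if they were started, WhenAll would not wait for the per-row calls. Calling an async method or lambda directly returns a task that already represents the whole operation.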

If your synchronous version is this:

void DoSomething()
{
  using( IWebService web = new WebService( ... ) )
  using( IDatabaseService db = new DatabaseService( ... ) )
  {
    List<Entity> rows = db.GetRows( ... );

    foreach( Entity row in rows )
    {
      RelatedInfo info = web.GetRelatedInfo( row.Foo );
      web.MakeAnotherServiceCall( row.Bar );
      db.UpdateEntity( row.Id, info );
    }
  }
}

then the equivalent asynchronous version would look like this:

async Task DoSomethingAsync()
{
  using( IWebService web = new WebService( ... ) )
  using( IDatabaseService db = new DatabaseService( ... ) )
  {
    List<Entity> rows = await db.GetRowsAsync( ... );

    foreach( Entity row in rows )
    {
      RelatedInfo info = await web.GetRelatedInfoAsync( row.Foo );
      await web.MakeAnotherServiceCallAsync( row.Bar );
      await db.UpdateEntityAsync( row.Id, info );
    }
  }
}

Always do this kind of straightforward conversion first. Once it's there, then you can consider enhancements like asynchronous concurrency:

async Task DoSomethingAsync()
{
  using( IWebService web = new WebService( ... ) )
  using( IDatabaseService db = new DatabaseService( ... ) )
  {
    List<Entity> rows = await db.GetRowsAsync( ... );

    var tasks = rows.Select(async row =>
    {
      RelatedInfo info = await web.GetRelatedInfoAsync( row.Foo );
      await web.MakeAnotherServiceCallAsync( row.Bar );
      return info;
    });
    var infos = await Task.WhenAll(tasks);
    for ( int i = 0; i != rows.Count; ++i )
      await db.UpdateEntityAsync( rows[i].Id, infos[i] );
  }
}
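The final loop pairs rows[i] with infos[i]. That relies on Task.WhenAll returning its results in the same order as the tasks it was given, whichever request finishes first. Select over a List also yields the tasks in list order. Below is a minimal, hypothetical check that completes TaskCompletionSource objects in reverse order:

```csharp
using System.Threading.Tasks;

// Hypothetical demo: Task.WhenAll<T> orders results by input position,
// not by completion time.
public static class WhenAllOrderDemo
{
    public static async Task<string[]> RunAsync()
    {
        var first = new TaskCompletionSource<string>();
        var second = new TaskCompletionSource<string>();
        var third = new TaskCompletionSource<string>();

        Task<string[]> all = Task.WhenAll(first.Task, second.Task, third.Task);

        // Complete them in reverse order.
        third.SetResult("row 3");
        second.SetResult("row 2");
        first.SetResult("row 1");

        return await all; // results are in input order: row 1, row 2, row 3
    }
}
```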
Stephen Cleary