2

I'm new to tasks and have a question regarding their usage. Does Task.Factory.StartNew fire for all items in the foreach loop, or does it block at the await, basically making the program single-threaded? If I am thinking about this correctly, the foreach loop starts all the tasks and .GetAwaiter().GetResult() blocks the main thread until the last task is complete.

Also, I just want some anonymous tasks to load the data. Would this be a correct implementation? I'm not concerned with exception handling here, as this is simply an example.

To provide clarity, I am loading data into a database from outside APIs. This one uses the FRED database (https://fred.stlouisfed.org/), but I have several I will hit to complete the entire transfer (maybe 200k data points). Once they are done I update the tables, refresh market calculations, etc. Some of it is real-time and some of it is end-of-day. I should also say that I currently have everything working in Docker, but have been working to update the code using tasks to improve execution.

class Program
{
    private async Task SQLBulkLoader() 
    {

        foreach (var fileListObj in indicators.file_list)
        {
            await Task.Factory.StartNew(  () =>
            {

                string json = this.GET(//API call);

                SeriesObject obj = JsonConvert.DeserializeObject<SeriesObject>(json);

                DataTable dataTableConversion = ConvertToDataTable(obj.observations);
                dataTableConversion.TableName = fileListObj.series_id;

                using (SqlConnection dbConnection = new SqlConnection("SQL Connection"))
                {
                    dbConnection.Open();
                    using (SqlBulkCopy s = new SqlBulkCopy(dbConnection))
                    {
                      s.DestinationTableName = dataTableConversion.TableName;
                      foreach (var column in dataTableConversion.Columns)
                          s.ColumnMappings.Add(column.ToString(), column.ToString());
                      s.WriteToServer(dataTableConversion);
                    }

                  Console.WriteLine("File: {0} Complete", fileListObj.series_id);
                }
             });
        }            
    }

    static void Main(string[] args)
    {
        Program worker = new Program();
        worker.SQLBulkLoader().GetAwaiter().GetResult();
    }
}
Peter Bons
  • It depends on the work that needs to be done. If it is CPU bound use `Parallel`, but if you intent to load files using native async methods use `Task.WhenAll`. Can you post the code of `//Doing stuff`? – Peter Bons Dec 30 '18 at 21:59
  • Code added. From your comment, I am thinking that I should use a task list. I saw this in the microsoft docs, but thought 'await' was a newer method to do the same thing. Also, thank you everyone for the help. This is great. –  Dec 30 '18 at 22:41
  • Now this is why it is important to post the code inside the loop. Given that you have now added the actual code inside the loop, I think you should use the native task-based methods available for the I/O operations, see my answer. – Peter Bons Dec 31 '18 at 10:09

6 Answers

2

Yes, awaiting the task returned from Task.Factory.StartNew makes it effectively single-threaded. You can see a simple demonstration of this with this short LINQPad example:

for (var i = 0; i < 3; i++)
{
    var index = i;
    $"{index} inline".Dump();
    await Task.Run(() =>
    {
        Thread.Sleep((3 - index) * 1000);
        $"{index} in thread".Dump();
    });
}

Here we wait less as we progress through the loop. The output is:

0 inline
0 in thread
1 inline
1 in thread
2 inline
2 in thread

If you remove the await in front of Task.Run you'll see it runs in parallel. As others have mentioned, you can certainly use Parallel.ForEach, but for a demonstration of doing it a bit more manually, you can consider a solution like this:

var tasks = new List<Task>();

for (var i = 0; i < 3; i++) 
{
    var index = i;
    $"{index} inline".Dump();
    tasks.Add(Task.Factory.StartNew(() =>
    {
        Thread.Sleep((3 - index) * 1000);
        $"{index} in thread".Dump();
    }));
}

Task.WaitAll(tasks.ToArray());

Notice now how the result is:

0 inline
1 inline
2 inline
2 in thread
1 in thread
0 in thread

Kirk Woll
1

You'll want to add each task to a collection and then use Task.WhenAll to await all of the tasks in that collection:

private async Task SQLBulkLoader() 
{ 
  var tasks = new List<Task>();
  foreach (var fileListObj in indicators.file_list)
  {
    tasks.Add(Task.Factory.StartNew(() =>
    {
      //Doing Stuff
    }));
  }

  await Task.WhenAll(tasks.ToArray());
}
devNull
1

This is a typical problem that C# 8.0 Async Streams are going to solve very soon.

Until C# 8.0 is released, you can use the AsyncEnumerator library:

using System.Collections.Async;

class Program
{
    private async Task SQLBulkLoader() {

        await indicators.file_list.ParallelForEachAsync(async fileListObj =>
        {
            ...
            await s.WriteToServerAsync(dataTableConversion);
            ...
        },
        maxDegreeOfParalellism: 3,
        cancellationToken: default);
    }

    static void Main(string[] args)
    {
        Program worker = new Program();
        worker.SQLBulkLoader().GetAwaiter().GetResult();
    }
}

I do not recommend using Parallel.ForEach and Task.WhenAll as those functions are not designed for asynchronous streaming.

Serge Semenov
  • Good point, like I said above I found this in the docs and thought it was what I was looking for, but clearly I have errors. Thanks for the links. –  Dec 30 '18 at 22:53
  • Would the correct method be ParallelForEachAsync()? –  Dec 31 '18 at 02:13
  • @NorthVanHooser, yeah, you can use `ParallelForEachAsync` if you need to run multiple tasks in parallel with the desired concurrency. – Serge Semenov Dec 31 '18 at 02:22
  • The http calls and sql server calls can be native async calls, so I do not get your recommendation to not use `Task.WhenAll` in this case. Can you explain, I thought it would be better to use the framework provided Task based methods? – Peter Bons Dec 31 '18 at 08:08
  • By the way, I think your answer is a good one, but I am curious about how you would approach the `//Doing stuff` given that the full code is now available. I wonder how you would implement it without using Task.Run, or Task.WhenAll – Peter Bons Dec 31 '18 at 08:20
  • @PeterBons, the `Task.WhenAll` takes in all tasks that you run in parallel. If you need to run an operation over 1,000 items, it would be very inefficient to do all of them in parallel due to excessive context switching and possible throttling on a service side. I.e. with `Task.WhenAll` you don't have an ability to control the degree of parallelism like "3 items in parallel at any given point of time". – Serge Semenov Jan 01 '19 at 06:23
  • @PeterBons, having the full code available in the question, my answer still remains the same. You can either use `ForEachAsync` for sequential processing or `ParallelForEachAsync` for concurrent processing. In both cases the body of the loop is an `async` lambda with support for `await` statements. Updated the answer to be more clear. – Serge Semenov Jan 01 '19 at 06:24
1

My take on this: the most time-consuming operations will be getting the data using a GET request and the actual call to WriteToServer using SqlBulkCopy. If you take a look at that class you will see that it has a native async method, WriteToServerAsync (docs here). Always use those before creating tasks yourself using Task.Run.

The same applies to the http GET call. You can use the native HttpClient.GetAsync (docs here) for that.

Doing that you can rewrite your code to this:

private async Task ProcessFileAsync(string series_id)
{
    string json = await GetAsync();

    SeriesObject obj = JsonConvert.DeserializeObject<SeriesObject>(json);

    DataTable dataTableConversion = ConvertToDataTable(obj.observations);
    dataTableConversion.TableName = series_id;

    using (SqlConnection dbConnection = new SqlConnection("SQL Connection"))
    {
        await dbConnection.OpenAsync();
        using (SqlBulkCopy s = new SqlBulkCopy(dbConnection))
        {
            s.DestinationTableName = dataTableConversion.TableName;
            foreach (var column in dataTableConversion.Columns)
                s.ColumnMappings.Add(column.ToString(), column.ToString());
            await s.WriteToServerAsync(dataTableConversion);
        }

        Console.WriteLine("File: {0} Complete", series_id);
    }
}

private async Task SQLBulkLoaderAsync()
{
    var tasks = indicators.file_list.Select(f => ProcessFileAsync(f.series_id));
    await Task.WhenAll(tasks);
}

Both operations (the http call and the sql server call) are I/O calls. Using the native async/await pattern there won't even be a thread created or used, see this question for a more in-depth explanation. That is why for I/O bound operations you should never have to use Task.Run or Task.Factory.StartNew (and if you do need to offload CPU-bound work, Task.Run is the recommended of the two).

Sidenote: if you are using HttpClient in a loop, please read this about how to correctly use it.
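As a minimal sketch of that advice (the FRED endpoint URL and the class and method names here are placeholders of mine, not from the question), a single shared HttpClient instance can serve all requests:

```csharp
using System.Net.Http;
using System.Threading.Tasks;

class ApiClient
{
    // One HttpClient reused for the lifetime of the application,
    // so sockets are pooled instead of exhausted by per-call instances.
    private static readonly HttpClient Client = new HttpClient();

    public Task<string> GetSeriesJsonAsync(string seriesId)
    {
        // Placeholder URL; substitute the real FRED API endpoint and key.
        return Client.GetStringAsync(
            "https://api.stlouisfed.org/fred/series/observations?series_id=" + seriesId);
    }
}
```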

If you need to limit the number of parallel actions you could also use TPL Dataflow, as it plays very nicely with Task-based I/O bound operations. The SQLBulkLoaderAsync should then be modified to (leaving the ProcessFileAsync method from earlier in this answer intact):

private async Task SQLBulkLoaderAsync()
{
    var ab = new ActionBlock<string>(ProcessFileAsync, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

    foreach (var file in indicators.file_list)
    {
        ab.Post(file.series_id);
    }

    ab.Complete();
    await ab.Completion;
}
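If pulling in a Dataflow dependency feels heavy, the same throttling can be sketched with SemaphoreSlim. This is an assumption on my part, not from the answer above; the helper name RunAllAsync is hypothetical:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

class Throttled
{
    // Sketch: cap concurrency with SemaphoreSlim instead of TPL Dataflow.
    public static async Task RunAllAsync(IEnumerable<string> seriesIds,
                                         Func<string, Task> processAsync,
                                         int maxParallel)
    {
        using (var gate = new SemaphoreSlim(maxParallel))
        {
            var tasks = new List<Task>();
            foreach (var id in seriesIds)
            {
                await gate.WaitAsync(); // wait for a free slot before starting
                tasks.Add(RunOneAsync(id));
            }
            await Task.WhenAll(tasks); // all releases happen before dispose

            async Task RunOneAsync(string seriesId)
            {
                try { await processAsync(seriesId); }
                finally { gate.Release(); } // free the slot for the next item
            }
        }
    }
}
```

Usage would mirror the Dataflow version, e.g. `await Throttled.RunAllAsync(ids, ProcessFileAsync, maxParallel: 5);`.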
Peter Bons
  • Using Serge's method I could only get the application to write synchronously, or at least there was a warning on the lambda. I currently have the app using a task list, but I'll take a look at this example during the week and implement it. While I think I have everything set up quite nicely for this part, I really like learning about the async usage and will continue to experiment with it. Also, I read the HttpClient post you mentioned a while back when making another console app. It's great context on how the requests work and should be required reading for everyone using HttpClient. –  Dec 31 '18 at 18:53
0

This program will not start the tasks in parallel (in the foreach): it blocks at each await. The logic inside each task is done on a separate thread from the thread pool, but only one at a time, while the main thread is blocked.

The proper approach in your situation is to use Parallel.ForEach: How can I convert this foreach code to Parallel.ForEach?
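For illustration, here is a minimal runnable sketch of the linked Parallel.ForEach approach. The series IDs in the list are made-up placeholders:

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

class ParallelDemo
{
    static void Main()
    {
        var fileList = new[] { "GDP", "UNRATE", "CPIAUCSL", "DGS10" };
        int processed = 0;

        // Each iteration runs on a thread-pool thread; Parallel.ForEach
        // blocks the caller until every iteration has completed.
        Parallel.ForEach(fileList, file =>
        {
            Interlocked.Increment(ref processed); // thread-safe counter
            Console.WriteLine($"Processing {file} on thread {Thread.CurrentThread.ManagedThreadId}");
        });

        Debug.Assert(processed == fileList.Length);
        Console.WriteLine($"Processed {processed} files");
    }
}
```

Note that Parallel.ForEach is best suited to CPU-bound bodies; for I/O-bound work the task-based approaches in the other answers avoid tying up thread-pool threads.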

Paweł Górszczak
  • using `Parallel` is good for CPU bound work but for I/O Tasks should be used, as per answer of Kirk Woll – Peter Bons Dec 30 '18 at 21:56
  • Is there a concrete reason why you are saying that Tasks should be used in this situation? Both solutions will work the same; both will use the thread pool. Using Parallel is faster than manually creating and starting tasks and waiting for them, I mean it is faster to write, and performance is probably the same. – Paweł Górszczak Dec 30 '18 at 22:51
  • Well, there is no need to manually create and start tasks since the operations are I/O bound and there are native Task-based methods that can be used, see my answer – Peter Bons Dec 31 '18 at 08:16
0

Use a Parallel.ForEach loop to enable data parallelism over any System.Collections.Generic.IEnumerable<T> source.

// Method signature: Parallel.ForEach(IEnumerable<TSource> source, Action<TSource> body)
Parallel.ForEach(fileList, (currentFile) =>
{
    //Doing Stuff

    Console.WriteLine("Processing {0} on thread {1}", currentFile, Thread.CurrentThread.ManagedThreadId);
});
valerysntx