3

I have a console app written in C# on top of .NET Core 2.2.

My application allows me to trigger long-running admin jobs using the Windows Task Scheduler.

One of the admin jobs makes a web API call that downloads lots of files and then uploads them to Azure Blob Storage. Here are the logical steps my code needs to perform to get the job done:

  1. Call the remote API, which responds with MIME messages where each message represents a file.
  2. Parse out the MIME messages and convert each message into a MemoryStream, producing a collection of MemoryStreams.

Once I have a collection of 1000+ MemoryStreams, I want to write each stream to Azure Blob Storage. Since writing to the remote storage is slow, I am hoping I can execute each write iteration on its own process or thread. That would allow me to have potentially 1000+ threads running in parallel at the same time, instead of having to wait for the result of each write operation. Each thread will be responsible for logging any errors that occur during the write/upload process. Logged errors will be dealt with by a different job, so I don't have to worry about retrying.

My understanding was that calling the code that writes/uploads the stream asynchronously would do exactly that. In other words, I would say "here is a stream; execute it and run for as long as it takes. I don't really care about the result as long as the task gets completed."

While testing, I found out that my understanding of async was somewhat invalid. I was under the impression that calling a method defined with async would execute it on a background thread/worker until the process completed. But my understanding failed when I tested the code. It showed me that without the await keyword the async code never really executes, while with await the code waits until the process finishes before continuing. In other words, adding await seems to defeat the purpose of calling the method asynchronously.
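To illustrate the behavior in question, here is a minimal sketch (DoWorkAsync is a hypothetical stand-in for the upload): an async method returns a "hot" task that is already running; without await the caller simply never observes its completion or its exceptions, and a console app may exit before the work finishes.

```csharp
using System;
using System.Threading.Tasks;

class Program
{
    // Sketch: the Task returned here starts running immediately.
    static async Task DoWorkAsync()
    {
        await Task.Delay(1000);        // simulate slow I/O
        Console.WriteLine("done");
    }

    static async Task Main()
    {
        Task t = DoWorkAsync();        // starts now, but is not awaited yet
        Console.WriteLine("continues right away");
        await t;                       // only here do we wait for completion
    }
}
```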

Here is a stripped-down version of my code for the sake of explaining what I am trying to accomplish:

public async Task Run()
{
    // This gets populated after calling the web-API and parsing out the result
    List<Stream> files = new List<Stream>{.....};

    foreach (Stream file in files)
    {
        // This code should get executed in the background without having to await the result
        await Upload(file);
    }
}

// This method is responsible for uploading a stream to storage and logging any errors
private async Task Upload(Stream stream)
{
    try
    {
        await Storage.Create(stream, GetUniqueName());
    } 
    catch(Exception e)
    {
        // Log any errors
    }
}

From the above code, calling await Upload(file); works and uploads the file as expected. However, since I am using await when calling the Upload() method, my loop will NOT move to the next iteration until the upload finishes. At the same time, if I remove the await keyword, the loop no longer waits for the upload, but the stream never actually gets written to storage, as if I had never called the code.

How can I execute multiple Upload methods in parallel so that each upload runs in the background on its own?

Blue
  • 22,608
  • 7
  • 62
  • 92
Junior
  • 11,602
  • 27
  • 106
  • 212

5 Answers

7

Convert the list into a list of "Upload" tasks, and await them all with Task.WhenAll():

public async Task Run()
{
    // This gets populated after calling the web-API and parsing out the result
    List<Stream> files = new List<Stream>{.....};
    var tasks = files.Select(Upload);

    await Task.WhenAll(tasks);
}

See this post for some more information about tasks/await.

Blue
  • 22,608
  • 7
  • 62
  • 92
  • Thank you for this info. so what does `Task.WhenAll()` do in this case? Does it fire all tasks at once in parallel, does it batch Tasks into groups and dispatch groups in parallel or does it execute one after another in a pipe line? – Junior Feb 05 '19 at 02:18
  • 2
    @MikeA In short, the task scheduler will decide how many threads to liberate from the thread pool to run concurrently; it uses a best guess based on certain factors and heuristics. In short, it runs in parallel the best it can – TheGeneral Feb 05 '19 at 03:15
  • @MichaelRandall: To my knowledge what you described is `Parallel.ForEach`. And from my experience `Task.WhenAll` fires all tasks at once. Is it documented anywhere? – abatishchev Feb 05 '19 at 19:55
  • @abatishchev The tasks actually fire the second you run select on them. Your function however carries on, and continues. The `Task.WhenAll` is an async method that will return when 1 task has failed or all have finished successfully. If you await that, execution is "paused" there until the task is complete. – Blue Feb 05 '19 at 20:55
  • @abatishchev Yeah, it's documented in the Task or ThreadPool docs or somewhere; it's just what the default scheduler does. You can test it with a bunch of tasks and Thread.Sleep; however, with async/await and IO-bound work, it will wait for IO completion ports to finish their work and threads will go back to the thread pool. All in all, the scheduler won't aggressively liberate threads from the thread pool and will take a best-guess approach governed by factors such as max threads, the type of work you are doing, and heuristics. Stephen Toub and Stephen Cleary have blogs on it as well – TheGeneral Feb 05 '19 at 22:12
  • @FrankerZ: yeah, and that's my point. `await Task.WhenAll(Enumerable.Range(0, Math.Pow(2,10)).Select(_ => httpClient.GetAsync("https://google.com")))` won't schedule the workload gracefully. Will it? – abatishchev Feb 06 '19 at 02:29
  • @MichaelRandall: I've read both blogs. Can you provide more specific link? And please see my comment above. – abatishchev Feb 06 '19 at 02:30
  • @abatishchev I will admit, the relevant sources for how the default .NET TaskScheduler works internally are not easily found, and there are only glancing comments about it in the documentation. Additionally, the TPL source code is not for the faint of heart. However, if you want to dig far enough, you will find references to its internals written by the likes of Stephen Cleary, Stephen Toub, Eric Lippert, and more. There are also plenty of questions on its behaviour on SO (please note some of it is outdated) – TheGeneral Feb 06 '19 at 03:07
  • @abatishchev What I suggest, if you want a more concrete answer on the behaviour of the default TaskScheduler, is that you ask a question on StackOverflow detailing why the TaskScheduler divvies out thread-pool threads and IO threads the way it does. I will be happy to collate all the sources I have and reference the relevant (and sometimes not obvious) documentation I can find. I also notice Stephen Cleary is answering questions again and is active on Task questions, so you might be lucky there too – TheGeneral Feb 06 '19 at 03:07
4

I am hoping that I can execute each write iteration using its own process or thread.

This is not really the best way to do this. Processes and threads are limited resources. Your limiting factor is waiting on the network to perform an action.

What you'll want to do is just something like:

var tasks = new List<Task>(queue.Count);

while (queue.Count > 0)
{
  var myobject = queue.Dequeue();
  var task = blockBlob.UploadFromByteArrayAsync(myobject.content, 0, myobject.content.Length);
  tasks.Add(task);
}
await Task.WhenAll(tasks);

Here we're just creating tasks as fast as we can and then waiting for them all to complete. We'll let the .NET framework take care of the rest.

The important thing here is that threads don't improve the speed of waiting for network resources. Tasks are a way to delegate the pending work out of the thread's hands, so you have more threads available to do whatever is next (like starting a new upload, or responding to a finished upload). If a thread simply waits for an upload to complete, it's a wasted resource.
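A small sketch of that point, using Task.Delay as a stand-in for a network call: a thousand concurrent "uploads" complete while only a handful of thread-pool threads are ever involved, because an awaiting task holds no thread.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

class Program
{
    static async Task Main()
    {
        // Record which managed threads the continuations actually run on.
        var seen = new ConcurrentDictionary<int, bool>();

        var tasks = Enumerable.Range(0, 1000).Select(async _ =>
        {
            await Task.Delay(500);   // simulated network wait; no thread is blocked here
            seen[Environment.CurrentManagedThreadId] = true;
        });

        await Task.WhenAll(tasks);

        // Far fewer threads than tasks were used.
        Console.WriteLine($"1000 tasks ran on {seen.Count} threads");
    }
}
```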

Erik Philips
  • 53,428
  • 11
  • 128
  • 150
  • This is why tasks are better suited for this (What the OP was attempting to do). Let the task scheduler determine what's best to optimize resources for the application/server. – Blue Feb 05 '19 at 03:32
3

You likely need this:

var tasks = files.Select(Upload);
await Task.WhenAll(tasks);

Just note that this will spawn as many tasks as you have files, which may bring the process/machine down if there are too many of them. See Have a set of Tasks with only X running at a time as an example of how to address that.
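One common way to cap concurrency is a SemaphoreSlim gate. A minimal sketch (UploadAsync is a hypothetical stand-in for the OP's Upload method; 50 is an arbitrary limit):

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class Throttled
{
    // Hypothetical upload; replace with the real storage call.
    static Task UploadAsync(Stream file) => Task.Delay(100);

    static async Task RunThrottled(List<Stream> files)
    {
        var gate = new SemaphoreSlim(50);        // at most 50 uploads in flight

        var tasks = files.Select(async file =>
        {
            await gate.WaitAsync();              // wait for a free slot
            try
            {
                await UploadAsync(file);
            }
            finally
            {
                gate.Release();                  // free the slot for the next upload
            }
        });

        await Task.WhenAll(tasks);
    }
}
```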

abatishchev
  • 98,240
  • 88
  • 296
  • 433
3

The other answers are fine; however, another approach is to use TPL Dataflow, available on NuGet from https://www.nuget.org/packages/System.Threading.Tasks.Dataflow/

public static async Task DoWorkLoads(List<Something> results)
{
   var options = new ExecutionDataflowBlockOptions
                     {
                        MaxDegreeOfParallelism = 50
                     };

   var block = new ActionBlock<Something>(MyMethodAsync, options);

   foreach (var result in results)
      block.Post(result);

   block.Complete();
   await block.Completion;

}

...

public static async Task MyMethodAsync(Something result)
{       
   //  Do async work here
}

The advantages of Dataflow:

  1. It naturally works with async, as do the WhenAll task-based solutions
  2. It can also be plumbed into a larger pipeline of tasks
    • You could retry errors by piping them back in.
    • Add any pre-processing calls into earlier blocks
  3. You can limit MaxDegreeOfParallelism if throttling is a concern
  4. You can build more complicated pipelines, hence the name Dataflow
TheGeneral
  • 79,002
  • 9
  • 103
  • 141
0

You could convert your code to an Azure Function and let Azure handle most of the parallelism, scale-out, and upload-to-Azure-Blob-Storage work.

You could use an HTTP trigger or a Service Bus trigger to initiate each download, process, and upload task.

Ian Mercer
  • 38,490
  • 8
  • 97
  • 133