
I will describe my problem with a simple example first, and then with one closer to my actual scenario.

Imagine we have n items [i1, i2, i3, i4, ..., in] in box1, and a box2 that can process m items at a time (m is usually much less than n). The time required for each item differs. I want m items to be in progress at all times until every item has been processed.

A problem closer to my actual scenario: you have list1 of n strings (URLs of files) and you want a system that downloads m files concurrently (for example via HttpClient.GetAsync()). Whenever one of the m downloads finishes, another remaining item from list1 must take its place as soon as possible, and this must continue until all items of list1 have been processed. (The values of n and m are specified by user input at runtime.)

How can this be done?

X X
4 Answers


You should look into TPL Dataflow. Add the System.Threading.Tasks.Dataflow NuGet package to your project, then what you want is as simple as:

private static HttpClient _client = new HttpClient();
public async Task<List<MyClass>> ProcessDownloads(IEnumerable<string> uris, 
                                                  int concurrentDownloads)
{
    var result = new List<MyClass>();

    var downloadData = new TransformBlock<string, string>(async uri =>
    {
        return await _client.GetStringAsync(uri); //GetStringAsync is thread-safe.
    }, new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism = concurrentDownloads});

    var processData = new TransformBlock<string, MyClass>(
          json => JsonConvert.DeserializeObject<MyClass>(json), 
          new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded});

    var collectData = new ActionBlock<MyClass>(
          data => result.Add(data)); //When you don't specify options, dataflow processes items one at a time.

    //Set up the chain of blocks; each block calls `.Complete()` on the next block when it finishes processing its last item.
    downloadData.LinkTo(processData, new DataflowLinkOptions {PropagateCompletion = true});
    processData.LinkTo(collectData, new DataflowLinkOptions {PropagateCompletion = true});

    //Load the data into the first transform block to start off the process.
    foreach (var uri in uris)
    {
        await downloadData.SendAsync(uri).ConfigureAwait(false);
    }
    downloadData.Complete(); //Signal you are done adding data.

    //Wait for the last object to be added to the list.
    await collectData.Completion.ConfigureAwait(false);

    return result;
}

In the above code, only `concurrentDownloads` downloads will be in flight at any given time, an unbounded number of threads will be processing the received strings and turning them into objects, and a single thread will be taking those objects and adding them to the list.
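
For completeness, here is a minimal sketch of how the method above might be called. `MyClass` and the URL list are placeholders; shape them to match whatever JSON your endpoints actually return:

public class MyClass
{
    public string Name { get; set; } //illustrative property, adjust to your JSON payload.
}

public async Task RunAsync(List<string> urls, int concurrentDownloads)
{
    List<MyClass> items = await ProcessDownloads(urls, concurrentDownloads);
    Console.WriteLine($"Downloaded and deserialized {items.Count} items.");
}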

UPDATE: Here is a simplified example that does only what you asked for in the question.

private static HttpClient _client = new HttpClient();
public void ProcessDownloads(IEnumerable<string> uris, int concurrentDownloads)
{
    var downloadData = new ActionBlock<string>(async uri =>
    {
        var response = await _client.GetAsync(uri); //GetAsync is thread-safe.
        //do something with response here.
    }, new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism = concurrentDownloads});


    foreach (var uri in uris)
    {
       downloadData.Post(uri);
    }
    downloadData.Complete();

    downloadData.Completion.Wait();
}
Scott Chamberlain
  • Thanks. I heard and saw a lot about TPL and Reactive Extensions while searching for an answer to my problem, but they are somewhat complicated for me and I don't get how to use them. Isn't there really a simpler solution for this? :) – X X Sep 23 '17 at 19:19
  • Dataflow is pretty easy once you realize you are just setting up steps in a pipeline. I made my example overly complicated so I could show you all the features of TPL Dataflow; I have updated with an example of just your requirements. – Scott Chamberlain Sep 23 '17 at 19:29
  • `HttpClient` is designed to be reused for multiple requests, even concurrent. Create one instance and use that, don't create a new one every time – Jakub Dąbek Sep 23 '17 at 19:32
  • I can never remember if it is HttpClient or WebClient that is that way, answer is fixed. – Scott Chamberlain Sep 23 '17 at 19:33
  • Thanks. It seems I should make myself familiar with TPL Dataflow and not run away from it :) – X X Sep 23 '17 at 19:39
  • Sorry, would you help me a little more? I found that this code posts all items to the processing queue at once. What if I have hundreds of thousands of URLs? – X X Sep 24 '17 at 05:21
  • In the ExecutionDataflowBlockOptions you can set a BoundedCapacity to cap the number of items buffered and waiting to be processed; if you hit the limit, the call to `Post` will return false and the call to `SendAsync` will return a task that does not complete until space frees up. Once an item is processed, the block accepts more entries again. Also, this code does post everything to the processing queue at once, but it only takes items out of the queue and processes them `MaxDegreeOfParallelism` at a time (a short sketch of the BoundedCapacity option follows these comments). – Scott Chamberlain Sep 24 '17 at 19:32
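
A minimal sketch of the BoundedCapacity option mentioned in the comments above, applied to the simplified example; the capacity of 100 is an illustrative value to tune for your workload:

var downloadData = new ActionBlock<string>(async uri =>
{
    var response = await _client.GetAsync(uri);
    //do something with response here.
}, new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = concurrentDownloads,
    BoundedCapacity = 100 //SendAsync waits for free space; Post returns false while the buffer is full.
});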

Here is a generic method you can use.

When you call this, TIn will be string (the URL addresses) and asyncProcessor will be your async method that takes a URL address as input and returns a Task.

The SemaphoreSlim used by this method allows only n concurrent async I/O requests at any time; as soon as one completes, the next request executes. Something like a sliding-window pattern.

private const int DefaultMaxDegreeOfParallelism = 10; //fallback when no limit is passed; the value here is illustrative.

public static Task ForEachAsync<TIn>(
    IEnumerable<TIn> inputEnumerable,
    Func<TIn, Task> asyncProcessor,
    int? maxDegreeOfParallelism = null)
{
    int maxAsyncThreadCount = maxDegreeOfParallelism ?? DefaultMaxDegreeOfParallelism;
    SemaphoreSlim throttler = new SemaphoreSlim(maxAsyncThreadCount, maxAsyncThreadCount);

    IEnumerable<Task> tasks = inputEnumerable.Select(async input =>
    {
        //Each task waits for a free slot before doing its work, so at most
        //maxAsyncThreadCount calls to asyncProcessor are in flight at once.
        await throttler.WaitAsync().ConfigureAwait(false);
        try
        {
            await asyncProcessor(input).ConfigureAwait(false);
        }
        finally
        {
            throttler.Release();
        }
    });

    return Task.WhenAll(tasks);
}
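
A minimal usage sketch of the method above, assuming a shared HttpClient; the URL list and the handling of each downloaded string are placeholders:

private static readonly HttpClient _client = new HttpClient();

public Task DownloadAllAsync(IEnumerable<string> urls, int maxConcurrentDownloads)
{
    return ForEachAsync(urls, async url =>
    {
        string body = await _client.GetStringAsync(url);
        //do something with body here.
    }, maxConcurrentDownloads);
}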
Dogu Arslan
  • Thanks. It seems neat. I must test it and report back on whether it works. – X X Sep 23 '17 at 19:37
  • Sorry, I have a question. Does it create all the tasks instantly and have each one wait for its turn, or does it create each task only when its turn comes and it is needed? – X X Sep 23 '17 at 20:07
  • `Task.WhenAll` internally creates a list for all the tasks so I think it will create them all immediately – Jakub Dąbek Sep 23 '17 at 20:13
  • What if my list contains thousands or millions of URLs? :( – X X Sep 23 '17 at 20:14
  • It will create the tasks immediately and return you a task that completes when all created tasks complete. Whether you have millions or hundreds of elements in the list, the method works the same way. If you specify the maxDegreeOfParallelism parameter, then there will only be that many concurrent async requests at any time; otherwise the method falls back to the default value of max parallelism. As long as the async processor you provide is truly async, this method does not need a thread-pool thread for its async operations. You may want to experiment to find the right maxDegreeOfParallelism value. – Dogu Arslan Sep 24 '17 at 05:43
  • I mean, if I have thousands of URLs then this method creates thousands of tasks up front and then awaits them. This could be improved, because many tasks spend a lot of time just waiting. It would be better if the tasks were created only when and as soon as they are required. I think creating thousands of tasks at once could make the system unresponsive or use a lot of system resources unnecessarily. @DoguArslan – X X Sep 24 '17 at 06:16
  • The tasks are not thread-based; they would not use the thread pool, given that your async processor is also truly async, and they will execute n at a time. With maxDegreeOfParallelism set, system resources are not going to be used to the point of unresponsiveness. Like I said, experiment with maxDegreeOfParallelism to find the value suitable for you. You could also partition your input list and call this method with smaller partitioned lists in a loop if you wanted to (see the sketch after these comments); that is the responsibility of the caller, not the method, though. – Dogu Arslan Sep 24 '17 at 07:58
  • @DoguArslan Could you please help me? How can this be done so that tasks are created on the fly, as their turn comes, rather than all of them at the start? – X X Oct 01 '17 at 11:45
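
A minimal sketch of the partitioning idea mentioned in the comments above: split the input into fixed-size batches and call ForEachAsync once per batch, so only one batch's worth of tasks exists at any time. The method name ForEachInBatchesAsync and the batch size of 1000 are illustrative only:

public static async Task ForEachInBatchesAsync<TIn>(
    IEnumerable<TIn> inputEnumerable,
    Func<TIn, Task> asyncProcessor,
    int? maxDegreeOfParallelism = null,
    int batchSize = 1000)
{
    var batch = new List<TIn>(batchSize);
    foreach (TIn item in inputEnumerable)
    {
        batch.Add(item);
        if (batch.Count == batchSize)
        {
            //Only this batch's tasks exist while it is being processed.
            await ForEachAsync(batch, asyncProcessor, maxDegreeOfParallelism);
            batch.Clear();
        }
    }
    if (batch.Count > 0)
    {
        await ForEachAsync(batch, asyncProcessor, maxDegreeOfParallelism);
    }
}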

A simple solution for throttling is a SemaphoreSlim.
EDIT
After a slight alteration, the code now creates the tasks only when they are needed.

var client = new HttpClient();
SemaphoreSlim semaphore = new SemaphoreSlim(m, m); //set the max here
var tasks = new List<Task>();

foreach(var url in urls)
{
    // moving the wait here throttles the foreach loop
    await semaphore.WaitAsync();
    tasks.Add(((Func<Task>)(async () =>
    {
        //await semaphore.WaitAsync();
        try
        {
            var response = await client.GetAsync(url); // possibly ConfigureAwait(false) here
            // do something with response
        }
        finally
        {
            semaphore.Release(); // release the slot even if the request throws
        }
    }))());
}

await Task.WhenAll(tasks);

Here is another way to do it:

var client = new HttpClient();
var tasks = new HashSet<Task>();

foreach(var url in urls)
{
    if(tasks.Count == m)
    {
        tasks.Remove(await Task.WhenAny(tasks));            
    }

    tasks.Add(((Func<Task>)(async () =>
    {
        var response = await client.GetAsync(url); // possibly ConfigureAwait(false) here
        // do something with response            
    }))());
}

await Task.WhenAll(tasks);
Jakub Dąbek
  • It seems to do the job, it downloads at most m URLs simultaneously, but there is a problem. For example, if you have a list of one million URLs, it creates one million tasks in a short time, and then each of them is waiting for its turn. Am I wrong? – X X Sep 23 '17 at 19:55
  • You are right, it creates all the tasks in a short time. Also it does everything on the same thread, but you can change that using `ConfigureAwait(false)` or run them on a thread pool. I'll update the answer with some more info – Jakub Dąbek Sep 23 '17 at 19:58
  • My list of URLs may be very long, maybe millions, and if all those millions of tasks are created it may cause out-of-memory or other exceptions or errors :) I'm seeking a solution that processes each part only when it's required, with a low memory footprint. – X X Sep 23 '17 at 20:02
  • Can you run the code on a thread pool or do you want them on the same thread? – Jakub Dąbek Sep 23 '17 at 20:04
  • I prefer not to use Task.Run(). I would like to use the same thread as much as possible – X X Sep 23 '17 at 20:12
  • @XX I changed the code to create the tasks on the go, but it might be a little unpredictable on a single thread – Jakub Dąbek Sep 23 '17 at 20:47

Process items in parallel, limiting the number of simultaneous jobs:

string[] strings = GetStrings();  // Items to process.
const int m = 2;  // Max simultaneous jobs.

Parallel.ForEach(strings, new ParallelOptions {MaxDegreeOfParallelism = m}, s =>
{
    DoWork(s);
});
Pavel Tupitsyn
  • His DoWork is async, and Parallel.ForEach does not support async. – Scott Chamberlain Sep 23 '17 at 19:01
  • This method does not work for my problem, because you cannot use Parallel.ForEach with async methods. When you use Parallel.ForEach with async methods, all the tasks are fired instantly (it doesn't wait for the async tasks to complete). I'm using HttpClient.GetAsync, which is an async method (see the sketch after these comments). – X X Sep 23 '17 at 19:02
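
To illustrate the point made in these comments, here is a minimal sketch of what goes wrong (client, urls, and m are hypothetical): the async lambda is converted to an async void delegate, so Parallel.ForEach returns as soon as each body reaches its first await, and MaxDegreeOfParallelism no longer limits the number of downloads actually in flight.

Parallel.ForEach(urls, new ParallelOptions {MaxDegreeOfParallelism = m}, async url =>
{
    //ForEach considers this iteration finished at the first await.
    var response = await client.GetAsync(url);
    // by the time this line runs, Parallel.ForEach may already have returned
});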