10

I have an application where I have 1000+ small parts of 1 large file.

I have to upload a maximum of 16 parts at a time.

I used the Task Parallel Library (TPL) of .NET.

I used Parallel.For to divide the file into multiple parts, assigned one method that should be executed for each part, and set MaxDegreeOfParallelism to 16.

I need to execute one method with the checksum values that are generated by the different part uploads, so I have to set up a mechanism where I wait for all part uploads (say 1000) to complete. The issue I am facing with the TPL is that it picks which of the 1000 parts to run on its 16 threads at random.

I want some mechanism with which I can run the first 16 threads initially, and as soon as the 1st or 2nd or any of the 16 threads completes its task, the 17th part is started.

How can I achieve this?


Kishan Gajjar
  • If @usr's answer doesn't work, have a look at [my answer here](http://stackoverflow.com/a/15056827/106159), which might apply. Otherwise, TPL's DataflowBlock classes might be better, if you can use them (I'm thinking you can't, because you specify C# 4). – Matthew Watson Oct 28 '15 at 14:59
  • Is it a client-side UI app or a server-side app? – noseratio Oct 29 '15 at 10:04
  • @Noseratio It's a server-side app. – Kishan Gajjar Oct 29 '15 at 11:20
  • @KishanGajjar, it's usually not a good idea to use the TPL `Parallel` APIs on the server side, especially if the server can be under high load; it may badly affect scalability. It's also an anti-pattern to use `Parallel` for what is naturally an asynchronous operation, like file I/O. As others suggest, use Dataflow and async `Task`-based APIs like `WriteAsync`. – noseratio Oct 29 '15 at 20:26
  • This can also be done without Dataflow, with the help of `SemaphoreSlim.WaitAsync`. Here is [some code](http://stackoverflow.com/a/21747253/1768303) for a similar case; a sketch of the idea follows these comments. – noseratio Oct 29 '15 at 20:29
  • Hi. I've updated the code (just now), please re-use it ;). There was a race condition, which is fixed now (it was in the dequeue part). Also, I used `WhenAny` instead of `WaitAny`, which now waits asynchronously. Both code samples are the same, except that in the second one DoWork is async, which is useful if you want to wait for something else inside it. @KishanGajjar – M.kazem Akhgary Oct 31 '15 at 14:00
  • 1
    Please accept and use the approach by [usr](http://stackoverflow.com/a/33394444/4767498). I think you had a partitioning problem; look at the [problem described here](http://stackoverflow.com/questions/33869830/ordered-parallel-is-not-working-as-expected-convert-list-into-ienumerable). – M.kazem Akhgary Nov 23 '15 at 12:21
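
As a brief illustration of noseratio's `SemaphoreSlim.WaitAsync` suggestion above, here is a minimal sketch. It assumes a C# 7.1+ compiler (for async Main), and `UploadPartAsync` is a hypothetical stand-in for the real per-part upload returning its checksum:

using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static async Task Main()
    {
        var throttle = new SemaphoreSlim(16);       // at most 16 uploads in flight
        var parts = Enumerable.Range(0, 1000);

        var uploads = parts.Select(async part =>
        {
            await throttle.WaitAsync();             // wait for a free slot
            try { return await UploadPartAsync(part); }
            finally { throttle.Release(); }         // a finished part frees a slot for the next one
        }).ToList();

        // All 1000 parts are done here; run the final checksum step.
        string[] checksums = await Task.WhenAll(uploads);
        Console.WriteLine("Got {0} checksums", checksums.Length);
    }

    // Hypothetical upload; replace with the real per-part upload.
    static async Task<string> UploadPartAsync(int part)
    {
        await Task.Delay(10);
        return part.ToString("x8");
    }
}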

4 Answers

7

One possible candidate for this is TPL Dataflow. Here is a demonstration that takes in a stream of integers and prints them to the console; you set MaxDegreeOfParallelism to however many threads you wish to run in parallel:

void Main()
{
    var actionBlock = new ActionBlock<int>(
            i => Console.WriteLine(i),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 16 });

    foreach (var i in Enumerable.Range(0, 200))
    {
        actionBlock.Post(i);
    }

    actionBlock.Complete();         // signal that no more items will be posted
    actionBlock.Completion.Wait();  // block until every posted item has been processed
}

This also scales well if you want to have multiple producers/consumers.
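
Since the question also needs one final call with all the checksums, a `TransformBlock` linked to a collecting `ActionBlock` fits nicely. Here is a minimal sketch, where the per-part upload and its checksum type are assumptions:

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program
{
    static async Task Main()
    {
        var checksums = new ConcurrentBag<string>();

        // Hypothetical per-part upload that returns the part's checksum.
        var upload = new TransformBlock<int, string>(
            async part => { await Task.Delay(10); return part.ToString("x8"); },
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 16 });

        var collect = new ActionBlock<string>(c => checksums.Add(c));
        upload.LinkTo(collect, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var part in Enumerable.Range(0, 1000))
            upload.Post(part);

        upload.Complete();          // no more parts will be posted
        await collect.Completion;   // all uploads finished and all checksums collected

        Console.WriteLine("Finalize with {0} checksums", checksums.Count);
    }
}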

Yuval Itzchakov
4

Here is the manual way of doing this.

You need a queue: the queue holds the sequence of pending items. You dequeue items and put them into a list of working tasks; whenever a task finishes, remove it from the list of working tasks and take another item from the queue. The main thread controls this process. Here is a sample of how to do it.

For the test I used a List of integers, but it should work for other types because it uses generics.

private static void Main()
{
    Random r = new Random();
    var items = Enumerable.Range(0, 100).Select(x => r.Next(100, 200)).ToList();

    ParallelQueue(items, DoWork);
}

private static void ParallelQueue<T>(List<T> items, Action<T> action)
{
    Queue<T> pending = new Queue<T>(items);
    List<Task> working = new List<Task>();

    while (pending.Count + working.Count != 0)
    {
        if (pending.Count != 0 && working.Count < 16)  // maximum concurrent tasks
        {
            var item = pending.Dequeue();               // get the next item from the queue
            working.Add(Task.Run(() => action(item)));  // run it as a task
        }
        else
        {
            Task.WaitAny(working.ToArray());            // wait for any task to finish
            working.RemoveAll(x => x.IsCompleted);      // remove finished tasks
        }
    }
}

private static void DoWork(int i) // do your work here.
{
    // this is just an example
    Task.Delay(i).Wait(); 
    Console.WriteLine(i);
}

Please let me know if you run into problems implementing DoWork for yourself, because if you change the method signature you may need to make some changes.
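
For the original question, DoWork would also have to hand back the part's checksum. A minimal sketch of a result-collecting variant of the same loop (the names and the 16-task limit follow the code above; nothing here is tied to a particular upload API, and it needs `using System.Linq;` for Where):

private static List<TResult> ParallelQueue<T, TResult>(List<T> items, Func<T, TResult> func)
{
    Queue<T> pending = new Queue<T>(items);
    List<Task<TResult>> working = new List<Task<TResult>>();
    List<TResult> results = new List<TResult>();

    while (pending.Count + working.Count != 0)
    {
        if (pending.Count != 0 && working.Count < 16)
        {
            var item = pending.Dequeue();
            working.Add(Task.Run(() => func(item)));
        }
        else
        {
            Task.WaitAny(working.ToArray());
            foreach (var done in working.Where(x => x.IsCompleted))
                results.Add(done.Result);            // harvest finished checksums
            working.RemoveAll(x => x.IsCompleted);
        }
    }

    return results;
}

Calling it as `var checksums = ParallelQueue(items, UploadPart);` then lets you run the final method over the collected checksums once the loop returns.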

Update

You can also do this with async/await, without blocking the main thread.

private static void Main()
{
    Random r = new Random();
    var items = Enumerable.Range(0, 100).Select(x => r.Next(100, 200)).ToList();

    Task t = ParallelQueue(items, DoWork);

    // able to do other things.

    t.Wait();
}

private static async Task ParallelQueue<T>(List<T> items, Func<T, Task> func)
{
    Queue<T> pending = new Queue<T>(items);
    List<Task> working = new List<Task>();

    while (pending.Count + working.Count != 0)
    {
        if (working.Count < 16 && pending.Count != 0)
        {
            var item = pending.Dequeue();
            working.Add(Task.Run(() => func(item)));   // start the next part
        }
        else
        {
            await Task.WhenAny(working);               // asynchronously wait for any task
            working.RemoveAll(x => x.IsCompleted);
        }
    }
}

private static async Task DoWork(int i)
{
    await Task.Delay(i);
}
M.kazem Akhgary
  • Why are you using `new Task` and not `Task.Run`? Also, blocking with `Wait` in server code is typically a bad idea. – avo Oct 30 '15 at 23:21
  • I just showed the general way. `Task.Delay(i).Wait()` was just an example; you can do anything you want inside `DoWork`. I appreciate your help; I fixed the code, and it is better now (?). I'm not experienced in multi-threading, and I'm new to programming (only 1 year), so bear with me ;) @avo – M.kazem Akhgary Oct 31 '15 at 04:48
  • I still think there's a non-blocking way of doing it (without `Wait`) but I'm retracting my negative vote. – avo Oct 31 '15 at 08:31
3
var workitems = ... /* e.g. Enumerable.Range(0, 1000000) */;
SingleItemPartitioner.Create(workitems)
    .AsParallel()
    .AsOrdered()
    .WithDegreeOfParallelism(16)
    .WithMergeOptions(ParallelMergeOptions.NotBuffered)
    .ForAll(i => { Thread.Sleep(1000); Console.WriteLine(i); });

This should be all you need. I forget exactly how the methods are named... look at the documentation.

Test this by printing to the console after sleeping for 1 second (which this sample code does).
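
`SingleItemPartitioner` is not part of the framework itself; it ships with Microsoft's ParallelExtensionsExtras samples. If you are on .NET 4.5+, the built-in `Partitioner.Create` with `NoBuffering` gives the same one-item-at-a-time hand-out; a minimal sketch of that variant:

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;

class Program
{
    static void Main()
    {
        var workitems = Enumerable.Range(0, 1000);

        // NoBuffering hands out one item at a time instead of pre-assigned
        // chunks, so a finished worker immediately picks up the next part.
        Partitioner.Create(workitems, EnumerablePartitionerOptions.NoBuffering)
            .AsParallel()
            .AsOrdered()
            .WithDegreeOfParallelism(16)
            .WithMergeOptions(ParallelMergeOptions.NotBuffered)
            .ForAll(i => { Thread.Sleep(1000); Console.WriteLine(i); });
    }
}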

usr
  • My answer was accepted, but I think it shouldn't have been. I think the OP had the problem described here: http://stackoverflow.com/questions/33869830/ordered-parallel-is-not-working-as-expected-convert-list-into-ienumerable/33869969#33869969. `workitems.Select(x => x)` is required to fix parallel processing of a List of items. Your approach will split the list into chunks, and that wasn't what the OP wanted, so changing the list into an IEnumerable will fix his problem. – M.kazem Akhgary Nov 23 '15 at 11:36
  • @M.kazemAkhgary that's actually a good point, I forgot about partitioning. This really is a bad TPL default, another one. – usr Nov 23 '15 at 12:08
1

Another option would be to use a BlockingCollection<T> as a queue between your file-reader thread and your 16 uploader threads. Each uploader thread would simply loop, consuming the blocking collection until it is complete.

And if you want to limit memory consumption in the queue, you can set an upper limit on the blocking collection so that the file-reader thread pauses when the buffer reaches capacity. This is particularly useful in a server environment where you may need to limit the memory used per user/API call.

// Create a buffer of 4 chunks between the file reader and the senders
BlockingCollection<Chunk> queue = new BlockingCollection<Chunk>(4);

// Create a cancellation token source so you can stop this gracefully
CancellationTokenSource cts = new CancellationTokenSource();

File reader thread

...
queue.Add(chunk, cts.Token);
...
queue.CompleteAdding();

Sending threads

for (int i = 0; i < 16; i++)
{
    Task.Run(() => {
        foreach (var chunk in queue.GetConsumingEnumerable(cts.Token))
        {
            // ... do the upload
        }
    });
}
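
To run the final checksum method the question asks about, you would keep the 16 sender tasks and wait for them all before finalizing; a minimal sketch (`checksums` and `FinalizeUpload` are hypothetical placeholders):

var senders = new Task[16];
for (int i = 0; i < 16; i++)
{
    senders[i] = Task.Run(() =>
    {
        foreach (var chunk in queue.GetConsumingEnumerable(cts.Token))
        {
            // ... upload the chunk and add its checksum to a thread-safe collection
        }
    });
}

Task.WaitAll(senders);          // every queued chunk has been uploaded here
// FinalizeUpload(checksums);   // hypothetical final call with the collected checksums
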
Ian Mercer