
I need to run 5 tasks in parallel, with at most 2 executing at any time. As soon as one task finishes, the next should start, until no tasks are pending.

I'm using a solution by L.B. that uses a semaphore to synchronize the tasks.

void LaunchTaskPool ()
    {
        SemaphoreSlim maxThreadSemaphore = new SemaphoreSlim(2); //Max 2 tasks at a time.

        for (int i = 0; i < 5; i++)                     //loop through 5 tasks to be assigned
        {
            maxThreadSemaphore.Wait();                  //Wait for the queue

            Console.WriteLine("Assigning work {0} ", i);

            Task t = Task.Factory.StartNew(() =>
            {
                DoWork(i.ToString());                   // assign tasks
            }, TaskCreationOptions.LongRunning
                )
                .ContinueWith(
                (task) => maxThreadSemaphore.Release()  // step out of the queue
                );
        }

    }

    void DoWork(string workname)
    {
        Thread.Sleep(100);
        Console.WriteLine("--work {0} starts", workname);
        Thread.Sleep(1000);
        Console.WriteLine("--work  {0} finishes", workname);

    }

The problem is that some tasks, seemingly at random, never start. For example, here Work 1 and 3 never started and Work 4 ran twice:

Output

I tried adding Task.WaitAll() as suggested here, but it didn't help.

Thanks in advance for your suggestions!

Constantine.

2 Answers


I recommend using Parallel.For() for this instead; there's no need to reinvent the wheel! You can specify MaxDegreeOfParallelism when using Parallel.For():

For example:

using System;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp4
{
    class Program
    {
        static void Main()
        {
            Parallel.For(
                0, // Inclusive start
                5, // Exclusive end
                new ParallelOptions{MaxDegreeOfParallelism = 2},
                i => DoWork(i.ToString()));
        }

        static void DoWork(string workname)
        {
            Thread.Sleep(100);
            Console.WriteLine("--work {0} starts", workname);
            Thread.Sleep(1000);
            Console.WriteLine("--work  {0} finishes", workname);

        }
    }
}
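
If you want to convince yourself that the limit is respected, a rough sketch like the following counts how many bodies are running at once. The names ConcurrencyCheck and MeasurePeak are made up for this illustration, and the 50 ms sleep just stands in for real work:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class ConcurrencyCheck
{
    // Hypothetical helper: runs taskCount loop bodies through Parallel.For and
    // returns the highest number of bodies observed running at the same time.
    public static int MeasurePeak(int taskCount, int maxParallel)
    {
        int running = 0;
        int peak = 0;
        object gate = new object();

        Parallel.For(
            0,         // Inclusive start
            taskCount, // Exclusive end
            new ParallelOptions { MaxDegreeOfParallelism = maxParallel },
            i =>
            {
                lock (gate)
                {
                    running++;
                    if (running > peak) peak = running;
                }

                Thread.Sleep(50); // Stand-in for real work

                lock (gate)
                {
                    running--;
                }
            });

        return peak;
    }

    static void Main()
    {
        // The peak should never exceed the configured maximum of 2.
        Console.WriteLine("Peak concurrency: {0}", MeasurePeak(5, 2));
    }
}
```

Note that MaxDegreeOfParallelism is an upper bound, so the observed peak can be lower than the limit but should not be higher.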

(Actually I just looked, and this is already in one of the other answers in the thread you linked - is there a reason you didn't want to use that solution? If not, I guess we should close this question as a duplicate...)

Anyway to answer your actual question:

You are accessing a "modified closure" in the loop. To fix this, make a copy of the loop variable i before passing it to the task:

SemaphoreSlim maxThreadSemaphore = new SemaphoreSlim(2); //Max 2 tasks at a time.

for (int i = 0; i < 5; i++)                     //loop through 5 tasks to be assigned
{
    maxThreadSemaphore.Wait();                  //Wait for the queue

    Console.WriteLine("Assigning work {0} ", i);
    int copy = i; // <----- Make a copy here.

    Task t = Task.Factory.StartNew(() =>
            {
                DoWork(copy.ToString());                   // assign tasks
            }, TaskCreationOptions.LongRunning
        )
        .ContinueWith(
            (task) => maxThreadSemaphore.Release()  // step out of the queue
        );
}
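
To see why the copy matters, here is a minimal sketch without any threading (ClosureDemo and its method names are invented for this illustration). A lambda captures the variable i itself rather than its value at that moment, and a for loop uses a single i for all iterations, so every lambda that runs late sees whatever i holds by then:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

static class ClosureDemo
{
    // Every lambda captures the same loop variable i.
    public static int[] CaptureLoopVariable()
    {
        var readers = new List<Func<int>>();
        for (int i = 0; i < 3; i++)
        {
            readers.Add(() => i);
        }
        // The loop has finished, so i is 3 for every lambda.
        return readers.Select(r => r()).ToArray();
    }

    // Each iteration declares a fresh variable, so each lambda gets its own.
    public static int[] CaptureCopy()
    {
        var readers = new List<Func<int>>();
        for (int i = 0; i < 3; i++)
        {
            int copy = i;
            readers.Add(() => copy);
        }
        return readers.Select(r => r()).ToArray();
    }

    static void Main()
    {
        Console.WriteLine(string.Join(", ", CaptureLoopVariable())); // prints "3, 3, 3"
        Console.WriteLine(string.Join(", ", CaptureCopy()));         // prints "0, 1, 2"
    }
}
```

In the task version the lambdas run on other threads while the loop keeps going, which is why the value each task sees looks random instead of always being the final one.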
Matthew Watson

The problem with your solution is that by the time a Task actually starts and reads i, the loop has already moved on and is starting the next Task.

As @Matthew Watson recommended, you should use Parallel.For.


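Just out of interest, this would also solve your problem. Passing i as a method parameter gives each call its own copy, so the lambda no longer sees later changes to the loop variable: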

static void LaunchTaskPool()
{
    SemaphoreSlim maxThreadSemaphore = new SemaphoreSlim(2); //Max 2 tasks at a time.

    for (int i = 0; i < 5; i++)                     //loop through 5 tasks to be assigned
    {
        maxThreadSemaphore.Wait();                  //Wait for the queue

        Console.WriteLine("Assigning work {0} ", i);

        StartThread(i, maxThreadSemaphore);
    }
}

static void StartThread(int i, SemaphoreSlim maxThreadSemaphore)
{
    Task.Factory.StartNew(
        () => DoWork(i.ToString()),
        TaskCreationOptions.None
    ).ContinueWith((task) => maxThreadSemaphore.Release());
}

static void DoWork(string workname)
{
    Thread.Sleep(100);
    Console.WriteLine("--work {0} starts", workname);
    Thread.Sleep(1000);
    Console.WriteLine("--work  {0} finishes", workname);
}
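
One thing to watch with the semaphore approach: the launching method returns as soon as the last task has been started, not when it has finished. Below is a rough sketch of one way to also wait for completion. It is a variation rather than the code above: it uses Task.Run with try/finally instead of ContinueWith, and ThrottledRunner and RunAll are made-up names for illustration:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

static class ThrottledRunner
{
    // Hypothetical helper: runs taskCount work items, at most maxParallel at a
    // time, waits for all of them, and returns the sorted ids that actually ran.
    public static List<int> RunAll(int taskCount, int maxParallel)
    {
        var ran = new List<int>();
        var tasks = new List<Task>();

        using (var semaphore = new SemaphoreSlim(maxParallel))
        {
            for (int i = 0; i < taskCount; i++)
            {
                semaphore.Wait(); // Block until a slot is free
                int id = i;       // Copy the loop variable for the lambda

                tasks.Add(Task.Run(() =>
                {
                    try
                    {
                        Thread.Sleep(50); // Stand-in for DoWork
                        lock (ran) { ran.Add(id); }
                    }
                    finally
                    {
                        semaphore.Release(); // Free the slot even if the work throws
                    }
                }));
            }

            // Without this, the method could return while tasks are still running.
            Task.WaitAll(tasks.ToArray());
        }

        ran.Sort();
        return ran;
    }

    static void Main()
    {
        Console.WriteLine(string.Join(", ", RunAll(5, 2))); // prints "0, 1, 2, 3, 4"
    }
}
```

The completion order of the work items is not deterministic, which is why the sketch sorts the ids before returning them.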
NtFreX