When starting new tasks in a for loop using TaskFactory, I pass the current loop index to the lambda that starts the Task. The index is used to select an item from a List and call the worker function for that item. It appears that the worker Task does not reliably receive the value the index had when the Task was started.

If you execute the code below, the console output shows that some of the workers never started, and a matching number of others were started at least twice. It might look like this:

Results:
        Started: 7, Completed: 7, Already Started: 2

This can be 'cured' by taking a copy of the loop index inside the loop body and passing the copy to the worker function (the commented-out lines in the Manager RunWorkers function), which gives the expected result:

Results:
        Started: 10, Completed: 10, Already Started: 0

I want to understand this behaviour so that I can properly guard against it. (I'm assuming I've done something stupid, and that the 'fix' only hides the problem and can't be relied upon.)

BTW: removing the guard in the Manager RunOne function can result in an ArgumentOutOfRangeException, because the index is too large.
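The same effect can be reproduced without any threads, which makes it easier to reason about. This is a minimal sketch, assuming (as the comments suggest) that the lambda captures the variable i itself rather than its value at the time the lambda is created. The class and method names (CaptureDemo, CaptureLoopVariable, CaptureCopy) are made up for illustration and are not part of the test program.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical demo class, not part of the ThreadingTest program.
public static class CaptureDemo
{
    // Each lambda captures the single loop variable i, not a snapshot of its value.
    public static List<int> CaptureLoopVariable(int count)
    {
        var funcs = new List<Func<int>>();
        for (int i = 0; i < count; ++i)
            funcs.Add(() => i);

        var results = new List<int>();
        foreach (var f in funcs)
            results.Add(f()); // invoked after the loop has ended, so i == count
        return results;
    }

    // Each lambda captures its own copy, declared inside the loop body.
    public static List<int> CaptureCopy(int count)
    {
        var funcs = new List<Func<int>>();
        for (int i = 0; i < count; ++i)
        {
            int copy = i; // a fresh variable for every iteration
            funcs.Add(() => copy);
        }

        var results = new List<int>();
        foreach (var f in funcs)
            results.Add(f());
        return results;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(",", CaptureLoopVariable(3))); // prints 3,3,3
        Console.WriteLine(string.Join(",", CaptureCopy(3)));         // prints 0,1,2
    }
}
```

In the first method every delegate reports the loop bound, which would also account for an index equal to Workers.Count reaching RunOne. With tasks the delegates run at unpredictable times, so they may see any value the loop variable takes from the moment they were created onwards.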

I've included the code for a C# console app below (Visual Studio 2013). First, the Worker:

using System;

namespace ThreadingTest
{
    public class Worker
    {
        public bool hasStarted = false;
        public bool hasCompleted = false;
        public bool hasAlreadyStarted = false;

        public readonly int index;

        private double value;

        public Worker(int _index)
        {
            index = _index;
        }

        public void workSocksOff()
        {
            if (hasStarted)
            {
                hasAlreadyStarted = true;
                return;
            }
            hasStarted = true;
            // Do real work
            for (int i=0; i<10000000; ++i)
            {
                value = Math.Sqrt(i);
            }
            hasCompleted = true;
        }

    }
}

Then the Manager

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace ThreadingTest
{
    public class Manager
    {
        public List<Worker> Workers = new List<Worker>();
        private Object taskLock = new Object();

        public int TaskCount { get; set; }

        public void RunTest()
        {
            AddWorkers();
            RunWorkers();        
        }

        private void RunWorkers()
        {
            TaskCount = 0;
            TaskFactory taskFactory = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);
            Task[] taskPool = new Task[Workers.Count];
            for (int i=0; i<Workers.Count; ++i)
            {
                //int why = i;
                //taskPool[i] = taskFactory.StartNew(() => this.RunOne(why))

                taskPool[i] = taskFactory.StartNew(() => this.RunOne(i))
                    .ContinueWith( (antecedant) =>
                    {
                        lock (taskLock) { TaskCount += 1; }
                    }
                    );
            }
            Task.WaitAll(taskPool);
        }

        private void RunOne(int index)
        {
            if (index >= Workers.Count)
                return;
            Workers[index].workSocksOff();
        }

        private void AddWorkers()
        {
            for (var i = 0; i < 10; ++i)
                Workers.Add(new Worker(i));
        }
    }
}

Finally the Program itself

using System;

namespace ThreadingTest
{
    class Program
    {
        static void Main(string[] args)
        {
            Manager manager = new Manager();

            manager.RunTest();

            int started = 0, completed = 0, alreadyStarted = 0;

            foreach (Worker w in manager.Workers)
            {
                if (w.hasStarted) started++;
                if (w.hasCompleted) completed++;
                if (w.hasAlreadyStarted) alreadyStarted++;
            }

            Console.WriteLine("Results: ");
            Console.WriteLine("\tStarted: {0}, Completed: {1}, Already Started: {2}", started, completed, alreadyStarted);
            Console.ReadKey();
        }

    }
}
Gray-M
  • By the time the task actually gets executed, the value of i has already changed. When you copy it, you are passing in an explicit value for that scope, and it cannot be affected by anything outside of that scope. You may also want to look at the `Interlocked.Increment` method instead of doing the lock yourself. – Jacob Roberts May 12 '15 at 18:20
  • @Jacob: So StartNew doesn't create the task immediately, this is deferred until the TaskFactory decides to do so on its own thread? And it is at this point that the 'value' of the index is passed to the Task? Thanks for the Interlocked steer btw – Gray-M May 12 '15 at 18:49
  • @Eugene: Thanks - that does look like the fundamental problem. So taking a copy is the solution. – Gray-M May 12 '15 at 18:53
  • You can use `Parallel.For(0, Workers.Count, (i) => { /* parallel code here */ });` This will keep your counter in order. `i` is the counter in this demo. – Jacob Roberts May 12 '15 at 19:56
  • @Jacob: Again thanks for the steer - that looks very useful – Gray-M May 12 '15 at 20:51
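
For reference, the two suggestions from the comments (`Parallel.For` and `Interlocked.Increment`) can be combined roughly as follows. This is only a sketch under my own assumptions: ParallelDemo and VisitEachIndex are hypothetical names, the per-index counter stands in for the Worker objects, and Parallel.For schedules work on the default task scheduler rather than using the LongRunning option from the original code.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class, not part of the ThreadingTest program.
public static class ParallelDemo
{
    // Returns how often each index was visited; 'completed' is the total number of visits.
    public static int[] VisitEachIndex(int count, out int completed)
    {
        int[] visits = new int[count];
        int done = 0;

        // Each invocation receives its own index as a parameter, so there is
        // no shared loop variable for the lambda to capture.
        Parallel.For(0, count, i =>
        {
            Interlocked.Increment(ref visits[i]); // stands in for the real work
            Interlocked.Increment(ref done);      // atomic counter instead of lock
        });

        // Parallel.For returns only after every iteration has finished.
        completed = done;
        return visits;
    }

    public static void Main()
    {
        int completed;
        int[] visits = VisitEachIndex(10, out completed);
        Console.WriteLine(string.Join(",", visits)); // prints 1,1,1,1,1,1,1,1,1,1
        Console.WriteLine(completed);                // prints 10
    }
}
```

Because the index arrives as a lambda parameter, each worker should be visited exactly once without needing an explicit copy of the loop variable.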
