When starting new tasks in a for loop using TaskFactory, I pass the current loop index as a parameter to the lambda function that initiates the Task. This index is used to select an item from a List and call the worker function for the item. It appears that you can't rely upon the value of the index being passed reliably to the worker Task.
If you execute the code below the console output reveals that some of the workers didn't start, and a matching number of others were started at least twice. It might look like this:
Results:
Started: 7, Completed: 7, Already Started: 2
This can be 'cured' by taking a copy of the loop index value and passing this into the worker function (it's the bit commented out in the Manager RunWorkers function), and this gives the expected result of:
Results:
Started: 10, Completed: 10, Already Started: 0
I wish to understand this behaviour so that I can properly guard against it (I'm assuming I've done something stupid and the 'fix' only hides the problem and can't be relied upon)
BTW - Removing the guard in the Manager RunOne function can result in an ArgumentOutOfRange exception, as index is too large.
I've included code for a C# consol app below (Visual Studio 2013), first the Worker
namespace ThreadingTest
{
public class Worker
{
public bool hasStarted = false;
public bool hasCompleted = false;
public bool hasAlreadyStarted = false;
public readonly int index;
private double value;
public Worker(int _index)
{
index = _index;
}
public void workSocksOff()
{
if (hasStarted)
{
hasAlreadyStarted = true;
return;
}
hasStarted = true;
// Do real work
for (int i=0; i<10000000; ++i)
{
value = Math.Sqrt(i);
}
hasCompleted = true;
}
}
}
Then the Manager
namespace ThreadingTest
{
public class Manager
{
public List<Worker> Workers = new List<Worker>();
private Object taskLock = new Object();
public int TaskCount { get; set; }
public void RunTest()
{
AddWorkers();
RunWorkers();
}
private void RunWorkers()
{
TaskCount = 0;
TaskFactory taskFactory = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);
Task[] taskPool = new Task[Workers.Count];
for (int i=0; i<Workers.Count; ++i)
{
//int why = i;
//taskPool[i] = taskFactory.StartNew(() => this.RunOne(why))
taskPool[i] = taskFactory.StartNew(() => this.RunOne(i))
.ContinueWith( (antecedant) =>
{
lock (taskLock) { TaskCount += 1; }
}
);
}
Task.WaitAll(taskPool);
}
private void RunOne(int index)
{
if (index >= Workers.Count)
return;
Workers[index].workSocksOff();
}
private void AddWorkers()
{
for (var i = 0; i < 10; ++i)
Workers.Add(new Worker(i));
}
}
}
Finally the Program itself
namespace ThreadingTest
{
class Program
{
static void Main(string[] args)
{
Manager manager = new Manager();
manager.RunTest();
int started = 0, completed = 0, alreadyStarted = 0;
foreach (Worker w in manager.Workers)
{
if (w.hasStarted) started++;
if (w.hasCompleted) completed++;
if (w.hasAlreadyStarted) alreadyStarted++;
}
Console.WriteLine("Results: ");
Console.WriteLine("\tStarted: {0}, Completed: {1}, Already Started: {2}", started, completed, alreadyStarted);
Console.ReadKey();
}
}
}