
My program runs tasks in groups of n at a time. Each task writes data to its own Queue<string>, selected by index from a List<Queue<string>>. The tasks do not share data or queues, yet I still get synchronization errors. I know these data structures are not thread-safe, but I don't understand why they would need to be, since each task has its own data structure. What could cause the errors?

Here is some simple code that demonstrates the problem:

class Program
{
    static int j = 0;
    List<Queue<string>> queueList = new List<Queue<string>>();

    public void StartTasts(int n)
    {
        for (int i = 0; i < n; i++)
            queueList.Add(new Queue<string>());

        List<Task> tsk = new List<Task>();
        for (int TaskGroup = 0; TaskGroup < 10; TaskGroup++)
        {   //10 groups of task
            //each group has 'n' tasks working in parallel
            for (int i = 0; i < n; i++)
            {
                //each task gets its own and independent queue from the list
                tsk.Add(Task.Factory.StartNew(() =>
                {
                    DoWork(j % n);
                }));
                j++;
            }
            //waiting for each task group to finish
            foreach (Task t in tsk)
                t.Wait();
            //after they all finished working with the queues, clear queues
            //making them ready for the next task group
            foreach (Queue<string> q in queueList)
                q.Clear();
        }
    }

    public void DoWork(int queue)
    {
        //demonstration of generating strings 
        //and put them in the correct queue
        for (int k = 0; k < 10000; k++)
            queueList[queue].Enqueue(k + "");
    }


    static void Main(string[] args)
    {
        new Program().StartTasts(10);
    }

}

This program generates errors such as:

System.ArgumentException: 'Destination array was not long enough. Check destIndex and length, and the array's lower bounds.'

System.IndexOutOfRangeException: 'Index was outside the bounds of the array.' (at the Queue)

System.AggregateException: One or more errors occurred. ---> System.ArgumentException: Source array was not long enough. Check srcIndex and length, and the array's lower bounds.

and others, none of which occur when the tasks run serially. I would love to understand why, because I can't see how these tasks interfere with each other's independent queues.

Elad Cohen
  • On what lines are the errors? – nicomp Nov 25 '18 at 00:37
  • @nicomp its not consistent, some occur at `q.Clear();`, some occur at `queueList[queue].Enqueue(i + "");` and more. – Elad Cohen Nov 25 '18 at 00:39
  • You're incrementing loop variable `i` as the for-loop increment and also as the last statement of the for loop. That, plus the `i%n` in the call to `DoTask` are why multiple tasks are getting the same index. Or maybe one such reason. Anyway, check that out and then report back. With more information, as nicomp suggests. – davidbak Nov 25 '18 at 00:39
  • Also it looks like you're starting 100 tasks - nested for loops! But only 10 queues. Look at that too. – davidbak Nov 25 '18 at 00:42
  • @davidbak I changed the static variable name from `i` to `j` to prevent incrementing it not on purpose, yet same error still occur. – Elad Cohen Nov 25 '18 at 00:44
  • @davidbak yeah, thats the part I want to understand, 100 tasks, 10 at each time (not all 100 at the same time, they are grouped to 10 tasks at each time, 10 tasks 10 times). than 10 queues should be enough for each group, if I make sure they finished using it and clear it for the next group to come, thats what I understand. – Elad Cohen Nov 25 '18 at 00:46
  • this code is pretty messed up -_- – Chris Nov 25 '18 at 00:48
  • @Chris maybe, Im trying to understand a point about multithreading, something that looks pretty obvious to me but apparently its not, care to explain the problem you see? – Elad Cohen Nov 25 '18 at 00:52
  • @EladCohen no, i mean, not the threading. Just the code is so hard to read. All your loops and lists. I'll eat my hat, if thats the simplest form :) – Chris Nov 25 '18 at 01:10
  • @Chris theres always a place for improvement :) I was trying to make it as clear as possible since my project is much more complex than this, I hope it did make sense though – Elad Cohen Nov 25 '18 at 08:03
  • Actually not quite yet. It probably would, if there would be some meaning behind the code, so one could understand what you tried to achieve. But don't bother. I can live without knowing ;) – Chris Nov 25 '18 at 08:34

3 Answers


This is the usual captured-variable (closure) problem. All tasks share the same variable j, and each lambda reads it only when the task actually runs, not when the task is created. Most likely your loop starts all 10 tasks very quickly, and before any of them evaluates j % n, j has already reached 10. Several tasks then compute the same index and write to the same queue concurrently.

Make a local copy of j (called k below), declared inside the body of the for loop, and it should solve your problem.

public void StartTasts(int n)
{
    for (int i = 0; i < n; i++)
        queueList.Add(new Queue<string>());

    List<Task> tsk = new List<Task>();
    for (int TaskGroup = 0; TaskGroup < 10; TaskGroup++)
    {   //10 groups of task
        //each group has 'n' tasks working in parallel
        for (int i = 0; i < n; i++)
        {
            int k = j; // `int k = i;` would work here too and give you the same results.

            tsk.Add(Task.Factory.StartNew(() =>
            {
                DoWork(k % n);
            }));
            j++;
        }
        //waiting for each task group to finish
        foreach (Task t in tsk)
            t.Wait();
        //after they all finished working with the queues, clear queues
        //making them ready for the next task group
        foreach (Queue<string> q in queueList)
            q.Clear();
    }
}

If you want to see the problem in action with a simpler recreation, try this simple code instead.

// Requires: using System; using System.Threading; using System.Threading.Tasks;
public static void Main(string[] args)
{
    for (int i = 0; i < 10; i++)
    {
        int j = i; // fresh copy for each iteration
        Task.Factory.StartNew(() =>
        {
            Thread.Sleep(10); //Give a little time for the for loop to complete.
            Console.WriteLine("i: " + i + " j: " + j);
        });
    }
    Console.ReadLine();
}
Scott Chamberlain
  • Note that at least `foreach` [is fixed](https://stackoverflow.com/questions/7123898/lambda-capture-problem-with-iterators) and can be used to avoid this problem - `foreach(var i in Enumerable.Range(0,10)) {...}` would work too in C# 5+ (also somewhat strange way to write `for`) – Alexei Levenkov Nov 25 '18 at 02:16
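
To make the difference between `for` and `foreach` concrete, here is a minimal sketch without any threading. The class and method names (`CaptureDemo`, `CaptureWithFor`, `CaptureWithForeach`) are made up for illustration. It assumes a C# 5 or later compiler, as the comment above mentions.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class CaptureDemo
{
    // Every lambda created in a for loop captures the same single variable i.
    public static List<int> CaptureWithFor(int count)
    {
        var funcs = new List<Func<int>>();
        for (int i = 0; i < count; i++)
            funcs.Add(() => i);
        // The loop has already finished, so each lambda now reads i == count.
        return funcs.Select(f => f()).ToList();
    }

    // Since C# 5, foreach declares a fresh variable for each iteration.
    public static List<int> CaptureWithForeach(int count)
    {
        var funcs = new List<Func<int>>();
        foreach (var i in Enumerable.Range(0, count))
            funcs.Add(() => i);
        return funcs.Select(f => f()).ToList();
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(",", CaptureWithFor(3)));     // 3,3,3
        Console.WriteLine(string.Join(",", CaptureWithForeach(3))); // 0,1,2
    }
}
```

The code in the question reads a static field rather than a captured local, but the effect is the same: the value is read when the lambda runs, not when it is created.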

You calculated the queue index inside the task, while the value it is based on (j) was being changed outside the task. I changed the logic only slightly, so that the index is calculated before the task starts, and I have not had any errors since.

namespace Project1
{
    using System.Collections.Generic;
    using System.Threading.Tasks;

    internal class Program
    {
        private static int j = 0;
        private readonly List<Queue<string>> queueList = new List<Queue<string>>();

        public void StartTasts(int n)
        {
            for (var i = 0; i < n; i++)
            {
                this.queueList.Add(new Queue<string>());
            }

            var taskList = new List<Task>();
            for (var taskGroup = 0; taskGroup < 10; taskGroup++)
            {
                // 10 groups of task
                // each group has 'n' tasks working in parallel
                for (var i = 0; i < n; i++)
                {
                    // each task gets its own and independent queue from the list
                    var taskId = j % n;
                    taskList.Add(
                        Task.Factory.StartNew(
                            () =>
                            {
                                this.DoWork(taskId);
                            }));
                    j++;
                }

                // waiting for each task group to finish
                foreach (var t in taskList)
                {
                    t.Wait();
                }

                // after they all finished working with the queues, clear queues
                // making them ready for the next task group
                foreach (var q in this.queueList)
                {
                    q.Clear();
                }
            }
        }

        public void DoWork(int queue)
        {
            // demonstration of generating strings 
            // and put them in the correct queue
            for (var k = 0; k < 10000; k++)
            {
                this.queueList[queue].Enqueue(k + string.Empty);
            }
        }

        private static void Main(string[] args)
        {
            new Program().StartTasts(10);
        }      
    }
}
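
As an alternative sketch (not part of the code above), the index can also be handed to the task through the state argument of `Task.Factory.StartNew(Action<object>, object)`, so the lambda does not touch any outer counter. The names `StateArgumentDemo` and `RunGroup`, and the count of 1000 items, are hypothetical choices for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class StateArgumentDemo
{
    // Runs one group of n tasks; each task fills only the queue at its own index.
    public static int[] RunGroup(int n)
    {
        var queues = Enumerable.Range(0, n).Select(_ => new Queue<string>()).ToList();
        var tasks = new Task[n];
        for (int i = 0; i < n; i++)
        {
            // The current value of i is boxed and passed as state when StartNew
            // is called, so later changes to i cannot affect this task.
            tasks[i] = Task.Factory.StartNew(state =>
            {
                int index = (int)state;
                for (int k = 0; k < 1000; k++)
                    queues[index].Enqueue(k.ToString());
            }, i);
        }
        Task.WaitAll(tasks);
        return queues.Select(q => q.Count).ToArray();
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(",", RunGroup(4))); // 1000,1000,1000,1000
    }
}
```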
Chris

I do not think your problem is in the queue; the list itself might be the issue.

As a rule for parallel or concurrent code, the list is not a thread-safe data structure.

Try using a thread-safe data structure such as the ConcurrentBag class.
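
A minimal sketch of what using `ConcurrentBag<string>` could look like, with several tasks adding to one shared bag at the same time. The names `ConcurrentBagDemo` and `FillBag` are hypothetical, and this only illustrates the class itself; it is not a claim about what caused the errors in the question. Note that a `ConcurrentBag<T>` is unordered, so if FIFO order matters, `ConcurrentQueue<T>` from the same namespace is the closer counterpart to `Queue<T>`.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

public static class ConcurrentBagDemo
{
    // Starts taskCount tasks that all add items to the same bag concurrently
    // and returns the total number of items stored.
    public static int FillBag(int taskCount, int itemsPerTask)
    {
        var bag = new ConcurrentBag<string>();
        var tasks = Enumerable.Range(0, taskCount)
            .Select(id => Task.Run(() =>
            {
                // id is a lambda parameter, so each task has its own copy.
                for (int k = 0; k < itemsPerTask; k++)
                    bag.Add(id + ":" + k);
            }))
            .ToArray();
        Task.WaitAll(tasks);
        return bag.Count;
    }

    public static void Main()
    {
        Console.WriteLine(FillBag(10, 10000)); // 100000
    }
}
```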

COLD TOLD