I'm writing a Producer -> Queue -> Consumer -> Queue2 -> Consumer2 application.

I have consumer2 wait for a list to reach a threshold and then start another task that simulates a long-running operation (e.g. a SQL multi-insert).

However, when I run the application the 'long running task' (LongRunningTaskInsert()) seems to wait until all of the queues have signaled completion before writing to the console.

When I debug, the Tasks list variable shows that some of the tasks are completing in the middle of the application. Am I doing something wrong/naughty with tasks?

Code:

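using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

class Program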
{
    static void Main(string[] args)
    {
        BlockingCollection<string> bag1 = new BlockingCollection<string>();
        BlockingCollection<string> bag2 = new BlockingCollection<string>();
        var Tasks = new List<Task>();
        List<string> Container = new List<string>();

        Task consumer2 = Task.Factory.StartNew(() =>
        {
            foreach (var item in bag2.GetConsumingEnumerable())
            {
                Container.Add(item);
                if (bag2.IsCompleted || Container.Count > 5)
                {
                    Console.WriteLine("Container:");
                    Container.ForEach(y =>
                    {
                        Console.Write($"Value: {y}, ");
                    });
                    Console.Write("\n");

                    var newTask = Task.Factory.StartNew(() =>
                    {
                        Thread.Sleep(2000);
                        LongRunningTaskInsert();
                    });
                    Tasks.Add(newTask);

                    Container.Clear();
                }
            }

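            // Block until every spawned task finishes; Task.WhenAll only returns a task and does not wait.
            Task.WaitAll(Tasks.ToArray());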
        });

        //this is a task that evaluates all available elements on separate threads.
        Task consumer1 = Task.Factory.StartNew(() =>
        {
            //do something with the consumer
            Parallel.ForEach(
                bag1.GetConsumingEnumerable(),
                (x) =>
                {
                    Console.WriteLine($"Consumer {x} => bag2, thread {Thread.CurrentThread.ManagedThreadId}");
                    bag2.Add(x);
                });
            bag2.CompleteAdding();

        });

        Task producer = Task.Factory.StartNew(() =>
        {
            //do something to put records into the bag
            for (int i = 0; i < 10; i++)
            {
                System.Threading.Thread.Sleep(500);

                bag1.Add(i.ToString());
                bag1.Add((i * 10).ToString());
                bag1.Add((i + 10).ToString());
                Console.WriteLine($"Producer: {i} & { i * 10} & {i + 10}");
            }

            bag1.CompleteAdding();
        });

        producer.Wait();
        consumer1.Wait();
        consumer2.Wait();
        Console.Read();
    }

    private static bool LongRunningTaskInsert()
    {
        //Thread.Sleep(1000);
        Console.WriteLine("Long Running Task Complete");
        return true;
    }
}

edit: The output I'm getting is:

Producer: 0 & 0 & 10
Consumer 0 => bag2, thread 4
Consumer 0 => bag2, thread 6
Consumer 10 => bag2, thread 5
Producer: 1 & 10 & 11
Consumer 10 => bag2, thread 8
Consumer 11 => bag2, thread 10
Consumer 1 => bag2, thread 9
Container:
Value: 0, Value: 0, Value: 10, Value: 10, Value: 11, Value: 1,
Producer: 2 & 20 & 12
Consumer 20 => bag2, thread 4
Consumer 2 => bag2, thread 6
Consumer 12 => bag2, thread 5
Producer: 3 & 30 & 13
Consumer 3 => bag2, thread 10
Consumer 30 => bag2, thread 9
Consumer 13 => bag2, thread 8
Container:
Value: 20, Value: 2, Value: 12, Value: 3, Value: 30, Value: 13,
Producer: 4 & 40 & 14
Consumer 4 => bag2, thread 4
Consumer 40 => bag2, thread 6
Consumer 14 => bag2, thread 5
Producer: 5 & 50 & 15
Consumer 5 => bag2, thread 10
Consumer 15 => bag2, thread 8
Consumer 50 => bag2, thread 9
Container:
Value: 4, Value: 40, Value: 14, Value: 5, Value: 15, Value: 50,
Producer: 6 & 60 & 16
Consumer 6 => bag2, thread 6
Consumer 60 => bag2, thread 6
Producer: 7 & 70 & 17
Consumer 16 => bag2, thread 4
Consumer 70 => bag2, thread 5
Consumer 17 => bag2, thread 5
Consumer 7 => bag2, thread 4
Container:
Value: 6, Value: 60, Value: 16, Value: 70, Value: 17, Value: 7,
Producer: 8 & 80 & 18
Consumer 8 => bag2, thread 6
Consumer 80 => bag2, thread 6
Producer: 9 & 90 & 19
Consumer 90 => bag2, thread 4
Consumer 19 => bag2, thread 4
Consumer 18 => bag2, thread 8
Consumer 9 => bag2, thread 8
Container:
Value: 8, Value: 80, Value: 90, Value: 19, Value: 18, Value: 9,
Long Running Task Complete
Long Running Task Complete
Long Running Task Complete
Long Running Task Complete
Long Running Task Complete

I expect the 'Long Running Task Complete' to be mixed in and not all clustered at the end.

ErichO
  • Does the behaviour change if you set `SetMinThreads` (https://learn.microsoft.com/en-us/dotnet/api/system.threading.threadpool.setminthreads?view=netframework-4.7.2) to 50 beforehand? – mjwills Jan 18 '19 at 20:03
  • @mjwills no change when I set min worker threads to 50 and leave completion port threads set to 8 – ErichO Jan 18 '19 at 20:18
  • If I move the producer task to an external method, mark it async, and change Thread.Sleep -> await Task.Delay(), I start to get different results: the console calls from the long running task are mixed in. – ErichO Jan 18 '19 at 20:25
  • It looks like the Parallel.ForEach statement is spawning a bunch of threads and my method LongRunningTaskInsert() isn't getting any clock time. If I change that to a synchronous foreach loop, my number of threads is reduced from 8 to 4 and I get the results I expect (where the long running task console calls are mixed in). – ErichO Jan 18 '19 at 20:31
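
For reference, the async producer variation described in the comments above might look roughly like the following. This is only a sketch: the class name AsyncProducerSketch, the method ProduceAsync, and its count and delayMs parameters are hypothetical and do not appear in the original code.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

static class AsyncProducerSketch
{
    // Hypothetical helper: same items as the original producer loop,
    // but the pause uses await Task.Delay, which releases the thread
    // while waiting instead of blocking it like Thread.Sleep.
    public static async Task ProduceAsync(
        BlockingCollection<string> bag, int count = 10, int delayMs = 500)
    {
        for (int i = 0; i < count; i++)
        {
            await Task.Delay(delayMs);

            bag.Add(i.ToString());
            bag.Add((i * 10).ToString());
            bag.Add((i + 10).ToString());
            Console.WriteLine($"Producer: {i} & {i * 10} & {i + 10}");
        }

        // Signal consumers that no more items will arrive.
        bag.CompleteAdding();
    }
}
```

In Main, the producer would then be started with something like `Task producer = AsyncProducerSketch.ProduceAsync(bag1);` and waited on as before.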

1 Answer

The Parallel.ForEach statement is spawning a bunch of threads, and my method LongRunningTaskInsert() isn't getting any clock time. If I change that to a synchronous foreach loop, my number of threads is reduced from 8 to 4 and I get the results I expect (where the long running task console calls are mixed in).
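
A minimal sketch of that change, assuming the consumer is pulled out into a helper: the class name SequentialConsumerSketch and the method StartConsumer are hypothetical, while the loop body is the one from the question with Parallel.ForEach swapped for a plain foreach.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

static class SequentialConsumerSketch
{
    // Hypothetical helper: drains source on a single thread, forwards each
    // item to target, then marks target as complete.
    public static Task StartConsumer(
        BlockingCollection<string> source, BlockingCollection<string> target)
    {
        return Task.Factory.StartNew(() =>
        {
            // A plain foreach occupies one thread while it blocks waiting
            // for items, rather than several Parallel.ForEach workers.
            foreach (var x in source.GetConsumingEnumerable())
            {
                Console.WriteLine(
                    $"Consumer {x} => bag2, thread {Thread.CurrentThread.ManagedThreadId}");
                target.Add(x);
            }

            target.CompleteAdding();
        });
    }
}
```

With this helper, consumer1 in Main would become `Task consumer1 = SequentialConsumerSketch.StartConsumer(bag1, bag2);`.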

ErichO
  • That's why you should configure the `Parallel.ForEach` with a reasonable `MaxDegreeOfParallelism`, and not rely on the default `-1`. I've posted an answer that explains in detail the effect of this setting [here](https://stackoverflow.com/questions/9538452/what-does-maxdegreeofparallelism-do/75287075#75287075 "What does MaxDegreeOfParallelism do?"). – Theodor Zoulias Feb 04 '23 at 19:10
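
As a rough illustration of the suggestion in the comment above, the consumer could keep Parallel.ForEach but cap its parallelism through ParallelOptions. This is a sketch under assumptions: the class name BoundedParallelConsumerSketch, the method StartConsumer, and the default limit of 2 are hypothetical choices, not values taken from the question or the linked answer.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

static class BoundedParallelConsumerSketch
{
    // Hypothetical helper: same work as consumer1 in the question, but the
    // loop may use at most maxDegreeOfParallelism workers at a time.
    public static Task StartConsumer(
        BlockingCollection<string> source,
        BlockingCollection<string> target,
        int maxDegreeOfParallelism = 2) // assumed value, tune for the workload
    {
        var options = new ParallelOptions
        {
            MaxDegreeOfParallelism = maxDegreeOfParallelism
        };

        return Task.Factory.StartNew(() =>
        {
            Parallel.ForEach(
                source.GetConsumingEnumerable(),
                options,
                x =>
                {
                    Console.WriteLine(
                        $"Consumer {x} => bag2, thread {Thread.CurrentThread.ManagedThreadId}");
                    target.Add(x);
                });

            target.CompleteAdding();
        });
    }
}
```

Capping the degree of parallelism limits how many workers can sit blocked on the consuming enumerable, which should leave threads available for the tasks started by consumer2.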