I want to compare two theoretical scenarios. I have simplified the cases for the purpose of the question, but basically it's your typical producer-consumer scenario. (I'm focusing on the consumer.)

I have a large Queue<string> dataQueue that I have to transmit to multiple clients.

So let's start with the simpler case:

 class SequentialBlockingCase
 {
    public static Queue<string> DataQueue = new Queue<string>();
    private static List<string> _destinations = new List<string>();

    /// <summary>
    /// Is the main function that is run in its own thread
    /// </summary>
    private static void Run()
    {
        while (true)
        {
            if (DataQueue.Count > 0)
            {
                string data = DataQueue.Dequeue();
                foreach (var destination in _destinations)
                {
                    SendDataToDestination(destination, data);
                }
            }
            else
            {
                Thread.Sleep(1);
            }
        }
    }

    private static void SendDataToDestination(string destination, string data)
    {
        //TODO: Send data using http post, instead simulate the send
        Thread.Sleep(200);
    }
}

Now this setup works just fine. It sits there and polls the Queue and when there is data to send it sends it to all the destinations.

Issues:

  • If one of the destinations is unavailable or slow, it affects all of the other destinations.
  • It does not use multiple threads, so nothing is sent in parallel.
  • It blocks for every transmission to each destination.

So here is my second attempt:

class ParallelBlockingCase
{
    public static Queue<string> DataQueue = new Queue<string>();
    private static List<string> _destinations = new List<string>();

    /// <summary>
    /// Is the main function that is run in its own thread
    /// </summary>
    private static void Run()
    {
        while (true)
        {
            if (DataQueue.Count > 0)
            {
                string data = DataQueue.Dequeue();
                Parallel.ForEach(_destinations, destination =>
                {
                    SendDataToDestination(destination, data);
                });
            }
            else
            {
                Thread.Sleep(1);
            }
        }
    }

    private static void SendDataToDestination(string destination, string data)
    {
        //TODO: Send data using http post
        Thread.Sleep(200);
    }
}

This revision at least does not affect the other destinations if one destination is slow or unavailable.

However, this method is still blocking, and I am not sure whether Parallel.ForEach makes use of the thread pool. My understanding is that it will create X threads/tasks and execute 4 at a time (on a 4-core CPU), but it has to completely finish task 1 before task 5 can start.

Hence my 3rd option:

class ParallelAsyncCase
{
    public static Queue<string> DataQueue = new Queue<string>();
    private static List<string> _destinations = new List<string> { };

    /// <summary>
    /// Is the main function that is run in its own thread
    /// </summary>
    private static void Run()
    {
        while (true)
        {
            if (DataQueue.Count > 0)
            {
                string data = DataQueue.Dequeue();
                List<Task> tasks = new List<Task>();
                foreach (var destination in _destinations)
                {
                    var task = SendDataToDestination(destination, data);
                    task.Start();
                    tasks.Add(task);
                }

                //Wait for all tasks to complete
                Task.WaitAll(tasks.ToArray());
            }
            else
            {
                Thread.Sleep(1);
            }
        }
    }

    private static async Task SendDataToDestination(string destination, string data)
    {
        //TODO: Send data using http post
        await Task.Delay(200);
    }
}

Now, from my understanding, this option will still block the main thread at Task.WaitAll(tasks.ToArray()); which is fine, because I don't want it to run away creating tasks faster than they can be executed.

But the tasks that are executed in parallel should make use of the ThreadPool, and all X tasks should start executing at once, neither blocking nor running in sequential order. (The thread pool will swap between them as they become active or are awaiting.)

Now my question.

Does option 3 have any performance benefit over option 2?

Specifically, in a high-performance server-side scenario. In the specific software I am working on now, there would be multiple instances of my simple use case above, i.e. several consumers.

I'm interested in the theoretical differences and the pros and cons of the two solutions, and maybe even a better 4th option if there is one.

Zapnologica
  • Please note that `Task` doesn't mean `Thread`. Your code won't become magically multithreaded just because you write `async` in your method declaration. In fact, everything before the first `await` will execute in the same thread that called the async method (which may or may not be what you want). If you specifically want `SendDataToDestination` to execute in a different thread, use `Task.Run` – Kevin Gosse Aug 22 '16 at 09:34
  • Also, unless you manually create the task using `new Task(...)`, you never ever have to call `task.Start()`. The task is already started – Kevin Gosse Aug 22 '16 at 09:35
  • Last but not least, the `while(true) { ... Thread.Sleep(1); }` is a bad idea. If you know data will arrive at a steady pace (e.g. many times per millisecond), use a `SpinLock` and a `ConcurrentQueue`. If data arrives at a slower pace, use synchronization, for instance with a `BlockingCollection` – Kevin Gosse Aug 22 '16 at 09:37
  • @KooKiz those were for simplicity of the scenario. Agreed, do not do this in production. In this case I'm more concerned about the async and concurrency of my `SendDataToDestination` – Zapnologica Aug 22 '16 at 09:39
  • Nothing beats benchmarking, but since you're I/O bound rather than CPU bound, *I think* creating new tasks and using asynchronous APIs *could* perform better than using `Parallel.ForEach`. By default, `Parallel.ForEach` is constrained by the number of CPUs (though you can manually change the degree of parallelism). – Kevin Gosse Aug 22 '16 at 09:48
  • @KooKiz agreed on the benchmarking. I just feel that I need some more theoretical knowledge to know what exactly I am benchmarking. And this case is very specific to `I/O bound` operations. I just worry that the overhead of creating tasks could outweigh the benefit. But then again, I am more concerned about holistic throughput at scale, with many I/O operations taking place. – Zapnologica Aug 22 '16 at 10:10
  • Possible duplicate of [Nesting await in Parallel.ForEach](https://stackoverflow.com/questions/11564506/nesting-await-in-parallel-foreach) – Michael Freidgeim Dec 12 '17 at 21:27
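
To illustrate the `task.Start()` comment above: a task returned by an `async` method is already running when the caller receives it, and calling `Start()` on such a task throws `InvalidOperationException`. The sketch below is only an illustration; `StartDemo`, `SendAsync`, and `StartThrows` are hypothetical names, and `SendAsync` stands in for the question's `SendDataToDestination`.

```csharp
using System;
using System.Threading.Tasks;

static class StartDemo
{
    // Hypothetical stand-in for the question's SendDataToDestination.
    static async Task SendAsync()
    {
        await Task.Delay(50);
    }

    // Returns true if calling Start() on the returned task throws.
    public static bool StartThrows()
    {
        Task task = SendAsync(); // the task is already started ("hot") here
        try
        {
            task.Start();        // not allowed on a task produced by an async method
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
        finally
        {
            task.Wait();         // let the simulated send finish before returning
        }
    }

    static void Main()
    {
        Console.WriteLine(StartThrows());
    }
}
```

In option 3 this means the `task.Start();` line can simply be removed; adding the returned task to the list is enough.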

2 Answers


Parallel.ForEach will use the thread pool. Asynchronous code will not, since it doesn't need any threads at all (link is to my blog).

As Mrinal pointed out, if you have CPU-bound code, parallelism is appropriate; if you have I/O-bound code, asynchrony is appropriate. In this case, an HTTP POST is clearly I/O, so the ideal consuming code would be asynchronous.

"maybe even a better 4th option if there is one."

I would recommend making your consumer fully asynchronous. In order to do so, you'll need to use an async-compatible producer/consumer queue. There's a fairly advanced one (BufferBlock<T>) in the TPL Dataflow library, and a fairly simple one (AsyncProducerConsumerQueue<T>) in my AsyncEx library.

With either of them, you can create a fully asynchronous consumer:

List<Task> tasks = new List<Task>();
foreach (var destination in _destinations)
{
  var task = SendDataToDestination(destination, data);
  tasks.Add(task);
}
await Task.WhenAll(tasks);

or, more simplified:

var tasks = _destinations
    .Select(destination => SendDataToDestination(destination, data));
await Task.WhenAll(tasks);
Stephen Cleary
  • Thanks for your response, Quick clarification, The code examples you have there. Don't make use of the producer/consumer queue you mentioned. It is the same as option 3 just `awaiting` the `Task.WhenAll()`. – Zapnologica Aug 23 '16 at 08:06
  • @Zapnologica: The code examples just show how to do the processing after the item has been dequeued. – Stephen Cleary Aug 23 '16 at 12:30
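
For the dequeuing side that the comments above ask about, here is a minimal sketch of how a fully asynchronous consumer loop might look with `BufferBlock<T>`. It assumes the `System.Threading.Tasks.Dataflow` library is available, a single consumer, and a simulated send; the class name `FullyAsyncCase`, the `Processed` counter, and the destination names are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class FullyAsyncCase
{
    // Async-compatible producer/consumer queue (replaces Queue<string> + polling).
    public static readonly BufferBlock<string> DataQueue = new BufferBlock<string>();

    // Hypothetical destinations, for illustration only.
    private static readonly List<string> _destinations = new List<string> { "a", "b", "c" };

    // Number of items fully sent; only here so the sketch has something to report.
    public static int Processed;

    public static async Task RunAsync()
    {
        // Completes with false once the queue has been completed and drained,
        // so no Thread.Sleep polling is needed. Assumes this is the only consumer.
        while (await DataQueue.OutputAvailableAsync())
        {
            string data = await DataQueue.ReceiveAsync();
            var tasks = _destinations.Select(d => SendDataToDestinationAsync(d, data));
            await Task.WhenAll(tasks); // all sends for this item run concurrently
            Interlocked.Increment(ref Processed);
        }
    }

    private static async Task SendDataToDestinationAsync(string destination, string data)
    {
        await Task.Delay(200); // simulated HTTP POST
    }

    static async Task Main()
    {
        Task consumer = RunAsync();
        DataQueue.Post("first");   // the producer side
        DataQueue.Post("second");
        DataQueue.Complete();      // signal that no more data will arrive
        await consumer;
        Console.WriteLine($"Processed {Processed} items");
    }
}
```

Awaiting `Task.WhenAll` per item keeps the back-pressure the question wanted (no new sends start until the current item has gone to every destination) without blocking a thread while waiting.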

Your main question - Parallel.ForEach vs Async Forloop

  • For compute-bound, in-memory processing, always use the Parallel API: each thread taken from the thread pool is kept busy doing work, which is the purpose of invoking it.
  • For I/O-bound operations, always use async-await: no thread is tied up while the operation is in flight, because it is completed in the background through the operating system's I/O completion ports.

Since async-await is the preferred option, let me point out a few things in your implementation:

  • It is effectively synchronous, since you are not awaiting the main operation (sending data using HTTP POST); the correct code would await an asynchronous HTTP POST call, not Task.Delay.
  • If you are calling a standard async implementation, such as an asynchronous HTTP POST, you needn't explicitly start the Task; that is only needed for a task you construct yourself.
  • Task.WaitAll is only safe where there is no synchronization context or UI thread, such as a console application; elsewhere it can lead to deadlock, so use Task.WhenAll instead.
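
The points above could be sketched roughly as follows, assuming `HttpClient` is used for the POST. The class and method names, the `text/plain` content type, the 5-second timeout, and the localhost URLs are all hypothetical and only there to make the example self-contained.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Text;
using System.Threading.Tasks;

class HttpPostSketch
{
    // One shared client for all sends (assumed 5 s timeout).
    private static readonly HttpClient Client =
        new HttpClient { Timeout = TimeSpan.FromSeconds(5) };

    // Returns true when the destination answered with a success status code.
    private static async Task<bool> SendDataToDestinationAsync(string destination, string data)
    {
        try
        {
            using (var content = new StringContent(data, Encoding.UTF8, "text/plain"))
            using (HttpResponseMessage response = await Client.PostAsync(destination, content))
            {
                return response.IsSuccessStatusCode;
            }
        }
        catch (HttpRequestException) { return false; } // unreachable destination
        catch (TaskCanceledException) { return false; } // timed out
    }

    // No task.Start() and no Task.WaitAll: the tasks are already running,
    // and Task.WhenAll is awaited instead of blocking.
    public static Task<bool[]> SendToAllAsync(IEnumerable<string> destinations, string data)
    {
        return Task.WhenAll(destinations.Select(d => SendDataToDestinationAsync(d, data)));
    }

    static async Task Main()
    {
        // Hypothetical endpoints; replace with real destinations.
        var destinations = new List<string>
        {
            "http://localhost:5001/ingest",
            "http://localhost:5002/ingest",
        };
        bool[] results = await SendToAllAsync(destinations, "payload");
        Console.WriteLine($"{results.Count(ok => ok)} of {results.Length} sends succeeded");
    }
}
```

Catching the failure inside each send keeps one slow or unavailable destination from faulting the combined task for the others.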

Now regarding the Parallel approach:

  • The code is correct, and the Parallel API does work on the thread pool and can mostly reuse threads, which is an optimization. But if the tasks are long-running, it may end up creating many threads. To restrict that, pass new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount } to Parallel.ForEach, which caps the degree of parallelism at the number of logical cores in the system.

Another important reason the Parallel API is a bad idea for I/O-bound calls: each thread is a costly resource (its creation includes a thread environment block plus user-mode and kernel-mode memory), and during an I/O operation it sits idle doing nothing, which is not good by any measure.
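
As a rough illustration of that option, the sketch below passes a `ParallelOptions` instance to `Parallel.ForEach` with a simulated blocking send. The class name `BoundedParallelSend`, the `SendAll` helper, and the destination names are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

class BoundedParallelSend
{
    // Sends to every destination with a capped degree of parallelism
    // and returns how many sends completed.
    public static int SendAll(IEnumerable<string> destinations)
    {
        var options = new ParallelOptions
        {
            // At most one concurrent send per logical core.
            MaxDegreeOfParallelism = Environment.ProcessorCount
        };

        int sent = 0;
        Parallel.ForEach(destinations, options, destination =>
        {
            Thread.Sleep(200);               // simulated blocking HTTP POST
            Interlocked.Increment(ref sent); // the lambda runs on several threads
        });
        return sent;
    }

    static void Main()
    {
        // Hypothetical destinations, for illustration only.
        var destinations = new List<string> { "a", "b", "c", "d", "e", "f", "g", "h" };
        Console.WriteLine($"Sent to {SendAll(destinations)} destinations");
    }
}
```

Each in-flight send still occupies a thread for the full 200 ms, which is the cost the next paragraph describes.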

Mrinal Kamboj
  • In my tests over the years, Max DOP = C * Cores, C > 1.5 seems to 'work well' in general due to overall time-sharing with the entire system on a heavy-CPU load. 1:1 pinning is meh. For a heavy IO/wait load C should be increased as the relative CPU-load decreases (there can be a fair number of threads before context switches or resources become a dominating restriction for IO/wait tasks). – user2864740 Aug 23 '16 at 02:32
  • @user2864740 Code above is primarily to suggest, how to apply Degree of Parallelism, though normally even if we put a number like (1.5*Environment.ProcessorCount), that's the max and mostly thread pool will never have that many number of threads, until and unless tasks are genuinely long running and blocking in nature. Also the exercise suggested by you, would work preferably for individual thread management, but pool in my view works in a much more controlled manner – Mrinal Kamboj Aug 23 '16 at 19:29
  • my 5 coins there, related to Synchronization context https://blog.stephencleary.com/2017/03/aspnetcore-synchronization-context.html – lyolikaa Sep 03 '21 at 12:38