
I have orders coming in from multiple threads, and I want to process the data in a single thread. If I understand it correctly, the way to do that is with a ConcurrentQueue.

I had a look at the SO question How to work threading with ConcurrentQueue<T>, but it did not answer my questions.

I wrote a small test application (with .NET Core 2.1) to see if I could get it to work.

This is what it should do: make aggregates for 100 orders. There are 3 aggregates for 3 different order types: Type1, Type2 and Type3.

The output should be something like:

Type: Type1 Count: 38
Type: Type2 Count: 31
Type: Type3 Count: 31
Total for all types: 100

I started off writing the application without a ConcurrentQueue. As expected, the results in _aggregates are wrong.

/* Incorrect version, not using ConcurrentQueue, showing incorrect results */

using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Threading.Tasks;

namespace ConcurrentQueue
{
    class Program
    {
        private static readonly int OrderCount = 100;

        private static IEnumerable<Order> _orders;

        private static Dictionary<OrderTypeEnum, Aggregate> _aggregates;

        static void Main(string[] args)
        {
            //Prepare
            InitializeAggregates();
            _orders = CreateOrders();

            //Execute
            MainAsync(args).GetAwaiter().GetResult();
        }

        static async Task MainAsync(string[] args)
        {
            await Task.Run(() => ProcessOrders());

            ShowOutput();
        }

        public static void ProcessOrders()
        {
            var aggregator = new Aggregator();
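            //Parallel.ForEach runs the iterations below concurrently on thread pool threads.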
            Parallel.ForEach(_orders, order => {
                aggregator.Aggregate(order, _aggregates);
            });
        }

        private static IEnumerable<Order> CreateOrders()
        {
            var orderList = new Collection<Order>();

            for (var i = 1; i <= OrderCount; i++)
            {
                var order = CreateOrder(i);
                orderList.Add(order);
            }

            return orderList;
        }

        private static void InitializeAggregates()
        {
            _aggregates = new Dictionary<OrderTypeEnum, Aggregate>();
            _aggregates[OrderTypeEnum.Type1] = new Aggregate();
            _aggregates[OrderTypeEnum.Type2] = new Aggregate();
            _aggregates[OrderTypeEnum.Type3] = new Aggregate();
        }

        private static Order CreateOrder(int id)
        {
            var order = new Order() { Id = id, OrderType = GetRandomAggregationType() };
            return order;
        }

        private static OrderTypeEnum GetRandomAggregationType()
        {
            Array values = Enum.GetValues(typeof(OrderTypeEnum));
            var random = new Random();
            return (OrderTypeEnum)values.GetValue(random.Next(values.Length));
        }

        private static void ShowOutput()
        {
            Console.WriteLine($"Type: {OrderTypeEnum.Type1} Count: {_aggregates[OrderTypeEnum.Type1].Count}");
            Console.WriteLine($"Type: {OrderTypeEnum.Type2} Count: {_aggregates[OrderTypeEnum.Type2].Count}");
            Console.WriteLine($"Type: {OrderTypeEnum.Type3} Count: {_aggregates[OrderTypeEnum.Type3].Count}");
            var total =
                _aggregates[OrderTypeEnum.Type1].Count +
                _aggregates[OrderTypeEnum.Type2].Count +
                _aggregates[OrderTypeEnum.Type3].Count;
            Console.WriteLine($"Total for all types: {total}");
            Console.ReadKey();
        }
    }

    public class Order
    {
        public int Id { get; set; }
        public OrderTypeEnum OrderType { get; set; }
    }

    public class Aggregator
    {
        public void Aggregate(Order order, Dictionary<OrderTypeEnum, Aggregate> aggregates)
        {
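            //Not thread-safe: Count++ is a read-modify-write, so concurrent calls can lose increments.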
            aggregates[order.OrderType].Count++;
        }
    }

    public class Aggregate
    {
        public int Count { get; set; }
    }

    public enum OrderTypeEnum
    {
        Type1 = 1,
        Type2 = 2,
        Type3 = 3
    }
}

So I rewrote the application using a ConcurrentQueue. The results are correct now, but I have the feeling I am doing it wrong, or that it could be done more efficiently.

/* improved version using ConcurrentQueue, showing correct results */

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Threading.Tasks;

namespace ConcurrentQueue
{
    class Program
    {
        private static readonly int OrderCount = 100;

        private static IEnumerable<Order> _orders;

        private static Dictionary<OrderTypeEnum, Aggregate> _aggregates;

        static void Main(string[] args)
        {
            //Prepare
            InitializeAggregates();
            _orders = CreateOrders();

            //Execute
            var proxy = new OrderProxy();
            var ordersQueue = new ConcurrentQueue<OrderResult>();
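            //Multiple threads enqueue concurrently here; the aggregation below runs afterwards on a single thread.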
            Parallel.ForEach(_orders, order => {
                var orderResult = proxy.PlaceOrder(order);
                ordersQueue.Enqueue(orderResult);
            });

            foreach (var order in ordersQueue)
            {
                _aggregates[order.OrderType].Count++;
            }

            ShowOutput();
        }

        private static IEnumerable<Order> CreateOrders()
        {
            var orderList = new Collection<Order>();

            for (var i = 1; i <= OrderCount; i++)
            {
                var order = CreateOrder(i);
                orderList.Add(order);
            }

            return orderList;
        }

        private static void InitializeAggregates()
        {
            _aggregates = new Dictionary<OrderTypeEnum, Aggregate>();
            _aggregates[OrderTypeEnum.Type1] = new Aggregate();
            _aggregates[OrderTypeEnum.Type2] = new Aggregate();
            _aggregates[OrderTypeEnum.Type3] = new Aggregate();
        }

        private static Order CreateOrder(int id)
        {
            var order = new Order() { Id = id, AggregateType = GetRandomAggregationType() };
            return order;
        }

        private static OrderTypeEnum GetRandomAggregationType()
        {
            Array values = Enum.GetValues(typeof(OrderTypeEnum));
            var random = new Random();
            return (OrderTypeEnum)values.GetValue(random.Next(values.Length));
        }

        private static void ShowOutput()
        {
            Console.WriteLine($"Type: {OrderTypeEnum.Type1} Count: {_aggregates[OrderTypeEnum.Type1].Count}");
            Console.WriteLine($"Type: {OrderTypeEnum.Type2} Count: {_aggregates[OrderTypeEnum.Type2].Count}");
            Console.WriteLine($"Type: {OrderTypeEnum.Type3} Count: {_aggregates[OrderTypeEnum.Type3].Count}");
            var total =
                _aggregates[OrderTypeEnum.Type1].Count +
                _aggregates[OrderTypeEnum.Type2].Count +
                _aggregates[OrderTypeEnum.Type3].Count;
            Console.WriteLine($"Total for all types: {total}");
            Console.ReadKey();
        }
    }

    public class Order
    {
        public int Id { get; set; }
        public OrderTypeEnum AggregateType { get; set; }
    }

    public class OrderResult
    {
        public int Id { get; set; }
        public OrderTypeEnum OrderType { get; set; }
    }

    public class OrderProxy
    {
        public OrderResult PlaceOrder(Order order)
        {
            var orderResult = new OrderResult() { Id = order.Id, OrderType = order.AggregateType };
            return orderResult;
        }
    }

    public class Aggregate
    {
        public OrderTypeEnum OrderType { get; set; }
        public int Count { get; set; }
    }

    public enum OrderTypeEnum
    {
        Type1 = 1,
        Type2 = 2,
        Type3 = 3
    }
}

As you see, I add objects of type OrderResult to the ConcurrentQueue. I shouldn't need a separate OrderResult class. Of course I could just add the order itself to the queue, iterate through the queue and calculate the sums after I have finished retrieving the data. Is that what I should do? What I really want is to handle the incoming orders and count the different types of orders right away, storing the counts in my 'aggregates collection'. Is that possible? If so, how?
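
To make that alternative concrete, this is roughly what I mean, reusing the types from my second example above (just a sketch, not tested):

var ordersQueue = new ConcurrentQueue<Order>();

Parallel.ForEach(_orders, order => {
    //Thread-safe enqueue from multiple threads
    ordersQueue.Enqueue(order);
});

//Single-threaded aggregation after all orders have been queued
foreach (var order in ordersQueue)
{
    _aggregates[order.AggregateType].Count++;
}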

  • Consider `BlockingCollection` as an alternative. It is a wrapper around `ConcurrentQueue`. – mjwills Jul 20 '18 at 13:13
  • You might also want to look into TPL's `Dataflow` library for this sort of thing. https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/how-to-implement-a-producer-consumer-dataflow-pattern You mention 3 aggregates for 3 different order types, so the multiple producers and multiple consumer examples here could be of interest to you https://blog.stephencleary.com/2012/11/async-producerconsumer-queue-using.html – p3tch Jul 20 '18 at 13:17
  • Instead of introducing a `ConcurrentQueue` you could switch your `_aggregates` dictionary to a `ConcurrentDictionary`. The `Concurrent` collections allow multiple threads to manipulate the collection in a thread-safe way without using a `lock` to synchronize access (a rough sketch of this idea follows the comment list below). – WBuck Jul 20 '18 at 13:26
  • Thanks guys. Really useful answers. I will add my findings after investigating the proposed solutions. – RWC Jul 20 '18 at 13:35
  • Your problem is typical of a class of problems that the 'producer consumer' design pattern is for. The BlockingCollection is a good option in c# for such problems, but search for producer consumer to see how to put that together. – Adam G Jul 20 '18 at 14:11
  • David Fowler [tweeted about this question](https://mobile.twitter.com/davidfowl/status/1020528070422171648) saying that the new System.Threading.Channels library was created to handle this kind of producer/consumer situation. – sgbj Jul 21 '18 at 06:45
  • Hangfire.IO is another useful OSS library that gives you lots of concurrency tooling if you modelled your problem as hangfire jobs – alastairtree Jul 21 '18 at 09:46
  • https://www.nuget.org/packages/System.Threading.Channels provides types for passing data between producers and consumers. – Shaun Luttin Jul 21 '18 at 15:12
  • Okay, is anybody able to change my example so it works with NuGet package "System.Threading.Channels" (https://www.nuget.org/packages/System.Threading.Channels). There is not much documentation. It should not take that much time... if you know what you are doing. ;-) – RWC Jul 23 '18 at 13:19
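
A rough sketch of the ConcurrentDictionary approach suggested in the comments, counting the order types directly without a queue. It reuses the Order, OrderTypeEnum and _orders from the question and assumes using System.Collections.Concurrent; it is not the original poster's code:

//Thread-safe counters keyed by order type; no queue and no lock required.
var counts = new ConcurrentDictionary<OrderTypeEnum, int>();

Parallel.ForEach(_orders, order => {
    //AddOrUpdate is atomic per key: insert 1 for a new key, otherwise increment.
    counts.AddOrUpdate(order.OrderType, 1, (type, current) => current + 1);
});

foreach (var pair in counts)
{
    Console.WriteLine($"Type: {pair.Key} Count: {pair.Value}");
}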

3 Answers


As suggested by David Fowler himself, I tried to use System.Threading.Channels to solve my problem, and I came up with something that seems to work correctly.

The System.Threading.Channels library is poorly documented, so I hope what I did is the way it is supposed to be used.

using System;
using System.Threading.Tasks;
using System.Threading.Channels;
using System.Threading;
using System.Collections.Generic;

namespace ConcurrentQueue
{
    class Program
    {
        //Capacity of the bounded channel. Once the buffer is full, writers must wait until the reader frees up space.
        private static readonly int Capacity = 10; 

        //Number of orders to write by each writer. (Choose 0 to write indefinitely.)
        private static readonly int NumberOfOrdersToWrite = 25;

        //Delay in ms used by the reader to simulate some work per order.
        private static readonly int Delay = 50;

        private static Dictionary<OrderTypeEnum, Aggregate> _aggregates;

        static void Main(string[] args)
        {
            //Prepare
            InitializeAggregates();

            MainAsync(args).GetAwaiter().GetResult();
        }

        static async Task MainAsync(string[] args)
        {
            var channel = Channel.CreateBounded<Order>(Capacity);

            var readerTask = Task.Run(() => ReadFromChannelAsync(channel.Reader));

            var writerTask01 = Task.Run(() => WriteToChannelAsync(channel.Writer, 1, NumberOfOrdersToWrite));
            var writerTask02 = Task.Run(() => WriteToChannelAsync(channel.Writer, 2, NumberOfOrdersToWrite));
            var writerTask03 = Task.Run(() => WriteToChannelAsync(channel.Writer, 3, NumberOfOrdersToWrite));
            var writerTask04 = Task.Run(() => WriteToChannelAsync(channel.Writer, 4, NumberOfOrdersToWrite));

            while (!writerTask01.IsCompleted || !writerTask02.IsCompleted || !writerTask03.IsCompleted || !writerTask04.IsCompleted)
            {
            }

            channel.Writer.Complete();

            await channel.Reader.Completion;

            ShowOutput();
        }

        public static async Task WriteToChannelAsync(ChannelWriter<Order> writer, int writerNumber, int numberOfOrdersToWrite = 0)
        {
            int i = 1;
            while (numberOfOrdersToWrite == 0 || i <= numberOfOrdersToWrite)
            {

                var order = CreateOrder(writerNumber, i);
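                //WriteAsync waits asynchronously whenever the bounded channel is full, so the writers are throttled by the reader.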
                await writer.WriteAsync(order);
                Console.WriteLine($"Thread {Thread.CurrentThread.ManagedThreadId}: writer {writerNumber} just wrote order {order.OrderNumber} with value {order.OrderType}.");
                i++;
                //await Task.Delay(Delay);  //this simulates some work...
            }
        }

        private static async Task ReadFromChannelAsync(ChannelReader<Order> reader)
        {
            while (await reader.WaitToReadAsync())
            {
                while (reader.TryRead(out Order order))
                {
                    Console.WriteLine($"Thread {Thread.CurrentThread.ManagedThreadId}: reader just read order {order.OrderNumber} with value {order.OrderType}.");
                    _aggregates[order.OrderType].Count++;
                    await Task.Delay(Delay);  //this simulates some work...
                }
            }
        }

        private static void InitializeAggregates()
        {
            _aggregates = new Dictionary<OrderTypeEnum, Aggregate>();
            _aggregates[OrderTypeEnum.Type1] = new Aggregate();
            _aggregates[OrderTypeEnum.Type2] = new Aggregate();
            _aggregates[OrderTypeEnum.Type3] = new Aggregate();
        }

        private static Order CreateOrder(int writerNumber, int seq)
        {
            string orderNumber = $"{writerNumber}-{seq}";
            var order = new Order() { OrderNumber = orderNumber, OrderType = GetRandomOrderType() };
            return order;
        }

        private static OrderTypeEnum GetRandomOrderType()
        {
            Array values = Enum.GetValues(typeof(OrderTypeEnum));
            var random = new Random();
            return (OrderTypeEnum)values.GetValue(random.Next(values.Length));
        }

        private static void ShowOutput()
        {
            var total =
                _aggregates[OrderTypeEnum.Type1].Count +
                _aggregates[OrderTypeEnum.Type2].Count +
                _aggregates[OrderTypeEnum.Type3].Count;

            Console.WriteLine();
            Console.WriteLine($"Type: {OrderTypeEnum.Type1} Count: {_aggregates[OrderTypeEnum.Type1].Count}");
            Console.WriteLine($"Type: {OrderTypeEnum.Type2} Count: {_aggregates[OrderTypeEnum.Type2].Count}");
            Console.WriteLine($"Type: {OrderTypeEnum.Type3} Count: {_aggregates[OrderTypeEnum.Type3].Count}");
            Console.WriteLine($"Total for all types: {total}");
            Console.WriteLine();
            Console.WriteLine("Done! Press a key to close the window.");
            Console.ReadKey();
        }
    }

    public class Order
    {
        public string OrderNumber { get; set; }
        public OrderTypeEnum OrderType { get; set; }
    }

    public class Aggregator
    {
        public void Aggregate(Order order, Dictionary<OrderTypeEnum, Aggregate> aggregates)
        {
             aggregates[order.OrderType].Count++;
        }
    }

    public class Aggregate
    {
        public OrderTypeEnum OrderType { get; set; }
        public int Count { get; set; }
    }

    public enum OrderTypeEnum
    {
        Type1 = 1,
        Type2 = 2,
        Type3 = 3
    }
}

I do not like the way I check for the completion of the writers. How can I improve this?

while (!writerTask01.IsCompleted || !writerTask02.IsCompleted ||
       !writerTask03.IsCompleted || !writerTask04.IsCompleted)
{
}

Any feedback is highly appreciated.

  • If you just want to wait until all those writers finish, then simply await those tasks: await Task.WhenAll(writerTask01, writerTask02, writerTask03, ...); Unlike Task.StartNew, Task.Run will unwrap the Task behind WriteToChannelAsync and return that, so you basically end up awaiting each of those async writes to complete. – swiftest Jan 24 '19 at 18:12
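
A sketch of what that comment describes, replacing the busy-wait loop in MainAsync with an asynchronous wait (same task variables as in the answer above):

//Wait asynchronously until all four writers have finished.
await Task.WhenAll(writerTask01, writerTask02, writerTask03, writerTask04);

//Mark the channel complete; once the buffered orders are drained,
//WaitToReadAsync returns false and channel.Reader.Completion completes.
channel.Writer.Complete();

await channel.Reader.Completion;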

Your second solution using the ConcurrentQueue<T> isn't actually doing the aggregation concurrently. It only adds items to the queue concurrently and then processes the queue sequentially. For this specific example, the simplest approach would be to use the first solution you came up with, but synchronize the increment in the Aggregator.Aggregate method, like this:

public class Aggregator
{
    public void Aggregate(Order order, Dictionary<OrderTypeEnum, Aggregate> aggregates)
    {
        var aggregate = aggregates[order.OrderType];
        Interlocked.Increment(ref aggregate.Count);
    }
}
  • I tried your example. It does not compile: "A property or indexer may not be passed as an out or ref parameter." Any suggestions? – RWC Jul 23 '18 at 13:38
  • I've edited the code to add another line which first assigns a variable before incrementing. Does that work? – Shayne Jul 24 '18 at 00:25
  • You could also just use a lock statement if Interlocked is giving you too much trouble. – Shayne Jul 24 '18 at 00:29
  • I tried the same and it does not work. Also changing it to var count = aggregates[order.OrderType].Count; Interlocked.Increment(ref count); does not work. – RWC Jul 24 '18 at 11:43
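
For reference, Interlocked.Increment needs a ref to a field or a local variable, which is why passing the Count property does not compile. A sketch of one way to make the Interlocked version work is to change Count from a property to a public field (this modifies the Aggregate class from the question and assumes using System.Threading):

public class Aggregate
{
    //A field (unlike a property) can be passed by ref to Interlocked.Increment.
    public int Count;
}

public class Aggregator
{
    public void Aggregate(Order order, Dictionary<OrderTypeEnum, Aggregate> aggregates)
    {
        Interlocked.Increment(ref aggregates[order.OrderType].Count);
    }
}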

As suggested by Shayne, using a lock statement (in my first code example) does work:

public class Aggregator
{
    private static readonly Object _lockObj = new Object();

    public void Aggregate(Order order, Dictionary<OrderTypeEnum, Aggregate> aggregates)
    {
        lock (_lockObj)
        {
            aggregates[order.OrderType].Count++;
        }
    }
}

I think TPL Dataflow and System.Threading.Channels are more flexible and more elegant solutions.
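
For comparison, a rough sketch of what the Dataflow version of the aggregation could look like, using an ActionBlock as a single consumer. It assumes the Order, OrderTypeEnum, _orders and _aggregates from the question, the System.Threading.Tasks.Dataflow NuGet package, and an async context for the awaits; it is not tested:

//A single consumer processes orders sequentially, so plain increments on _aggregates are safe.
var aggregatorBlock = new ActionBlock<Order>(
    order => _aggregates[order.OrderType].Count++,
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });

//Producers can post from any thread.
Parallel.ForEach(_orders, order => aggregatorBlock.Post(order));

//Signal completion and wait until every posted order has been processed.
aggregatorBlock.Complete();
await aggregatorBlock.Completion;

ShowOutput();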
