I have orders coming in from multiple threads and I want to process this data in one thread. If I understood it right, the way to do it is with ConcurrentQueue.
I had a look at SO question How to work threading with ConcurrentQueue<T>, but it did not answer my questions.
I wrote a small test application (with .NET Core 2.1) to see if I could get it to work.
This is what it should do: Make aggregates for 100 orders. There are 3 aggregates for 3 different order types: Type1, Type2 and Type3
The output should be something like:
Type: Type1 Count: 38
Type: Type2 Count: 31
Type: Type3 Count: 31
Total for all types: 100
I started of writing the application without ConcurrentQueue. As exepected, the results in _aggregates are wrong.
/* Incorrect version, not using ConcurrentQueue, showing incorrect results */
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Threading.Tasks;
namespace ConcurrentQueue
{
class Program
{
private static readonly int OrderCount = 100;
private static IEnumerable<Order> _orders;
private static Dictionary<OrderTypeEnum, Aggregate> _aggregates;
static void Main(string[] args)
{
//Prepare
InitializeAggregates();
_orders = CreateOrders();
//Execute
MainAsync(args).GetAwaiter().GetResult();
}
static async Task MainAsync(string[] args)
{
await Task.Run(() => ProcessOrders());
ShowOutput();
}
public static void ProcessOrders()
{
var aggregator = new Aggregator();
Parallel.ForEach(_orders, order => {
aggregator.Aggregate(order, _aggregates);
});
}
private static IEnumerable<Order> CreateOrders()
{
var orderList = new Collection<Order>();
for (var i = 1; i <= OrderCount; i++)
{
var order = CreateOrder(i);
orderList.Add(order);
}
return orderList;
}
private static void InitializeAggregates()
{
_aggregates = new Dictionary<OrderTypeEnum, Aggregate>();
_aggregates[OrderTypeEnum.Type1] = new Aggregate();
_aggregates[OrderTypeEnum.Type2] = new Aggregate();
_aggregates[OrderTypeEnum.Type3] = new Aggregate();
}
private static Order CreateOrder(int id)
{
var order = new Order() { Id = id, OrderType = GetRandomAggregtationType() };
return order;
}
private static OrderTypeEnum GetRandomAggregtationType()
{
Array values = Enum.GetValues(typeof(OrderTypeEnum));
var random = new Random();
return (OrderTypeEnum)values.GetValue(random.Next(values.Length));
}
private static void ShowOutput()
{
Console.WriteLine($"Type: {OrderTypeEnum.Type1} Count: {_aggregates[OrderTypeEnum.Type1].Count}");
Console.WriteLine($"Type: {OrderTypeEnum.Type2} Count: {_aggregates[OrderTypeEnum.Type2].Count}");
Console.WriteLine($"Type: {OrderTypeEnum.Type3} Count: {_aggregates[OrderTypeEnum.Type3].Count}");
var total =
_aggregates[OrderTypeEnum.Type1].Count +
_aggregates[OrderTypeEnum.Type2].Count +
_aggregates[OrderTypeEnum.Type3].Count;
Console.WriteLine($"Total for all types: {total}");
Console.ReadKey();
}
}
public class Order
{
public int Id { get; set; }
public OrderTypeEnum OrderType { get; set; }
}
public class Aggregator
{
public void Aggregate(Order order, Dictionary<OrderTypeEnum, Aggregate> aggregates)
{
aggregates[order.OrderType].Count++;
}
}
public class Aggregate
{
public int Count { get; set; }
}
public enum OrderTypeEnum
{
Type1 = 1,
Type2 = 2,
Type3 = 3
}
}
So I rewrote the application using ConcurrentQueue. The results are correct now, but I have got the feeling I am doing it wrong or it can be done more efficiently.
/* improved version using ConcurrentQueue, showing correct results */
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Threading.Tasks;
namespace ConcurrentQueue
{
class Program
{
private static readonly int OrderCount = 100;
private static IEnumerable<Order> _orders;
private static Dictionary<OrderTypeEnum, Aggregate> _aggregates;
static void Main(string[] args)
{
//Prepare
InitializeAggregates();
_orders = CreateOrders();
//Execute
var proxy = new OrderProxy();
var ordersQueue = new ConcurrentQueue<OrderResult>();
Parallel.ForEach(_orders, order => {
var orderResult = proxy.PlaceOrder(order);
ordersQueue.Enqueue(orderResult);
});
foreach (var order in ordersQueue)
{
_aggregates[order.OrderType].Count++;
}
ShowOutput();
}
private static IEnumerable<Order> CreateOrders()
{
var orderList = new Collection<Order>();
for (var i = 1; i <= OrderCount; i++)
{
var order = CreateOrder(i);
orderList.Add(order);
}
return orderList;
}
private static void InitializeAggregates()
{
_aggregates = new Dictionary<OrderTypeEnum, Aggregate>();
_aggregates[OrderTypeEnum.Type1] = new Aggregate();
_aggregates[OrderTypeEnum.Type2] = new Aggregate();
_aggregates[OrderTypeEnum.Type3] = new Aggregate();
}
private static Order CreateOrder(int id)
{
var order = new Order() { Id = id, AggregateType = GetRandomAggregtationType() };
return order;
}
private static OrderTypeEnum GetRandomAggregtationType()
{
Array values = Enum.GetValues(typeof(OrderTypeEnum));
var random = new Random();
return (OrderTypeEnum)values.GetValue(random.Next(values.Length));
}
private static void ShowOutput()
{
Console.WriteLine($"Type: {OrderTypeEnum.Type1} Count: {_aggregates[OrderTypeEnum.Type1].Count}");
Console.WriteLine($"Type: {OrderTypeEnum.Type2} Count: {_aggregates[OrderTypeEnum.Type2].Count}");
Console.WriteLine($"Type: {OrderTypeEnum.Type3} Count: {_aggregates[OrderTypeEnum.Type3].Count}");
var total =
_aggregates[OrderTypeEnum.Type1].Count +
_aggregates[OrderTypeEnum.Type2].Count +
_aggregates[OrderTypeEnum.Type3].Count;
Console.WriteLine($"Total for all types: {total}");
Console.ReadKey();
}
}
public class Order
{
public int Id { get; set; }
public OrderTypeEnum AggregateType { get; set; }
}
public class OrderResult
{
public int Id { get; set; }
public OrderTypeEnum OrderType { get; set; }
}
public class OrderProxy
{
public OrderResult PlaceOrder(Order order)
{
var orderResult = new OrderResult() { Id = order.Id, OrderType = order.AggregateType };
return orderResult;
}
}
public class Aggregate
{
public OrderTypeEnum OrderType { get; set; }
public int Count { get; set; }
}
public enum OrderTypeEnum
{
Type1 = 1,
Type2 = 2,
Type3 = 3
}
}
As you see, I add objects of type OrderResult to ConcurrentQueue. I shouldn't need to use a class OrderResult. Of course I could just add the order to the queue, and iterate throught them and calculate the sums after I am finished retrieving data. Is that what I should do? I simply want to handle the incoming orders, and simply count the different type of orders right away and store them in my 'aggregates collection'. Is that possible? If yes, how?