
I have a consumer service that retrieves messages from a RabbitMQ queue using an EasyNetQ subscriber. Each message takes tens of seconds to process, and I need to run them in parallel to ensure I can keep up with the producer. However, each message has a property, call it groupingId. It's important that tasks with the same groupingId are not executed concurrently, as this causes resource collisions.

It's likely that there are many hundreds of groupingIds, and normally only a few messages at any one time share the same Id. However, the data can be bursty, producing clusters of hundreds of messages with the same Id at once.

I thought TPL Dataflow might be a good fit, but I'm not that familiar with it and not sure how to achieve what I need with it. Any guidance would be appreciated.

notbono
  • One idea could be to use a keyed asynchronous locker, basically a dictionary of `SemaphoreSlim`s. You can find some implementations here: [Asynchronous locking based on a key](https://stackoverflow.com/questions/31138179/asynchronous-locking-based-on-a-key) – Theodor Zoulias May 14 '21 at 17:37
  • And here is a Polly/TPL Dataflow approach: [Send parallel requests but only one per host](https://stackoverflow.com/questions/57022754/send-parallel-requests-but-only-one-per-host-with-httpclient-and-polly-to-gracef). – Theodor Zoulias May 14 '21 at 17:50
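A minimal sketch of the keyed asynchronous locker described in the first comment, assuming the set of keys stays bounded (the question mentions hundreds of groupingIds). `KeyedAsyncLock` and `LockAsync` are hypothetical names, not the API of any implementation at the linked question. Waiters `await` a per-key `SemaphoreSlim(1, 1)` instead of blocking a thread, which matters when each message takes tens of seconds.

```csharp
#nullable enable
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical keyed async lock: one SemaphoreSlim(1, 1) per key.
// Assumption: keys are bounded, so semaphores are never removed.
public sealed class KeyedAsyncLock<TKey> where TKey : notnull
{
    private readonly ConcurrentDictionary<TKey, SemaphoreSlim> _semaphores =
        new ConcurrentDictionary<TKey, SemaphoreSlim>();

    // Asynchronously waits until no other holder has this key; dispose the result to release.
    public async Task<IDisposable> LockAsync(TKey key, CancellationToken ct = default)
    {
        // GetOrAdd may build a spare semaphore under a race, but every caller
        // gets back the single instance actually stored for this key.
        SemaphoreSlim semaphore = _semaphores.GetOrAdd(key, _ => new SemaphoreSlim(1, 1));
        await semaphore.WaitAsync(ct).ConfigureAwait(false);
        return new Releaser(semaphore);
    }

    private sealed class Releaser : IDisposable
    {
        private SemaphoreSlim? _semaphore;

        public Releaser(SemaphoreSlim semaphore) => _semaphore = semaphore;

        // Releases exactly once, even if Dispose is called more than once.
        public void Dispose() => Interlocked.Exchange(ref _semaphore, null)?.Release();
    }
}
```

In a handler it would be used as `using (await locker.LockAsync(message.GroupingID)) { await ProcessMessageAsync(message); }`, where `ProcessMessageAsync` is a hypothetical async version of the processing step. Because entries are never removed, memory grows with the number of distinct IDs seen.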

1 Answer


Keep a dictionary that maps each grouping ID to its own lock object, and lock on that object while processing the message.

First, create the dictionary somewhere, probably as a member variable.

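```csharp
// Member of the consumer class; requires: using System.Collections.Concurrent;
private readonly ConcurrentDictionary<int, object> _locks = new ConcurrentDictionary<int, object>();
```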

When you need to process a message, use this logic.

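```csharp
// GetOrAdd atomically returns the existing lock object for this ID,
// or adds a new one if none exists yet.
object groupLock = _locks.GetOrAdd(message.GroupingID, _ => new object());
lock (groupLock)
{
    // Only one thread at a time can be in here for a given GroupingID.
    ProcessMessage(message);
}
```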
John Wu
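A sketch that assembles the answer's two snippets into a compilable class. `Message`, `GroupLockProcessor` and `HandleAllAsync` are hypothetical names, and dispatching each message with `Task.Run` is an assumption about how the consumer gets its parallelism, since the answer does not show that part.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical message type: the question only says each message carries a groupingId.
public sealed record Message(int GroupingID, string Body);

public sealed class GroupLockProcessor
{
    // One lock object per grouping ID, created on first use (as in the answer).
    private readonly ConcurrentDictionary<int, object> _locks = new ConcurrentDictionary<int, object>();
    private readonly Action<Message> _processMessage;

    public GroupLockProcessor(Action<Message> processMessage) => _processMessage = processMessage;

    // Blocks the calling thread while another message with the same ID is being processed.
    public void Handle(Message message)
    {
        lock (_locks.GetOrAdd(message.GroupingID, _ => new object()))
        {
            _processMessage(message);
        }
    }

    // Runs each message on the thread pool: different IDs proceed in parallel,
    // while messages that share an ID wait on the same lock.
    public Task HandleAllAsync(IEnumerable<Message> messages) =>
        Task.WhenAll(messages.Select(m => Task.Run(() => Handle(m))));
}
```

Each waiting message holds a thread-pool thread for as long as it waits, and `lock` does not guarantee first-come, first-served ordering. A burst of hundreds of same-ID messages would therefore tie up that many threads and may not be processed in arrival order.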
  • That does prevent the processing of identical groupingId messages. However, just discarding messages is not really an option. My first approach was a dictionary of BlockingCollection objects, but it seemed very unwieldy. – notbono May 14 '21 at 16:57
  • Not sure why "discarding" comes up in your comment; there is nothing in my code that discards anything. – John Wu May 14 '21 at 17:38
  • I apologize. You're right, I wasn't reading your code properly. I think that solution could actually work for me, and it's a lot simpler than anything else. The only concern I have is about the number of locks and the number of blocked threads I might end up with, but the only way to figure that out is by testing it. – notbono May 14 '21 at 20:28
  • Yes, it is a bit of a brute-force solution that will result in a lot of locking and blocking. Another way to do it would be to establish 10 threads and 10 blocking collections, and assign messages to each collection based on the last digit of their group ID, that sort of thing. That would probably perform better, but it depends on the implementation details of your messaging system. – John Wu May 14 '21 at 21:11
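The partitioning idea in the last comment can be sketched with one `BlockingCollection<T>` and one long-running consumer per partition. `PartitionedWorkers`, `Post` and `Complete` are hypothetical names, and the group ID is assumed to be an `int`. Routing uses `id % partitions` (the "last digit" scheme when there are 10 partitions), after masking the sign bit so negative IDs still map to a valid index.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical fixed set of partitions, each with its own queue and a single consumer.
// Equal group IDs always land on the same consumer, so they never run concurrently.
public sealed class PartitionedWorkers<TMessage>
{
    private readonly BlockingCollection<TMessage>[] _queues;
    private readonly Task[] _workers;
    private readonly Func<TMessage, int> _groupOf;

    public PartitionedWorkers(int partitions, Func<TMessage, int> groupOf, Action<TMessage> process)
    {
        _groupOf = groupOf;
        _queues = new BlockingCollection<TMessage>[partitions];
        _workers = new Task[partitions];
        for (int i = 0; i < partitions; i++)
        {
            var queue = new BlockingCollection<TMessage>();
            _queues[i] = queue;
            // LongRunning hints that the scheduler should use a dedicated thread,
            // since each consumer blocks for the lifetime of the processor.
            _workers[i] = Task.Factory.StartNew(() =>
            {
                foreach (TMessage message in queue.GetConsumingEnumerable())
                {
                    process(message);
                }
            }, TaskCreationOptions.LongRunning);
        }
    }

    // Masking with int.MaxValue keeps the partition index non-negative.
    public void Post(TMessage message)
    {
        int partition = (_groupOf(message) & int.MaxValue) % _queues.Length;
        _queues[partition].Add(message);
    }

    // Stops accepting new messages and waits until every queue has drained.
    public void Complete()
    {
        foreach (var queue in _queues)
        {
            queue.CompleteAdding();
        }
        Task.WaitAll(_workers);
    }
}
```

This keeps per-ID order and uses a fixed number of threads, but it trades away some parallelism: a burst of hundreds of messages with one ID occupies its partition, and every other ID that hashes to that partition waits behind it. An exception thrown by `process` would also stop that partition's consumer, so errors would need to be handled inside the callback.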