1

I have built a MQTT client that listens for certain status data. For each message I run a method which can take a while (up to 1 second). Since a lot of messages can arrive at once, I want to run the whole thing in parallel. My problem now is, when I receive a message belonging to topic A, I want to make sure that the previous task belonging to topic A has already finished before I start the new one. But I also need to be able to receive new messages during the time I am waiting for Task A to finish and add them to the queue if necessary. Of course, if the new message belongs to topic B, I don't care about the status of task A and I can run this method call in parallel.

In my mind, this is solved with a kind of dictionary that has different queues.

Max
  • 67
  • 8
  • 2
    Your approach sounds correct. Do you have a question about it? – Kisar Feb 09 '23 at 13:11
  • I don't know how to code it – Max Feb 09 '23 at 13:50
  • Related: [Dynamically processing a concurrent collection in parallel by group but serially within each group](https://stackoverflow.com/questions/71000722/dynamically-processing-a-concurrent-collection-in-parallel-by-group-but-serially). If you use a [`Channel`](https://learn.microsoft.com/en-us/dotnet/api/system.threading.channels.channel-1) as a queue for the messages, then that question might have a suitable answer to your problem. – Theodor Zoulias Feb 09 '23 at 20:09

2 Answers2

1

What about to use a lock on an object related to the topic? When a new item come in the system you could retrieve/create a lock object from a ConcurrentDictionary and then you could use this object to lock the execution. something like this.

static ConcurrentDictionary<string,object> _locksByCategory = 
    new ConcurrentDictionary<string,object>();

async void ProcessItem(ItemType item) {
    var lockObject = _locksByCategory(item.Category, new object(), (k, o) => o);
    lock (lockObject) {
        // your code
    }
}

This isn't a production ready solution but could help to start with.

1

I don't know exactly how you would do it, but it goes along the lines of:

  • On startup, create a (static? singleton?) Dictionary<Topic, ConcurrentQueue> and for each topic create a thread that does the following:
    1. Wrap the ConcurrentQueue in a BlockingCollection
    2. infinitely loop with BlockingCollection.Take at the start of the loop. This should block until an item is ready, execute the rest of the loop and listen for more items afterwards.
  • Whenever a message comes in, add it to the corresponding ConcurrentQueue.
Kisar
  • 343
  • 1
  • 11