I am writing a producer-consumer program. There are many message queues, and I want every message enqueued in a queue to get a sequential, increasing index, for example <0, "msga">, <1, "msgb">, ... Order matters only within each queue; different queues are independent of each other.
I use 2 hash maps and 2 mutex locks to do this (I wanted to use only one lock, but failed).
(i) The first hash map maps a queue name to a pair <int, mutex *>, in which the int is the counter of the queue and the mutex protects the queue against concurrent requests. The definition is
using umap_cnt = unordered_map<string, pair<int, mutex* > >;
umap_cnt counters;
(ii) The second hash map maps a queue name to the actual messages, stored as <int, MSG> pairs (MSG is actually string). The definition is
using umap_msg = unordered_map<string, vector<pair<int, MSG> > >;
umap_msg messages;
My logic is
- find the mutex of a queue in counters and lock() it
- use the queue's counter (iter->second.first) as the index of the message and put() it in messages
- increase the index
- unlock() the mutex
The sample code is as follows
/* File A.cpp */
using umap_msg = unordered_map<string, vector<pair<int, MSG> > >;
mutex mtx;
umap_msg messages;
void put(string kname, MSG msg, int index) {
// If not using mtx here, the message would be out of order.
// lock_guard<mutex> lock(mtx);
messages[kname].push_back(make_pair(index, msg));
}
// Many threads run producer()
/* File B.cpp */
using umap_cnt = unordered_map<string, pair<int, mutex* > >;
void producer(string kname, MSG msg) {
umap_cnt::iterator iter = counters.find(kname);
if (iter == counters.end())
return;
// lock the queue
iter->second.second->lock();
put(kname, msg, iter->second.first);
++iter->second.first; // increase the counter
iter->second.second->unlock();
}
The problem is that a single lock does not work. I have to use 2 mutex locks, one in producer and the other in put; otherwise I get either out-of-order messages or a segmentation fault (no idea why).
My original intent was to use only one lock, in producer. Each queue has its own mutex, and no two threads can hold the lock of the same queue at the same time, so the messages in each queue should stay ordered. However, the result is not what I expected.
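To make the intended single-lock design concrete, here is a minimal, self-contained sketch. It is not my real code, and it rests on assumptions I am adding for illustration: every queue name is registered in both maps before any producer thread starts, put() looks the queue up with find() instead of operator[] (which may insert a new key and so modify the shared map), and the mutex is held through std::unique_ptr instead of a raw mutex*. The helpers register_queue, run_producers and indices_are_sequential are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>

using MSG = std::string;
using umap_cnt = std::unordered_map<std::string, std::pair<int, std::unique_ptr<std::mutex>>>;
using umap_msg = std::unordered_map<std::string, std::vector<std::pair<int, MSG>>>;

umap_cnt counters;
umap_msg messages;

// Hypothetical helper. Assumption: called only before any producer thread starts,
// so neither map gains or loses keys while threads are running.
void register_queue(const std::string& kname) {
    counters.emplace(kname, std::make_pair(0, std::unique_ptr<std::mutex>(new std::mutex)));
    messages.emplace(kname, std::vector<std::pair<int, MSG>>{});
}

// The caller must hold the mutex of queue kname.
void put(const std::string& kname, const MSG& msg, int index) {
    // find() never inserts, so the map structure is only read here;
    // only the vector of this one queue is modified.
    auto it = messages.find(kname);
    assert(it != messages.end());  // the queue must have been registered
    it->second.emplace_back(index, msg);
}

// Returns false if the queue is unknown, true once the message is stored.
bool producer(const std::string& kname, const MSG& msg) {
    auto iter = counters.find(kname);
    if (iter == counters.end())
        return false;
    std::lock_guard<std::mutex> lock(*iter->second.second);  // the single per-queue lock
    put(kname, msg, iter->second.first);
    ++iter->second.first;  // increase the counter while still holding the lock
    return true;
}

// Hypothetical driver: even-numbered threads write to "a", odd-numbered ones to "b".
void run_producers(int n_threads, int per_thread) {
    std::vector<std::thread> threads;
    for (int t = 0; t < n_threads; ++t) {
        threads.emplace_back([t, per_thread] {
            const std::string kname = (t % 2 == 0) ? "a" : "b";
            for (int i = 0; i < per_thread; ++i)
                producer(kname, "msg");
        });
    }
    for (auto& th : threads)
        th.join();
}

// Checks that queue kname holds exactly expected_size messages indexed 0, 1, 2, ...
bool indices_are_sequential(const std::string& kname, std::size_t expected_size) {
    auto it = messages.find(kname);
    if (it == messages.end() || it->second.size() != expected_size)
        return false;
    for (std::size_t i = 0; i < it->second.size(); ++i)
        if (it->second[i].first != static_cast<int>(i))
            return false;
    return true;
}
```

Calling register_queue("a") and register_queue("b"), then run_producers(4, 500), I would expect indices_are_sequential to hold for both queues with 1000 messages each, as long as the assumptions above are respected. Whether the operator[] insertion is what actually breaks my real code is part of what I am asking.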
Why is the extra lock in line 7 (the commented-out lock_guard in put) necessary? If I do have to use 2 locks, is there any way I can optimize this, because 2 locks seem a little heavy?