-1

I am writing a provider-consumer program. There are many message queues, and I want to make sure that every message enqueued in each message queue has a sequential, increasing index, for example, <0, "msga">, <1, "msgb">, .... The order remains inside each queue, and different queues are irrelevant.

I use 2 hash maps and 2 mutex locks to do this(I want to use only one lock, but failed).

(i) The first hash map maps queue name to a pair <int, mutex *>, in which int means the counter of each queue, mutex protects the queue in concurrent requests. The definition is

using umap_cnt = unordered_map<string, pair<int, mutex* > >;
umap_cnt counters;

(ii) The second hash map maps queue name to actual message <int, MSG>(MSG is actually string), the definition is

using umap_msg = unordered_map<string, vector<pair<int, msg> > >;
umap_msg messages`;

My logic is

  • find the mutex of a queue in counters, lock() it
  • use counter_iter->first as the index of a message, put() it in messages
  • increase the index
  • unlock() the mutex lock

The sample code is as follows

/* File A.cpp */
using umap_msg = unordered_map<string, vector<pair<int, msg> > >;
mutex mtx;
umap_msg messages;
void put(string kname, MSG msg, int index) {
    // If not using mtx here, the message would be out of order.
    // lock_guard<mutex> mtx;
    messages[kname].push_back(make_pair<index, msg>);
}

// Many threads runs producer()
/* File B.cpp*/
using umap_cnt = unordered_map<string, pair<int, mutex* > >
void producer(string kname, MSG msg) {
    umap_cnt::iterator iter = counters.find(kname);
    if (iter == counters.end())
        return;
    // lock the queue
    iter->second.second->lock();
    put(kname, msg, iter->second.first);
    ++iter->second.first; // increase the counter
    iter->second.second->unlock();
}

The problem is that I find a single lock does not work. I have to use 2 mutex locks, one in producer and the other in put, otherwise I get either out of order message or segmentation fault(no idea why).

My original purpose is to use only one lock in producer. Each queue has its own mutex, and no two threads can get the lock of a queue at the same time, so every message in each queue should be ordered. However, the result is not what I expected.

Why is the extra lock in line 7 necessary? If I do have to use 2 locks, is there anyway that I can optimize it because 2 locks seems a little heavy?

Zihan
  • 405
  • 3
  • 13
  • 1
    The tangled description is difficult to follow. You need to edit your question and make it meet all requirements for a [mcve] as explained in stackoverflow.com's [help], so that anyone can reproduce your problem, themselves. Random code fragments, that may or may not have anything to do with the issue at hand, are insufficient. – Sam Varshavchik Jul 01 '18 at 15:38
  • 1
    And remove all those line numbers - they make it impossible for us tio compile your code. –  Jul 01 '18 at 15:45
  • You really ought to use a `lock_guard` in `producer()` instead of directly calling `lock()` and `unlock()` – Ben Voigt Jul 01 '18 at 16:17
  • @BenVoigt Thanks for the tip, but could you elaborate? I just did a quick search and found lock_guard is a local variable and gets unlocked when it is deconstructed. What if I have more lines after unlock in producer, will lock_guard lock those lines too? – Zihan Jul 01 '18 at 16:42
  • Yes, it unlocks the synchronization object when the guard's destructor runs. Remember that the lifetime of a local variable is not necessarily the entire function, but the containing scope. In your example, the guard would be destroyed at the closing brace of the function, but you can always introduce a new block in order to force it earlier. The advantage is that the guard will free the lock during abnormal control transfers as well as normal exit. – Ben Voigt Jul 01 '18 at 16:46
  • @BenVoigt Now I see their differences, thanks! – Zihan Jul 01 '18 at 16:53

1 Answers1

0

I'm assuming that the unordered_map named messages starts empty (btw, am I assuming right or is the code not showing this part?), then at line 8:

messages[kname].push_back(make_pair<index, msg>);

we write to messages using the [] operator which performs an insertion if the key doesn't already exist.

And writing concurrently to an unordered_map is not supported (see this question), therefore a mutex is needed to make these writes sequential.

Olivier Sohn
  • 1,292
  • 8
  • 18
  • You are right, I forgot about the implicit insertion by `[]`. Maybe I should have pre-insert some empty vectors to get a fixed keyset. Thanks! – Zihan Jul 01 '18 at 16:32