
I'm working on practicing C++ concurrency and learning about the practical applications of using mutexes vs lock free atomics. I was working on the following problem from Leetcode: https://leetcode.com/problems/building-h2o/description/

Basically, different threads are responsible for either releasing a hydrogen atom or oxygen atom and you want to synchronize so that every consecutive set of 3 atoms consists of 2 hydrogen and 1 oxygen.

I implemented the solution in 2 ways, one using a mutex and one using atomics.

Mutex Solution:

class H2O {
    mutex m;
    condition_variable m_cond;
    int idx;
public:
    H2O() : idx(0) {
    }

    void hydrogen(function<void()> releaseHydrogen) {
        unique_lock<mutex> mlock(m);
        m_cond.wait(mlock, [this](){return idx != 0;});
        // releaseHydrogen() outputs "H". Do not change or remove this line.
        releaseHydrogen();
        idx = (idx+1) % 3;
        m_cond.notify_all();
    }

    void oxygen(function<void()> releaseOxygen) {
        unique_lock<mutex> mlock(m);
        m_cond.wait(mlock, [this](){return idx == 0;});
        // releaseOxygen() outputs "O". Do not change or remove this line.
        releaseOxygen();
        idx = (idx+1)%3;
        m_cond.notify_all();
    }
};

Atomics Solution:

class H2O {
    /* state is one of {00, 01, 10, 11, 20, 21} where the first digit represents the number of hydrogen atoms acquired and the second digit is the number of oxygen atoms acquired */
    atomic<int> state_{0};
    /* stores the number of atoms that we have finished processing, increments from 0 to 3 and resets back to 0 */
    atomic<int> completedCount_{0};

public:
    H2O() {}

    void acquireHydrogen(){
        int curState = state_.load();
        do{
            while(curState/10 == 2){
                // full, 2 hydrogen atoms have already been acquired
                curState = state_.load();
            }
        } while(!state_.compare_exchange_weak(curState, curState + 10));
            // modify the state to show that 1 more hydrogen has been acquired
    }

    void acquireOxygen(){
        int curState = state_.load();
        do{
            while(curState % 10 == 1){
                // full, 1 oxygen has already been acquired
                curState = state_.load();
            }
        } while(!state_.compare_exchange_weak(curState, curState + 1));
            // modify the state to show that 1 oxygen has been acquired
    }

    void complete(){
        // increment count of completed
        completedCount_.fetch_add(1);
        int expected = 3;
        /* The thread that resets the completed count back to 0 is responsible for resetting the acquired state as well.
        If more than 1 acquired thread tries to reset state, in between 2 of these resets a new set of atoms might already be acquired which we don't want to write over. */
        if(completedCount_.compare_exchange_strong(expected, 0)){
            state_.store(0);
        }
    }
    void hydrogen(function<void()> releaseHydrogen) {
        acquireHydrogen();
        releaseHydrogen(); // prints "H"
        complete();
    }

    void oxygen(function<void()> releaseOxygen) {
        acquireOxygen();
        releaseOxygen(); // prints "O"
        complete();
    }
};

The code using mutexes is much simpler and also runs about 20 times faster on average than the code using atomics when I submit to Leetcode. I'm trying to better understand when to use locks/mutexes and when to prefer atomics/lock-free code. I have the following questions:

  1. In this case, I don't know how the Leetcode server actually runs threads to execute the tests or how many processors/cores it has available. My understanding is that atomics should give better throughput, since fewer threads are "waiting" to get a lock. However, because this problem only allows consecutive sets of 2 hydrogens and 1 oxygen to be released, I'm guessing that no matter how many threads are running, at most 3 of them can be releasing atoms concurrently. Is there an example of when the atomic solution to this problem might be expected to be faster than the mutex-based one? Or is this a problem where you would generally expect mutexes to work better to begin with?

  2. Is there a way to more efficiently write a solution using atomics? Maybe some of the while loops and CAS operations are not needed or can be structured differently?

  3. I also tried specifying the memory order for the atomic solution, where I made reads -> memory_order_acquire, writes -> memory_order_release, and rmw -> memory_order_acq_rel. When I submitted the code a bunch of times, it seemed like relaxing the memory order made the code on average around 1-2 times faster. In general, when writing code using atomics, can you typically specify the memory order as above? How do you decide whether you need true sequential consistency between all atomic operations vs relaxing the semantics depending on whether it's a read, write, or rmw?

I know this is a long post but would really appreciate any thoughts!

bot654321
  • Digits of the decimal representation aren't the most efficient way to pack 2 numbers into one binary `int`. I'd have used 2-bit or 4-bit groups, like `curState >> 4` and `curState & 0xf`. 4 is convenient since debuggers will easily use hex digits like 0x20 or 0x21. This is BCD, although the highest digit you're using is only 2 so it's not full-range decimal. – Peter Cordes May 22 '23 at 03:38
  • This seems weird. Why can't we have a situation with say 6 free hydrogens waiting for an oxygen to react with? If the atoms always come in balanced groups, why do we need separate threads to produce them? They'd already have to synchronize with each other to make sure they never ended up with more than 2 H or more than 1 O not yet reacted. Or is the idea that this code is implementing that synchronization and barriering, and calling `hydrogen()` blocks if there's already 2 H not reacted? So an alternate design could maybe use a counter that can go down to -2 (one O) or up to +2 (2 H). – Peter Cordes May 22 '23 at 03:45
  • Btw, you are using `SeqCst` memory ordering, which incurs heavy synchronization overhead for CPU caches. For this task, I recommend relaxed atomics. – Angelicos Phosphoros May 28 '23 at 17:11
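
A minimal sketch of the 4-bit packing suggested in the first comment above, assuming the hydrogen count lives in the high nibble and the oxygen count in the low nibble. The names `kHydrogenUnit`, `hydrogens`, `oxygens`, and `try_acquire_hydrogen` are hypothetical and do not appear in the question. The helper returns `false` instead of spinning so that the slot logic is easy to check in isolation.

```cpp
#include <atomic>

// Hypothetical layout: hydrogen count in bits 4-7, oxygen count in bits 0-3,
// so a state of 0x21 reads directly as "2 hydrogens, 1 oxygen" in a debugger.
constexpr int kHydrogenUnit = 0x10;

constexpr int hydrogens(int state) { return state >> 4; }
constexpr int oxygens(int state) { return state & 0xf; }

// Tries to claim one of the two hydrogen slots.
// Returns false (without waiting) when both slots are already taken.
inline bool try_acquire_hydrogen(std::atomic<int>& state) {
    int cur = state.load();
    while (hydrogens(cur) < 2) {
        // On failure compare_exchange_weak reloads cur, so the slot check is repeated.
        if (state.compare_exchange_weak(cur, cur + kHydrogenUnit)) {
            return true;
        }
    }
    return false;
}
```

The shift and mask replace the division and modulo by 10 in the question's code; the set of reachable states is unchanged.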

1 Answer


I don't think you can make any performance determinations based on the competition testing without seeing the test harness. Those results seem to be all over the place.

I made a simple test harness that spews hydrogens and oxygens from separate threads in the correct proportion. My goal was to isolate the performance of the synchronization code from the thread overhead.

As expected, the std::atomic version was much faster than the std::mutex version for this problem (732 us versus 16 ms). My intuition from experience is that std::atomic is likely to be faster (lower latency) while std::mutex is likely to consume less power (efficient sleeping). The usual caveat applies: you really just have to measure the performance for your use case, OS, and hardware.

The one thing I would suggest for your std::atomic code is to throw in std::this_thread::yield() in the while loops so the thread will release its time slice if it cannot move forward. This will reduce thread contention if there are many threads trying to grab the resource.
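
A rough sketch of that suggestion, assuming the same two-counter algorithm as in the question. The class name `H2OAtomicYield`, the shared `acquire` helper, and the driver `run_yield_demo` are hypothetical names introduced only for this illustration; the one behavioral change is the `std::this_thread::yield()` call inside the wait loop.

```cpp
#include <atomic>
#include <functional>
#include <mutex>
#include <string>
#include <thread>

// Hypothetical variant of the question's atomic class with a yield in the wait loop.
class H2OAtomicYield {
    std::atomic<int> state_{0};           // tens digit: hydrogens acquired, ones digit: oxygens acquired
    std::atomic<int> completedCount_{0};  // atoms released so far in the current molecule

    // Waits until full(state) is false, then adds unit to the state with a CAS.
    template <class Full>
    void acquire(int unit, Full full) {
        int cur = state_.load();
        do {
            while (full(cur)) {
                std::this_thread::yield();  // no free slot: give up the time slice
                cur = state_.load();
            }
        } while (!state_.compare_exchange_weak(cur, cur + unit));
    }

    void complete() {
        completedCount_.fetch_add(1);
        int expected = 3;
        // Only the thread that resets the count also resets the state.
        if (completedCount_.compare_exchange_strong(expected, 0)) {
            state_.store(0);
        }
    }

public:
    void hydrogen(const std::function<void()>& releaseHydrogen) {
        acquire(10, [](int s) { return s / 10 == 2; });
        releaseHydrogen();
        complete();
    }

    void oxygen(const std::function<void()>& releaseOxygen) {
        acquire(1, [](int s) { return s % 10 == 1; });
        releaseOxygen();
        complete();
    }
};

// Hypothetical driver: two hydrogen threads and one oxygen thread build n molecules.
// Returns the order in which the atoms were released.
std::string run_yield_demo(int n) {
    H2OAtomicYield water;
    std::string out;
    std::mutex out_m;  // releases within one molecule may overlap, so guard the log
    auto h = [&] { std::lock_guard<std::mutex> g(out_m); out += 'H'; };
    auto o = [&] { std::lock_guard<std::mutex> g(out_m); out += 'O'; };
    std::thread h0([&] { for (int i = 0; i < n; ++i) water.hydrogen(h); });
    std::thread h1([&] { for (int i = 0; i < n; ++i) water.hydrogen(h); });
    std::thread ox([&] { for (int i = 0; i < n; ++i) water.oxygen(o); });
    h0.join();
    h1.join();
    ox.join();
    return out;
}
```

Whether the yield helps or hurts depends on the thread count and the scheduler, so it is worth timing both variants on the target machine.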

Sample Code

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <string_view>
#include <thread>

class H2O {
    std::mutex m;
    std::condition_variable m_cond;
    int idx;
public:
    H2O() : idx(0) {
    }

    void hydrogen(std::function<void()> releaseHydrogen) {
        std::unique_lock<std::mutex> mlock(m);
        m_cond.wait(mlock, [this](){return idx != 0;});
        // releaseHydrogen() outputs "H". Do not change or remove this line.
        releaseHydrogen();
        idx = (idx+1) % 3;
        m_cond.notify_all();
    }

    void oxygen(std::function<void()> releaseOxygen) {
        std::unique_lock<std::mutex> mlock(m);
        m_cond.wait(mlock, [this](){return idx == 0;});
        // releaseOxygen() outputs "O". Do not change or remove this line.
        releaseOxygen();
        idx = (idx+1)%3;
        m_cond.notify_all();
    }
};

class H2OAtomic {
    /* state is one of {00, 01, 10, 11, 20, 21} where the first digit represents the number of hydrogen atoms acquired and the second digit is the number of oxygen atoms acquired */
    std::atomic<int> state_{0};
    /* stores the number of atoms that we have finished processing, increments from 0 to 3 and resets back to 0 */
    std::atomic<int> completedCount_{0};

public:
    H2OAtomic() {}

    void acquireHydrogen(){
        int curState = state_.load();
        do{
            while(curState/10 == 2){
                // full, 2 hydrogen atoms have already been acquired
                curState = state_.load();
            }
        } while(!state_.compare_exchange_weak(curState, curState + 10));
            // modify the state to show that 1 more hydrogen has been acquired
    }

    void acquireOxygen(){
        int curState = state_.load();
        do{
            while(curState % 10 == 1){
                // full, 1 oxygen has already been acquired
                curState = state_.load();
            }
        } while(!state_.compare_exchange_weak(curState, curState + 1));
            // modify the state to show that 1 oxygen has been acquired
    }

    void complete(){
        // increment count of completed
        completedCount_.fetch_add(1);
        int expected = 3;
        /* The thread that resets the completed count back to 0 is responsible for resetting the acquired state as well.
        If more than 1 acquired thread tries to reset state, in between 2 of these resets a new set of atoms might already be acquired which we don't want to write over. */
        if(completedCount_.compare_exchange_strong(expected, 0)){
            state_.store(0);
        }
    }
    void hydrogen(std::function<void()> releaseHydrogen) {
        acquireHydrogen();
        releaseHydrogen(); // prints "H"
        complete();
    }

    void oxygen(std::function<void()> releaseOxygen) {
        acquireOxygen();
        releaseOxygen(); // prints "O"
        complete();
    }
};

template<class Clock = std::chrono::high_resolution_clock>
class StopWatch
{
public:
    StopWatch()
        : start_tp_(Clock::now())
        , last_tp_(start_tp_)
    { }

    auto now() const
    {
        std::atomic_thread_fence(std::memory_order_relaxed);
        auto current_tp = Clock::now();
        std::atomic_thread_fence(std::memory_order_relaxed);
        return current_tp;
    }

    auto mark()
    {
        auto current_tp = now();
        auto elapsed = current_tp - last_tp_;
        last_tp_ = current_tp;
        return elapsed;
    }

    template<class Units = typename Clock::duration>
    auto elapsed_duration()
    {
        auto elapsed = mark();
        return std::chrono::duration_cast<Units>(elapsed);
    }

    template<class Units = typename Clock::duration>
    auto elapsed_time()
    {
        auto elapsed = mark();
        return std::chrono::duration_cast<Units>(elapsed).count();
    }

private:
    typename Clock::time_point start_tp_;
    typename Clock::time_point last_tp_;
};

using std::cout, std::endl;

void release_hydrogen() {
    // cout << "H";
}

void release_oxygen() {
    // cout << "O";
}

template<class Builder, class T, class U>
void build_water(int n, T&& h, U&& o) {
    Builder builder;
    auto h0th = std::thread([&]() {
        for (auto i = 0; i < n; ++i)
            builder.hydrogen(h);
    });
    auto h1th = std::thread([&]() {
        for (auto i = 0; i < n; ++i)
            builder.hydrogen(h);
    });
    auto oth = std::thread([&]() {
        for (auto i = 0; i < n; ++i)
            builder.oxygen(o);
    });

    h0th.join();
    h1th.join();
    oth.join();
}

template<class Work>
void measure(std::string_view desc, Work&& work) {
    StopWatch timer;
    timer.mark();
    work();
    auto n = timer.elapsed_duration<std::chrono::microseconds>().count();
    cout << desc << " : " << n << endl;
}

int main(int argc, const char *argv[]) {
    measure("mutex", [&]() {
        build_water<H2O>(3000, release_hydrogen, release_oxygen);
    });
    measure("atomic", [&]() {
        build_water<H2OAtomic>(3000, release_hydrogen, release_oxygen);
    });

    return 0;
}

Output

mutex : 16447
atomic : 732
RandomBits
  • This is really not a fair comparison because your mutex code is terrible. Among other things, it suffers from a thundering herd problem and constantly wakes threads that cannot make forward progress. – David Schwartz May 21 '23 at 21:52
  • @DavidSchwartz Maybe this is what you meant, but the mutex and atomic code is from OP's question. I just contributed the test harness. The test harness only uses two threads, so I'm not sure how the thundering herd comes into play. – RandomBits May 21 '23 at 21:55
  • Thank you this is helpful! Some quick questions regarding your implementation, in the timer class method now(), why do we need the 2 relaxed fences around getting Clock::now()? – bot654321 May 21 '23 at 23:02
  • Also another question that is a bit unrelated, is there a reason for using type deduction with universal references for templated arguments that are functions (eg. T&& h, U&& o in build_water and Work&& work in measure)? – bot654321 May 21 '23 at 23:08
  • @bot654321 The goal of the fences is to keep the compiler from reordering loads and stores across the call to now. This forces any results to be committed to memory before you call now and doesn't allow a head start on a load that should be after now. – RandomBits May 21 '23 at 23:23
  • @bot654321 With regards to the universal reference, it is the most versatile option -- see [this](https://stackoverflow.com/a/65563714/1202808) answer. In this particular code, I could have accepted the argument by value and it would not have mattered. – RandomBits May 21 '23 at 23:30
  • A `relaxed` fence is meaningless. If you want to prevent compile-time reordering of stores and loads without running extra barrier instructions to force run-time ordering, you could use an atomic *signal* (not thread) fence, like `std::atomic_signal_fence(std::memory_order_seq_cst)`. But typically you don't need to; `std::chrono::high_resolution_clock::now()` can't usually fully inline, so the compiler has to assume it might read or write any data that might be reachable via pointers it doesn't know about, including any shared variables. – Peter Cordes May 22 '23 at 03:29
  • @RandomBits By using a single condition variable and using `notify_all`, you needlessly wake lots of threads that cannot make forward progress. This is called the "thundering herd" problem. – David Schwartz May 22 '23 at 18:21
  • @DavidSchwartz Yes, you could use separate condition variables if you added some conditional logic because `oxygen` should always signal `hydrogen` and vice-versa, but `hydrogen` also needs to signal itself half of the time. Clearly better if there are more than a few threads. In the posted code, however, there are only two threads and one is being signaled by the other in every instance. Not much thunder and I would say a fair comparison for this particular test case as the mutex solution is not disadvantaged because of it. – RandomBits May 22 '23 at 19:46
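
The split into separate condition variables discussed in the last two comments could be sketched roughly as follows. This is only an illustration under the same turn-counter scheme as the question's mutex solution; the names `H2OTwoCv`, `hydrogen_cv_`, `oxygen_cv_`, and `run_two_cv_demo` are hypothetical and appear in neither the question nor the answer.

```cpp
#include <condition_variable>
#include <functional>
#include <mutex>
#include <string>
#include <thread>

// Hypothetical variant of the mutex solution with one condition variable per atom kind.
class H2OTwoCv {
    std::mutex m_;
    std::condition_variable hydrogen_cv_;  // only hydrogen threads wait here
    std::condition_variable oxygen_cv_;    // only oxygen threads wait here
    int idx_ = 0;                          // 0: oxygen's turn, 1 or 2: hydrogen's turn

public:
    void hydrogen(const std::function<void()>& releaseHydrogen) {
        std::unique_lock<std::mutex> lock(m_);
        hydrogen_cv_.wait(lock, [this] { return idx_ != 0; });
        releaseHydrogen();
        idx_ = (idx_ + 1) % 3;
        if (idx_ == 0) {
            oxygen_cv_.notify_one();    // molecule finished: the next atom is an oxygen
        } else {
            hydrogen_cv_.notify_one();  // one more hydrogen is still needed
        }
    }

    void oxygen(const std::function<void()>& releaseOxygen) {
        std::unique_lock<std::mutex> lock(m_);
        oxygen_cv_.wait(lock, [this] { return idx_ == 0; });
        releaseOxygen();
        idx_ = 1;
        hydrogen_cv_.notify_one();      // oxygen always hands over to a hydrogen
    }
};

// Hypothetical driver: two hydrogen threads and one oxygen thread build n molecules.
// Returns the order in which the atoms were released.
std::string run_two_cv_demo(int n) {
    H2OTwoCv water;
    std::string out;  // appended only inside the callbacks, which run under the builder's mutex
    auto h = [&] { out += 'H'; };
    auto o = [&] { out += 'O'; };
    std::thread h0([&] { for (int i = 0; i < n; ++i) water.hydrogen(h); });
    std::thread h1([&] { for (int i = 0; i < n; ++i) water.hydrogen(h); });
    std::thread ox([&] { for (int i = 0; i < n; ++i) water.oxygen(o); });
    h0.join();
    h1.join();
    ox.join();
    return out;
}
```

Because every waiter on a given condition variable can use the wakeup it receives, `notify_one` is enough here and no thread is woken only to go back to sleep because it is the wrong kind of atom. As in the original mutex solution, the release order is always O, H, H.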