#include <iostream>
#include <thread>
#include <mutex>
#include <atomic>
using namespace std;

const int FLAG1 = 1, FLAG2 = 2, FLAG3 = 3;
int res = 0;
atomic<int> flagger;

void func1() 
{
    for (int i=1; i<=1000000; i++) {
        while (flagger.load(std::memory_order_relaxed) != FLAG1) {}
        res++; // maybe a bunch of other code here that doesn't modify flagger
        // code here must not be moved outside the load/store (like mutex lock/unlock)
        flagger.store(FLAG2, std::memory_order_relaxed);
    }
    cout << "Func1 finished\n";
}

void func2() 
{
    for (int i=1; i<=1000000; i++) {
        while (flagger.load(std::memory_order_relaxed) != FLAG2) {}
        res++; // same
        flagger.store(FLAG3, std::memory_order_relaxed);
    }
    cout << "Func2 finished\n";
}

void func3() {
    for (int i=1; i<=1000000; i++) {
        while (flagger.load(std::memory_order_relaxed) != FLAG3) {}
        res++; // same
        flagger.store(FLAG1, std::memory_order_relaxed);
    }
    cout << "Func3 finished\n";
}

int main()
{
    flagger = FLAG1;
    
    std::thread first(func1);
    std::thread second(func2);
    std::thread third(func3);
    
    first.join();
    second.join();
    third.join();
    
    cout << "res = " << res << "\n";
    return 0;
}

My program has a segment similar to this example. Basically, there are 3 threads: an inputer, a processor, and an outputer. I found that busy-waiting on an atomic is faster than putting the threads to sleep with a condition variable, such as:

std::mutex commonMtx;
std::condition_variable waiter1, waiter2, waiter3;
// then in func1()
unique_lock<std::mutex> uniquer1(commonMtx);
while (flagger != FLAG1) waiter1.wait(uniquer1);
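
Fleshed out, the condition-variable version of func1 looks roughly like this (the unlock/notify lines are a sketch of the code elided above, and it needs #include <condition_variable>):

// Sketch only: the unlock/notify part is assumed, not shown in the fragment above.
void func1_cv()
{
    for (int i=1; i<=1000000; i++) {
        std::unique_lock<std::mutex> uniquer1(commonMtx);
        while (flagger != FLAG1) waiter1.wait(uniquer1);
        res++;
        flagger = FLAG2;
        uniquer1.unlock();    // release the mutex before notifying
        waiter2.notify_one(); // wake the thread waiting for FLAG2
    }
}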

However, is the code in the example safe? When I run it, it gives correct results (compiled with -std=c++17 -O3). But I don't know whether the compiler can reorder my instructions to outside the atomic check/set block, especially with std::memory_order_relaxed. In case it's unsafe, is there any way to make it safe while staying faster than a mutex?

Edit: it's guaranteed that the number of threads is < the number of CPU cores

Huy Le
  • I'm still learning about the memory orders, but this looks like UB to me. I think you should be using acquire/release (or some `atomic_thread_fence`s?). – HolyBlackCat Dec 31 '21 at 13:15
  • In your example, the multi-threaded implementation is useless. You want to ensure three operations are done in sequence (1, 2, 3) and only one thread is working at a time. Why don't you just use one thread which runs all those operations in sequence? – VLL Dec 31 '21 at 13:29
  • `res++` isn't ordered with respect to relaxed atomic stores. Your code is unsafe. Use acquire/release memory ordering. See Herb's excellent [atomic<> weapons talk](https://www.youtube.com/watch?v=A8eCGOqgvH4) for more details. – rustyx Dec 31 '21 at 13:32
  • If you have "inputer, processor, and outputer", you could have two resources: Inputer thread writes to input queue (`std::vector`?), processor thread reads from there and writes to output queue, output thread reads from output queue. In the processing thread, lock mutex, `std::move` the entire queue to a local variable (this should be quick), unlock mutex, then start processing data until you need more. In the output thread, do the same. This way none of the threads have to stop to wait for the following thread to process the data. – VLL Dec 31 '21 at 13:47
  • @VLL Actually I'm doing exactly that. I have an N x 2 resource queue. At any time, each thread only needs to lock 1 queue, while the other threads can work on the other N-1 queues if they have tasks. I didn't put that in, to keep the example simple. – Huy Le Dec 31 '21 at 14:05
  • Tried this with Clang's `-fsanitize=thread`, and it says that it's a data race (and UB). If I use 'acquire' loads and 'release' stores, it stops complaining. I then expected to be able to replace the 'acquire' load with a 'relaxed' load followed by an 'acquire' fence, which didn't work for some reason. I asked a new question about it [here](https://stackoverflow.com/q/70542993/2752075). – HolyBlackCat Dec 31 '21 at 14:06
  • The informal explanation is that `relaxed` atomic operations can be freely reordered, either at the code generation or machine level, with unrelated loads and stores. So for instance, in `func1`, the machine could load the value of `res` before the test of `flagger`, and only store back the incremented value after getting the correct value loaded from `flagger`. But some other thread could increment `res` in between, which would then be lost when `func1` stores back a stale value. – Nate Eldredge Dec 31 '21 at 17:22
  • Reasoning like this is not a substitute for understanding the formal rules, but it does help you get a sense of why the formal rules exist. – Nate Eldredge Dec 31 '21 at 17:23
  • By the way, even after fixing the memory ordering, your approach is not great for a typical program. You have created a version of a spinlock, and the problem is that they don't cooperate with the OS's scheduling. If a thread gets scheduled out while holding the lock, then the other threads spin at 100% CPU until the first one comes back and unlocks, which slows down the rest of the system and wastes energy. – Nate Eldredge Dec 31 '21 at 17:25
  • @NateEldredge Is there any way to prevent threads from getting scheduled out? All these loops have < 50 ns execution time per loop, so my solution needs consistently low latency. – Huy Le Jan 01 '22 at 02:30
  • @HuyLe: Depends on your OS, which you have not identified. Some have a provision for real-time scheduling, giving you more control over how your threads are scheduled. But it's not something to do lightly or without a thorough understanding of the consequences. For instance, it means that a deadlock in your code may now hang the entire system instead of just your process; you won't be able to log in to fix it, and will have to reboot the hardware. – Nate Eldredge Jan 01 '22 at 08:36

2 Answers


std::memory_order_relaxed provides no ordering guarantees for surrounding memory operations; it only guarantees atomicity and a consistent modification order for the atomic itself.

All your res++ operations are therefore data races, and your program has undefined behavior.

Example:

#include <atomic>

int x;
std::atomic<int> a{0};

void f() {
    x = 1;
    a.store(1, std::memory_order_relaxed);
    x = 2;
}

Clang 13 on x86_64 with -O2 compiles this function to

    mov     dword ptr [rip + a], 1
    mov     dword ptr [rip + x], 2
    ret

(https://godbolt.org/z/hxjYeG5dv)

Even on a cache-coherent platform, between the first and second mov, another thread can observe a set to 1 while x is not yet set to 1. In fact, the compiler has eliminated the store x = 1 entirely, because the relaxed store to a does not prevent dead-store elimination across it.
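
A similar example can be constructed with loads: a relaxed load does not stop the compiler from hoisting an unrelated plain load across it. A sketch (the names y, b, and g are hypothetical):

#include <atomic>

int y;
std::atomic<int> b{0};

int g() {
    while (b.load(std::memory_order_relaxed) != 1) {}
    // Nothing orders the plain load of y after the relaxed loads of b,
    // so it may be hoisted above the loop and return a stale value.
    return y;
}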

You must use memory_order_release and memory_order_acquire (or sequential consistency) to replace a mutex.
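
For contrast, here is a minimal sketch of the message-passing pattern that release/acquire does make safe (the names payload and ready are hypothetical):

#include <atomic>

int payload;                    // plain, non-atomic data
std::atomic<bool> ready{false};

void producer() {
    payload = 42;                                 // may not be reordered after the release store
    ready.store(true, std::memory_order_release);
}

void consumer() {
    while (!ready.load(std::memory_order_acquire)) {}
    // The acquire load that sees true synchronizes-with the release store,
    // so this read is guaranteed to see payload == 42.
    int v = payload;
    (void)v;
}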

(I should add that I have not checked your code in detail, so I can't say that simply replacing the memory order is sufficient in your specific case.)

user17732522
  • How should I change the code in the question? .load(acquire) and .store(release)? What about .load(acquire) and .store(relaxed)? – Huy Le Dec 31 '21 at 14:07
  • @HuyLe You should replace all stores with `memory_order_release` and all loads with `memory_order_acquire`. `memory_order_relaxed` is not enough in either case. I could have given you a similar example with load reordering on `memory_order_relaxed`. If you use release/acquire everywhere, then I _think_ at least _the specific code in your question_ is ok. Whether this still applies if you fill out your comments with other stuff, I don't know. – user17732522 Dec 31 '21 at 14:11

As mentioned in the other answer, the res++ operations in different threads are not synchronized with each other, which causes a data race and undefined behavior. This can be checked with a thread sanitizer.

To fix this, you need to apply memory_order_acquire to the loads and memory_order_release to the stores. The fix, too, can be confirmed with a thread sanitizer.

while (flagger.load(std::memory_order_acquire) != FLAG1) {}
res++;
flagger.store(FLAG2, std::memory_order_release);

Or, flagger.load(std::memory_order_acquire) can be replaced with flagger.load(std::memory_order_relaxed), followed by std::atomic_thread_fence(std::memory_order_acquire) after the loop.

while (flagger.load(std::memory_order_relaxed) != FLAG1) {}
std::atomic_thread_fence(std::memory_order_acquire);
res++;
flagger.store(FLAG2, std::memory_order_release);

I'm not sure how much it improves performance, if at all.

The idea is that only the last load in the loop needs to be an acquire operation, which is what the fence achieves.
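
The store side can be split the same way, if that turns out to help: a release fence followed by a relaxed store acts as a release operation for any acquire load that reads the stored value. A sketch (again, with no performance claim):

while (flagger.load(std::memory_order_relaxed) != FLAG1) {}
std::atomic_thread_fence(std::memory_order_acquire);
res++;
// The release fence makes the following relaxed store synchronize with
// any acquire load that reads FLAG2 from it.
std::atomic_thread_fence(std::memory_order_release);
flagger.store(FLAG2, std::memory_order_relaxed);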

HolyBlackCat