-2

Brief: In the Following code, main function creates 8 threads , producer fills the payload array one at a time ,waits for that particular consumer to read it and then moves on to the next index. The waiting mechanism is , waiting for the global value to be cleared by consumer.

//Producer
volatile int Payload[8];//global
CreateThread(); // 8 threads.
while(1)
{
    for(int i = 0; i < 8;i++)
    { 
        Payload[i] = 10;
        while(Payload[i]==10); //this is done to wait till thread reads and clears it.
    }
}

//Consumer
while(1)
{
    if(Payload[threadindex])
    {
        x = Payload[threadindex];
        Payload[threadindex] = 0;
        cout << "print" << endl;
    }
}

This does not work. Somehow I get stuck at

while(Payload[i]==0);

I think the issue is the following.

Even if I have kept Payload as volatile, program is still reading it from the cache and hence getting the stale value.

How can I force the threads to uncache the values?

How do I in general solve this problem? I am relatively new to Multi-threading.

Update: I see stackoverflow suggesting the following link.

Does guarding a variable with a pthread mutex guarantee it's also not cached?

Which mentions pthread functions doing memory barriers to ensure cache effects are made visible to other threads.

Do we have anything on windows?

Anup Buchke
  • 5,210
  • 5
  • 22
  • 38
  • You should post some real code, this pseudocode doesn't tell the whole story. For example, I have no idea what "while (Payload[i]==0); is supposed to tell me. – Ben Aug 14 '19 at 05:23
  • Updated my post. – Anup Buchke Aug 14 '19 at 05:25
  • Updated the tag. – Anup Buchke Aug 14 '19 at 05:27
  • 2
    The volatile ints are "uncached". You can still have race conditions. But this is not a [mre], and a volatile int flag is not a proper way to notify a thread. – Antti Haapala -- Слава Україні Aug 14 '19 at 05:28
  • I thought volatile only tries to guarantee non-register storages. But things can be cached. Right? – Anup Buchke Aug 14 '19 at 05:30
  • `volatile` is rather the right qualifier for e.g. H/W registers mapped into mem. address range. Either use atomics or lock accesses by muteces for interthread communication. (Constant data, created before threads doesn't need to be locked as thread creation should be a sufficient sync.) – Scheff's Cat Aug 14 '19 at 05:33
  • Scheff, thanks for your comment. I did try using Interlock* commands for updates, but still see this issue. And when I kill the app and reopen, it works fine. Any BKMs for message passing between threads? – Anup Buchke Aug 14 '19 at 05:37
  • @AnupBuchke Read the C++ standard section on `volatile` and you will see that absolutely no threading semantics are guaranteed at all. You need to use something that has guaranteed threading semantics unless you're happy with code that just happens to work when you tried it. – David Schwartz Aug 14 '19 at 06:01

1 Answers1

1

The conventional way of implementing a producer/consumer model is with a std::mutex and a std::condition_variable:

#include <mutex>
#include <thread>
#include <iostream>
#include <condition_variable>

int main()
{
    const size_t threadCount = 8;
    int payload[threadCount]{};
    std::condition_variable condition;
    std::mutex mutex;
    std::vector<std::thread> threads;
    for (size_t i = 0; i < threadCount; i++)
    {
        threads.emplace_back([&, i]
        {
            while (true)
            {
                std::unique_lock lock(mutex);
                // wait until payload is non-zero
                condition.wait(lock, [&] { return payload[i] != 0; });
                int value = payload[i];
                payload[i] = 0;
                // tell the producer we have cleared the value
                condition.notify_all();
                // unlock mutex whilst we do our work to allow other threads/producer to keep running
                lock.unlock();
                std::cout << value << "\n";
            }
        });
    }
    while (true)
    {
        for (size_t i = 0; i < threadCount; i++)
        {
            std::unique_lock lock(mutex);
            // wait until payload is zero
            condition.wait(lock, [&] { return payload[i] == 0; });
            payload[i] = i;
            // tell the consumers we have set the value
            condition.notify_all();
        }
    }
}

This properly synchronises the data between threads and threads with no work are idle rather than using 100% CPU continuously checking the value for some work to do.

Note the payload array does not need to be volatile.

Alan Birtles
  • 32,622
  • 4
  • 31
  • 60