
I am implementing the producer-consumer problem. The usual implementation pairs a std::condition_variable with a std::mutex so that threads can be notified to wake up: the producer notifies the consumer and vice versa. This is generally done to avoid thread-starvation issues. But I am wondering: does this issue really persist on a multiprocessor system?

This question comes up because I am implementing this with a lock-free ring buffer, and I don't want to use std::mutex and std::condition_variable on the producer and consumer sides, since the queue cannot have a data race between concurrent enqueue() and dequeue() calls. Below is the code.

#include <atomic>
#include <cstdint>
#include <functional>
#include <mutex>
#include <string_view>
#include <thread>
#include <pthread.h> // pthread_setaffinity_np / pthread_setname_np are glibc extensions (_GNU_SOURCE)
#include <sched.h>
#include <unistd.h>

template<typename MessageType>
class MessageProcessor
{
public:
    ~MessageProcessor()
    {
        stop();

        if (workerThread_.joinable())
            workerThread_.join();
    }

    bool postMessage(MessageType const &msg)
    {
        return queue_.enqueue(msg);
    }

    void registerHandler(std::function<void(MessageType)> handler, int32_t coreId=-1, std::string_view const &name="")
    {
        std::call_once(init_, [&](){
            handler_ = std::move(handler);
            workerThread_ = std::thread{&MessageProcessor::process, this};

            if (!setAffinity(coreId, workerThread_))
                LOG("Msg Processing thread couldn't be pinned to core: " << coreId);
            else
                LOG("Msg Processing thread pinned to core: " << coreId);

            if (! name.empty())
                pthread_setname_np(workerThread_.native_handle(), name.data());
        });
    }

    void stop()
    {
        stop_ = true;
    }

private:
    void process() //This is a consumer, runs in a separate thread
    {
        while(!stop_.load(std::memory_order_acquire))
        {
            MessageType msg;

            if (! queue_.dequeue(msg))
                continue;

            try
            {
                handler_(msg);
            }
            catch(std::exception const &ex)
            {
                LOG("Error while processing data: " << msg << ", Exception: " << ex.what());
            }
            catch(...)
            {
                LOG("UNKNOWN Error while processing data: " << msg);
            }
        }
    }

    bool setAffinity(int32_t const coreId, std::thread &thread)
    {
        int cpuCoreCount = sysconf(_SC_NPROCESSORS_ONLN);

        if (coreId < 0 || coreId >= cpuCoreCount)
            return false;

        cpu_set_t cpuset;

        CPU_ZERO(&cpuset);
        CPU_SET(coreId, &cpuset);

        pthread_t currentThread = thread.native_handle();

        return pthread_setaffinity_np(currentThread, sizeof(cpu_set_t), &cpuset) == 0;
    }

    std::thread workerThread_;
    std::atomic<bool> stop_{false};
    MPMC_Circular_Queue<MessageType, 1024> queue_;
    std::function<void(MessageType)> handler_{};
    std::once_flag init_;
};

int main()
{
    pthread_setname_np(pthread_self(), "MAIN");

    MessageProcessor<int> processor;

    processor.registerHandler([](int i){
        LOG("Received value: " << i);
    }, 2, "PROCESSOR");

    std::thread t1([&]() { //Producer thread1
        for (int i = 1; i <= 100000; i += 2)
        {
            LOG("Submitting value: " << i);
            processor.postMessage(i);
        }
    });

    pthread_setname_np(t1.native_handle(), "ODD ");

    std::thread t2([&]() { //Producer thread2
        for (int i = 2; i <= 100000; i += 2)
        {
            LOG("Submitting value: " << i);
            processor.postMessage(i);
        }
    });

    pthread_setname_np(t2.native_handle(), "EVEN");

    for (int i = 1; i <= 100000; ++i)
    {
        LOG("Running main thread: " << i);
    }

    t1.join();
    t2.join();

    return 0;
}

Can this code cause a thread-starvation issue on a modern multiprocessor system? MPMC_Circular_Queue is a lock-free bounded queue.

  • Define "modern multiprocessor systems". That can mean anything from a dual-core, two-thread processor to a 128-core, 256-thread-per-socket system with multiple sockets (or even more). – Jesper Juhl Jun 13 '22 at 18:53
  • "Since this queue can't have a data-race issue" - if that's true, then that queue is doing thread-safe synchronization itself. You can't escape it. And you need synchronization **exactly on** a multi-core system. On a single-core system you don't, because there are no race conditions by design. So your first paragraph makes no sense. – Blindy Jun 13 '22 at 19:00
  • It really depends on MPMC_Circular_Queue. You need to use locks in that queue, or around it, unless you have atomic enqueue and dequeue operations. – kmodexc Jun 13 '22 at 19:08
  • This is hard to answer without knowing the implementation of `enqueue()` and `dequeue()`. Also, lock-freeness only guarantees that at least one of the threads involved makes progress. – G. Sliepen Jun 13 '22 at 20:00
  • You claim that `MPMC_Circular_Queue` is lock-free, but it cannot be lock-free unless the `dequeue(msg)` method immediately returns a "failure" value (e.g., `false`) when the queue is empty and the `enqueue(msg)` method immediately returns "failure" when the queue is full. Either the queue is not lock-free, or else your code is _incorrectly_ ignoring the return value from `enqueue(msg)`. – Solomon Slow Jun 13 '22 at 20:03
  • Broadly, condvars are used to allow threads to sleep when they have nothing to do, and get woken up when there is something to do. If you're writing lock-free busy-waiting code instead, you avoid sleeping in the first place. The tradeoff is cores dedicated to waiting for something to happen, which is reasonable in some cases but no way to run a general-purpose system. – Useless Jun 13 '22 at 20:29
  • @SolomonSlow lock-free != wait-free – Taekahn Jun 13 '22 at 20:51
  • @Taekahn, OP said, "ring-buffer," and that implies that the queue is bounded. If bounded, then `enqueue(msg)` must either be able to fail or block when there is no room for the `msg`. It's my understanding—based on §3.7 of The Art of Multiprocessor Programming—that a blocking operation cannot be called "lock-free." E.g., in this case, if `enqueue(msg)` can block, then there _must_ be some point at which suspending a thread in a `dequeue()` call would block _every_ concurrent call to `enqueue`, and if no `enqueue` caller can make progress, then `enqueue` is not "lock-free." – Solomon Slow Jun 13 '22 at 21:52
  • @SolomonSlow: ring-buffer queues aren't a lock-free algorithm in corner cases (where one thread sleeps at a bad place) even if they let the caller go do something else. They still make no progress on passing messages between threads, i.e. on that algorithm, even if they return a failure status. See [Lock-free Progress Guarantees in a circular buffer queue](https://stackoverflow.com/q/45907210) . But yes it's potentially useful in practice to have try_push and try_pop functions to at least let a thread do something else, if it knows of lower-priority useful work. – Peter Cordes Jun 13 '22 at 22:17
  • But yes, agreed that having a potentially-blocking operation (that needs to wait for another thread to finish something) automatically rules out lock-freedom. And that a fixed-size circular queue is almost certainly not going to be lock-free or even obstruction-free, since it's not worth all the code to have one thread be able to cancel another thread's partially-complete write or read. Possible in theory but nobody does that, just make the buffer big enough that it doesn't block often in practice. – Peter Cordes Jun 13 '22 at 22:22
  • Anyway, *starvation* is a separate issue from lock-freedom. If you have a case where a new message gets added to the queue right before an already-running thread finishes with a previous message and checks for a new one, then yeah it might tend to win the race and keep running, instead of blocking and having a thread wake up that had been waiting longer. If this is passing out work to worker threads, you'd probably *rather* have threads stay asleep, and already-running threads stay running on an already-awake CPU core; their stack memory is hot in cache, and the OS does no scheduling work. – Peter Cordes Jun 13 '22 at 22:43

0 Answers