My program has three threads, and I am trying to learn about synchronization and thread safety. Below I outline what each thread does. I would like to learn how to use events to trigger the processing in each thread, instead of polling in an infinite loop (which is giving me concurrency issues).

Googling throws up many options, but I'm not sure which is best in this case. Could you point me to a standard method/event that I could learn in order to implement this?

I am doing this in VS 2012, and ideally I would not use external libraries, e.g. Boost.

Thread 1: receives a message and pushes it into a global queue, queue<my_class> msg_in.

Thread 2: runs an infinite loop (i.e. while(1)); waits until !msg_in.empty(), does some processing, and pushes the result into a global map<int, map<int, queue<my_class>>> msg_map.

while (1)
{
    if (!msg_in.empty())
    {
        //processes the front message of msg_in
        msg_map[i][j].push(/* processed message */); //i and j are int (irrelevant here)
    }

}

Thread 3:

while (1)
{
    if (msg_map.find(i) != msg_map.end())
    {
        if (!msg_map[i].find(j)->second.empty())
        {
            //processes 
        }
    }
}
Lefteris E
sccs

1 Answer

Your problem is a producer-consumer problem. You can use condition variables as your events. There is an example here: http://en.cppreference.com/w/cpp/thread/condition_variable
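
As a minimal sketch of how this could look for the msg_in queue in the question: the wrapper below pairs a std::queue with a std::mutex and a std::condition_variable, so the consumer sleeps until a message arrives instead of polling. The names BlockingQueue, wait_and_pop and run_queue_demo are made up for illustration, and int stands in for my_class.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical wrapper: a queue guarded by a mutex and a condition variable.
template <typename T>
class BlockingQueue
{
public:
    void push(const T& value)
    {
        {
            std::lock_guard<std::mutex> lk(m_);
            q_.push(value);
        }
        cv_.notify_one();   // wake one waiting consumer
    }

    // Blocks until an element is available, then removes and returns it.
    T wait_and_pop()
    {
        std::unique_lock<std::mutex> lk(m_);
        cv_.wait(lk, [this]{ return !q_.empty(); });
        T value = q_.front();
        q_.pop();
        return value;
    }

private:
    std::mutex m_;
    std::condition_variable cv_;
    std::queue<T> q_;
};

// Demo: a producer thread (the role of Thread 1) pushes 1..count, while the
// calling thread (the role of Thread 2) consumes exactly count messages.
int run_queue_demo(int count)
{
    BlockingQueue<int> msg_in;
    std::thread producer([&msg_in, count]{
        for (int i = 1; i <= count; ++i)
            msg_in.push(i);
    });

    int sum = 0;
    for (int i = 0; i < count; ++i)
        sum += msg_in.wait_and_pop();   // sleeps while the queue is empty

    producer.join();
    return sum;
}
```

With something like this, Thread 2 could replace its while (1) { if (!msg_in.empty()) ... } loop with a loop around wait_and_pop(). A real program would also need some way to tell the consumer to stop, which this sketch leaves out.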

I have adapted the cppreference example to your case, if you need it.

#include <iostream>
#include <string>
#include <thread>
#include <mutex>
#include <atomic>
#include <condition_variable>

std::mutex m;
std::condition_variable cv;
bool ready = false;               // set once by initializeData()
unsigned int processedCount = 0;  // number of workers that have finished

void worker_thread(unsigned int threadNum)
{
    // Wait until initializeData() signals that the data is ready
    {
        std::unique_lock<std::mutex> lk(m);
        cv.wait(lk, []{return ready;});
    }

    std::cout << "Worker thread " << threadNum << " is processing data" << std::endl;

    // Report completion to consumerThread()
    {
        std::lock_guard<std::mutex> lk(m);
        ++processedCount;
        std::cout << "Worker thread " << threadNum << " signals data processing completed\n";
    }
    // Workers and the consumer share one condition variable, so notify_one
    // could wake a thread whose predicate is still false; wake them all
    cv.notify_all();
}


int initializeData()
{
    // Publish the data to the worker threads
    {
        std::lock_guard<std::mutex> lk(m);
        ready = true;
        std::cout << "Data initialized" << std::endl;
    }
    cv.notify_all();  // every worker waits on ready, so wake all of them
    return 0;
}

int consumerThread(unsigned int nbThreads)
{
    unsigned int nbConsumedThreads = 0;  // local to this thread, no atomic needed
    while (nbConsumedThreads < nbThreads)
    {
        std::unique_lock<std::mutex> lk(m);
        // Wait until at least one completion has not been consumed yet
        cv.wait(lk, [&]{return processedCount > nbConsumedThreads;});
        std::cout << "Data processed counter=" << nbConsumedThreads << std::endl;
        ++nbConsumedThreads;
    }

    return 0;
}

int main()
{
    const unsigned int nbThreads=3;
    std::thread worker1(worker_thread,1);
    std::thread worker2(worker_thread,2);
    std::thread worker3(worker_thread,3);

    std::thread init(initializeData);

    std::thread consume(consumerThread, nbThreads);



    worker1.join();
    worker2.join();
    worker3.join();

    init.join();

    consume.join();

    return 0;
}
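
For the second hand-off in the question (Thread 2 filling msg_map[i][j], Thread 3 reading it), one option is a single mutex and condition variable guarding the whole nested map. This is only a sketch under assumptions: publish, has_message, wait_for and run_map_demo are hypothetical names, int stands in for my_class, and one coarse lock is assumed to be acceptable for your load.

```cpp
#include <condition_variable>
#include <map>
#include <mutex>
#include <queue>
#include <thread>

typedef std::map<int, std::map<int, std::queue<int> > > MsgMap;

MsgMap msg_map;                  // shared between Thread 2 and Thread 3
std::mutex map_mutex;            // guards every access to msg_map
std::condition_variable map_cv;  // signalled after each push

// Thread 2 side: store a processed message under keys (i, j).
void publish(int i, int j, int msg)
{
    {
        std::lock_guard<std::mutex> lk(map_mutex);
        msg_map[i][j].push(msg);
    }
    map_cv.notify_all();   // waiters may be interested in different keys
}

// True if msg_map[i][j] exists and is non-empty. Uses find() so that a
// lookup never inserts empty entries. The caller must hold map_mutex.
bool has_message(int i, int j)
{
    MsgMap::const_iterator outer = msg_map.find(i);
    if (outer == msg_map.end()) return false;
    std::map<int, std::queue<int> >::const_iterator inner = outer->second.find(j);
    return inner != outer->second.end() && !inner->second.empty();
}

// Thread 3 side: block until a message for (i, j) exists, then take it.
int wait_for(int i, int j)
{
    std::unique_lock<std::mutex> lk(map_mutex);
    map_cv.wait(lk, [i, j]{ return has_message(i, j); });
    int msg = msg_map[i][j].front();
    msg_map[i][j].pop();
    return msg;
}

// Demo: one thread publishes a message, the caller waits for it.
int run_map_demo()
{
    std::thread producer([]{ publish(1, 2, 42); });
    int received = wait_for(1, 2);
    producer.join();
    return received;
}
```

Checking with find() inside has_message also avoids the problem in the original Thread 3 loop, where msg_map[i].find(j)->second dereferences the end iterator when j is not present.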

Hope that helps. Tell me if you need more info.

Gabriel