0

I am trying to achieve a simple task on my multithreaded application but I can't figure out how to do it.

I have 3 threads:

  • 1 thread acting as master thread
  • 2 threads acting as slave threads

1) The 2 slave threads just need to acquire some data, sleep some milliseconds and process again.

2) The master thread waits that slave threads finishes their job, processes the data acquired by the slave threads and so on

How to be sure that both slave thread did their acquisition (and not thread slave n°1 did twice the job before thread slave n°2 could do it) n order for master thread to processed all data ?

  • Using semaphores, I can't control if both threads did their job, I can just notice that data have been acquired twice.

  • Using condition variables, I don't know how to individually inform the master thread that BOTH slave threads did their job

I hope it was clear enough for you to help me.

EDIT1: One of my goals is to NOT block the slave threads to have a real multithreaded application. I would like, in the best scenario, that master thread processes data while slave acquire the next data.

I am using Qt 4.8 on Linux

Thank you for your help

Robert Jones
  • 587
  • 7
  • 25
  • How you post a data to slave threads? – MaxV Feb 28 '17 at 09:54
  • @MaxV Slave threads are just reading data from a file. File access is protected with mutexes – Robert Jones Feb 28 '17 at 09:55
  • They read a data from one file or from two files? – MaxV Feb 28 '17 at 09:56
  • @MaxV They read data from two files. Depending on the data read they do some minor processing then write these data inside a buffer. Master thread uses this buffer to some more complex processing. It is absolutely mandatory that one sklave thread has done its job once before the master processed the data – Robert Jones Feb 28 '17 at 09:59
  • 1
    Does it matter the order in which master thread processes the data? can it process data from thread 2 first and thread 1 later or it doesn't matter? – Samer Tufail Feb 28 '17 at 10:19
  • How about to use an atomic variable where bit 0 marks whether slave thread 1 has processed data and bit 1 for slave thread 2. The master can reset it to 0 if data of both slave threads have been "consumed". (May be additionally to your idea with condition to prevent polling in slaves as long as master has not yet processed the data...) – Scheff's Cat Feb 28 '17 at 12:17
  • @SamerTufail The order does not matter. However it matters that thread 1 and thread 2 have read the data before master thread processes it. It does not matter if it does THREAD1 - THREAD2 - THREADMASTER or THREAD2 - THREAD1 - THREADMASTER – Robert Jones Feb 28 '17 at 12:28
  • @Scheff This could be a good idea indeed, thank you. This is almost like using one semaphore for each thread right ? If not what would be the main difference ? – Robert Jones Feb 28 '17 at 12:31
  • sempahores should work, set the initial count to 2 - master doesnt process the data until the count goes to 0 and the slave threads dont carry on unless the count is > 0 – Samer Tufail Feb 28 '17 at 13:40
  • 1
    @RobertJones Atomic variables simply grant that concurrent write access is prevented. If I'm right it grants also that every thread gets the current value (instead of working with a cached value which is actually not anymore valid.) Additionally, I read a statement of somebody (probably in SO) which compare mutex vs. atomic and found out that atomics were much faster for his purpose. However, atomics can be used for safe inter-thread communication but not for signaling. Atomics are usually limited to something with machine word size. Considering that a pointer usually fits into a machine word – Scheff's Cat Feb 28 '17 at 13:51
  • @RobertJones ...they can be used to pass memory allocated and filled by one thread and consumed and freed by another. – Scheff's Cat Feb 28 '17 at 13:52
  • @SamerTufail Your suggestion is good but unfortunately it does not respect the fact that both slave threads have read the data. In this case, one thread could have loop twice (without letting the other thread to do its job) and still increment the semaphore. Therefore, we won't be sure that the work from the slave threads was correctly done – Robert Jones Feb 28 '17 at 14:32
  • @Scheff I tried with two semaphores but the problem is that sometime the master thread don't have time to process the data that the other two slaves have started again the cycle of reading data and releasing semaphore. I might be doing something wrong here... Once slave thread read the data, it releases semaphore. Before processing data, master thread try to acquire both thread semaphore and so on... Is that correct ? – Robert Jones Feb 28 '17 at 15:22
  • @RobertJones I came to a similar conclusion and therefore deleted the comment again. Meanwhile I looked into our (companies) application where we had similar problem. Originally, we solved it using condition variables ([SO: why do I need std::condition_variable?](http://stackoverflow.com/questions/16350473/why-do-i-need-stdcondition-variable)) but meanwhile they have been removed. In our case, every thread must stay responsive. Thus, we simply used polling with `sleep()` and `mutex`es to save "transfer buffer" accesses. – Scheff's Cat Feb 28 '17 at 15:40
  • @RobertJones A sleep time of 10000 or 20000 microseconds is quite long for the CPU and quite short for a human user. (Human perception is limited to round about 100 ms.) Thus, CPU load will be quite low but the system appears "immediately" responsive for the user. – Scheff's Cat Feb 28 '17 at 15:43
  • @Scheff So you mean a simple sleep of 10000microsecs at the end of each data read by the slave thread ? Since I have some real time issues (the data I'm reading is processed in real time through the master thread) I think I'll lose some efficiency with this method, don't you think ? – Robert Jones Feb 28 '17 at 15:55
  • Let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/136892/discussion-between-scheff-and-robert-jones). – Scheff's Cat Feb 28 '17 at 17:04

1 Answers1

0

This is not quite the requested answer according to question and tags. As we discussed it in comments, I would like to show how this could look like using atomic and sleep():

#include <atomic>
#include <chrono>
#include <iostream>
#include <sstream>
#include <string>
#include <thread>

using namespace std;

typedef atomic<const char*> TransferBuf;

enum { NLoops = 10 };
enum { dtSusp = 20000 };

atomic<bool> stop(false);

// fake acquisition of data
void acquire(int id, TransferBuf &pTransferBuf)
{
  string buffers[2]; size_t iBuf = 0;
  for (size_t step = 0;; ++step) {
    // acquire data
    ostringstream out;
    out << "thread " << id << ", " << step << ": ";
    for (size_t i = 0; i < 10; ++i) {
      if (stop) return;
      out << (i ? ", " : "") << "data " << step << i;
    }
    buffers[iBuf] = out.str();
    // polling: wait for transfer buffer free
    const char *pBuf = buffers[iBuf].c_str(), *pSent = 0;
    while (!pTransferBuf.compare_exchange_strong(pSent, pBuf)) {
      if (stop) return;
      // if storage fails sleep() (to give other threads a chance)
      this_thread::sleep_for(chrono::microseconds(dtSusp));
      // reset pSent (which is updated)
      pSent = 0;
    }
    // buffer sent -> flip buffers for next acquisition
    iBuf ^= 1;
    // sleep some time (on requirement)
    this_thread::sleep_for(chrono::microseconds(dtSusp));
  }
}

int main(int, char**)
{
  TransferBuf pBuf1(0), pBuf2(0);
  // start thread 1 for data aquisition
  thread thread1(&acquire, 1, ref(pBuf1));
  // start thread 2 for data aquisition
  thread thread2(&acquire, 2, ref(pBuf2));
  // start processing
  for (size_t i = 0; i < NLoops; ++i) {
    // polling: wait for data
    while (!pBuf1 || !pBuf2) {
      // if storage fails sleep() (to give other threads a chance)
      this_thread::sleep_for(chrono::microseconds(dtSusp));
    }
    // process data
    const char *pData1 = pBuf1, *pData2 = pBuf2;
    cout << pData1 << endl;
    cout << pData2 << endl;
    // release processed data
    pBuf1 = pBuf2 = 0;
  }
  // finish
  stop = true;
  thread1.join(); thread2.join();
  // done
  return 0;
}

Sorry, that it does not contain any Qt symbol but everything is available in std library since C++11.

Compiled and tested with gcc on cygwin:

$ g++ -std=c++11 -o test-master-slave test-master-slave.cc 

$ ./test-master-slave.exe 
thread 1, 0: data 00, data 01, data 02, data 03, data 04, data 05, data 06, data 07, data 08, data 09
thread 2, 0: data 00, data 01, data 02, data 03, data 04, data 05, data 06, data 07, data 08, data 09
thread 1, 1: data 10, data 11, data 12, data 13, data 14, data 15, data 16, data 17, data 18, data 19
thread 2, 1: data 10, data 11, data 12, data 13, data 14, data 15, data 16, data 17, data 18, data 19
thread 1, 2: data 20, data 21, data 22, data 23, data 24, data 25, data 26, data 27, data 28, data 29
thread 2, 2: data 20, data 21, data 22, data 23, data 24, data 25, data 26, data 27, data 28, data 29
thread 1, 3: data 30, data 31, data 32, data 33, data 34, data 35, data 36, data 37, data 38, data 39
thread 2, 3: data 30, data 31, data 32, data 33, data 34, data 35, data 36, data 37, data 38, data 39
thread 1, 4: data 40, data 41, data 42, data 43, data 44, data 45, data 46, data 47, data 48, data 49
thread 2, 4: data 40, data 41, data 42, data 43, data 44, data 45, data 46, data 47, data 48, data 49
thread 1, 5: data 50, data 51, data 52, data 53, data 54, data 55, data 56, data 57, data 58, data 59
thread 2, 5: data 50, data 51, data 52, data 53, data 54, data 55, data 56, data 57, data 58, data 59
thread 1, 6: data 60, data 61, data 62, data 63, data 64, data 65, data 66, data 67, data 68, data 69
thread 2, 6: data 60, data 61, data 62, data 63, data 64, data 65, data 66, data 67, data 68, data 69
thread 1, 7: data 70, data 71, data 72, data 73, data 74, data 75, data 76, data 77, data 78, data 79
thread 2, 7: data 70, data 71, data 72, data 73, data 74, data 75, data 76, data 77, data 78, data 79
thread 1, 8: data 80, data 81, data 82, data 83, data 84, data 85, data 86, data 87, data 88, data 89
thread 2, 8: data 80, data 81, data 82, data 83, data 84, data 85, data 86, data 87, data 88, data 89
thread 1, 9: data 90, data 91, data 92, data 93, data 94, data 95, data 96, data 97, data 98, data 99
thread 2, 9: data 90, data 91, data 92, data 93, data 94, data 95, data 96, data 97, data 98, data 99

If the processing in main thread would happen in a QTimer signal handler it should look similar like this:

void timeout()
{
    // polling: wait for data
    if (!pBuf1 || !pBuf2) return; // try again next timeout
    // process data
    const char *pData1 = pBuf1, *pData2 = pBuf2;
    cout << pData1 << endl;
    cout << pData2 << endl;
    // release processed data
    pBuf1 = pBuf2 = 0;
}

Btw. I found this article which provides a nice overview regarding practical usage of atomic: Atomic pointers in c++ and passing objects between threads

Community
  • 1
  • 1
Scheff's Cat
  • 19,528
  • 6
  • 28
  • 56