
I am trying to solve a timing problem in a multithreaded setup. I have 3 threads, all pinned to different cores. The first two threads (reader_threads.cc) run an infinite while loop inside the run() function. On each iteration they send the time window (epoch) they are currently in to the third thread.

The current time window is calculated as the chrono time divided by Ti.

The third thread runs at its own pace and checks a request only when its flag has been raised; the flag is also sent to the third thread via Message.

I get the desired behavior, with all three threads in the same epoch, only if one epoch is at least 20000us. See Results below for more details.

Reader threads

#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <chrono>
#include <atomic>
#include <mutex>

#include "control_thread.h"

#define INTERNAL_THREAD 

#if defined INTERNAL_THREAD
#include <thread>
#include <pthread.h>
#else
#endif

using namespace std;
    
atomic<bool> thread_active[2];
atomic<bool> go;

pthread_barrier_t barrier;

template <typename T> 
void send(Message volatile * m, unsigned int epoch, bool flag) {
  for (int i = 0 ; i < sizeof(T); i++){
    m->epoch = epoch;
    m->flag = flag;
  }
}

ControlThread * ct;

// Main run for threads
void run(unsigned int threadID){

    // Put message into incoming buffer
    Message volatile * m1 = &(ct->incoming_requests[threadID - 1]);

    thread_active[threadID] = true;
    std::atomic<bool> flag;

    // this thread is done initializing stuff 
    thread_active[threadID] = true;

    while (!go);

    while(true){

        using namespace std::chrono;

        // Get current time with precision of microseconds
        auto now = time_point_cast<microseconds>(steady_clock::now());
        // sys_microseconds is type time_point<system_clock, microseconds>
        using sys_microseconds = decltype(now);
        // Convert time_point to signed integral type
        auto duration = now.time_since_epoch();
        // Convert signed integral type to time_point
        sys_microseconds dt{microseconds{duration}};

        // test
        if (dt != now){
          std::cout << "Failure." << std::endl;
        }else{
          // std::cout << "Success." << std::endl;
        }
        
        auto epoch = duration / Ti;

        pthread_barrier_wait(&barrier);

        flag = true;
        // send current time to the control thread
        send<int>(m1, epoch, flag);

        auto current_position = duration % Ti;

        std::chrono::duration<double, micro> multi_thread_sleep = chrono::microseconds(Ti) - chrono::microseconds(current_position);

        if(multi_thread_sleep > chrono::microseconds::zero()){
          this_thread::sleep_for(multi_thread_sleep);
        }
    }
}

int threads_num = 3; 

void server() {    

      // Don't start control thread until reader threads finish init
      for (int i=1; i < threads_num; i++){
        while (!thread_active[i]);
      }

      go = true;
    
      while (go) {
        for (int i = 0; i < threads_num; i++) {
          ct->current_requests(i);
        }

        // Arbitrary sleep to ensure that locking is accurate
        std::this_thread::sleep_for(50us);

      }
    }
    
    class Thread {
    
    public:
    #if defined INTERNAL_THREAD
      thread execution_handle;
    #endif
      unsigned int id; 
      Thread(unsigned int i) : id(i) {}
    };
    
    
    void init(){
      ct = new ControlThread();
    }
    
    int main (int argc, char * argv[]){
    
      Thread * r[4];
      pthread_barrier_init(&barrier, NULL, 2);

      init();
    
      /* start threads
       *================*/
    
      for (unsigned int i = 0; i < threads_num; i++) {
        r[i] = new Thread(i);
      
    #if defined INTERNAL_THREAD
        if(i==0){
          r[0]->execution_handle = std::thread([] {server();});
        }else if(i == 1){
          r[i]->execution_handle = std::thread([i] {run(i);});
        }else if(i == 2){
          r[i]->execution_handle = std::thread([i] {run(i);});
        }
    
        /* pin to core i */
        cpu_set_t cpuset;
        CPU_ZERO(&cpuset);
        CPU_SET(i, &cpuset);
        int rc = pthread_setaffinity_np(r[i]->execution_handle.native_handle(), sizeof(cpuset), &cpuset);
    
    
    #endif
      }

    // wait for threads to end
    
    for (unsigned int i = 0; i < threads_num; i++) {
      #if defined INTERNAL_THREAD    
        r[i]->execution_handle.join();
      #endif
      }
      pthread_barrier_destroy(&barrier);
      return 0;
    }

Control Thread

#ifndef __CONTROL_THEAD_H__
#define __CONTROL_THEAD_H__

// Global vars
const auto Ti = std::chrono::microseconds(15000);
std::mutex m;
int count;

class Message{
  public:
  std::atomic<bool> flag;
  unsigned long epoch;
};


class ControlThread {
public:

  /* rw individual threads */
  Message volatile incoming_requests[4];

  void current_requests(unsigned long current_thread) {
    
    using namespace std::chrono;

    auto now = time_point_cast<microseconds>(steady_clock::now());
    // sys_milliseconds is type time_point<system_clock, milliseconds>
    using sys_microseconds = decltype(now);
    // Convert time_point to signed integral type
    auto time = now.time_since_epoch();
    // Convert signed integral type to time_point
    sys_microseconds dt{microseconds{time}};
    
    // test
    if (dt != now){
      std::cout << "Failure." << std::endl;
    }else{
      // std::cout << "Success." << std::endl;
    }

    long contol_thread_epoch = time / Ti;

      // Only check request when flag is raised
      if(incoming_requests[current_thread].flag){

        m.lock();

        incoming_requests[current_thread].flag = false;

        m.unlock();

        // If reader thread epoch and control thread matches
        if(incoming_requests[current_thread].epoch == contol_thread_epoch){

        // printf("Successful desired behaviour\n");

        }else{
          count++;
          if(count > 0){
            printf("Missed %d\n", count);
          }
        }
      }
    }
  };
#endif 

RUN

g++ -std=c++2a -pthread -lrt -lm -lcrypt reader_threads.cc -o run
sudo ./run 

Results

The missed epochs below were recorded with one loop iteration (a single Ti) equal to 1000us. As Ti increases, fewer epochs are skipped, and with Ti set to 20000us no skipped epochs are detected. Does anyone have an idea whether I am making a mistake in casting or in the communication between threads? Why are the threads not in sync if the epoch is, e.g., 5000us?

Missed 1
Missed 2
Missed 3
Missed 4
Missed 5
Missed 6
Missed 7
Missed 8
Missed 9
Missed 10
Missed 11
Missed 12
Missed 13
Missed 14
Missed 15
Missed 16 
  • Try to minimize the code and remove the piece not related to problem – Dmytro Ovdiienko Mar 18 '22 at 21:56
  • I reduced the code to the smallest possible reproducible test case. Anything less would make the problem not reproducible – eleftheria15 Mar 18 '22 at 22:05
  • Not sure if related, but you appear to have a data race in `ControlThread::current_requests()`: you only hold the mutex when writing `incoming_requests`, not while reading it. So it is possible that one thread is writing (while holding the mutex) while another is reading (without the mutex). You should hold the mutex during any access, either read or write. (Multiple concurrent reads would be okay, as long as nobody is writing; you could use a `shared_mutex` to get that.) – Nate Eldredge Mar 18 '22 at 22:29
  • Unrelated notes: `__CONTROL_THEAD_H__` [is an illegal identifier](https://stackoverflow.com/questions/228783/what-are-the-rules-about-using-an-underscore-in-a-c-identifier). Defining variables in a header is not recommended as every includer of the header will get copies of the variable. The linker won't even try to sort out that they're all the same thing; it'll just report an error and exit. – user4581301 Mar 18 '22 at 22:29
  • Note that having `incoming_requests` being `volatile` doesn't avoid the data race. And once the locking is correct you will not need `volatile` at all. Generally, any appearance of `volatile` in conjunction with multithreading is usually a bug. These days, about the only legitimate use of `volatile` is for memory-mapped hardware. – Nate Eldredge Mar 18 '22 at 22:31
  • `chrono` can handle microsecond and millisecond resolution, but can your target? Many can't. – user4581301 Mar 18 '22 at 22:31
  • Some of the comments don't match the code. For example `duration` is not a signed integral type. Here's a function you can use to print out the `decltype(duration)`: https://stackoverflow.com/a/58331141/576911 – Howard Hinnant Mar 18 '22 at 23:00
