I need to write an infinite loop in which several functions run in parallel on each iteration. They only access a read-only structure, so there is no risk of a race condition, and I want to run them simultaneously to gain some performance.

The problem is that I don't know how to achieve this result in an efficient way.

This is an example where I run four functions in parallel inside a loop with a fixed frame rate (the idea of looping at a specific frame rate is taken from here):

#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <iostream>
#include <mutex>
#include <random>
#include <thread>

int getRandomIntBetween(int minValue, int maxValue) {
  std::random_device rd;
  std::mt19937 rng(rd());
  std::uniform_int_distribution<int> uni(minValue, maxValue);
  return uni(rng);
}

void fun1() {
  int randomInterval = getRandomIntBetween(10, 90);
  std::this_thread::sleep_for(std::chrono::milliseconds(randomInterval));
  std::cout << "fun1 done in " << randomInterval << "ms" << std::endl;
}

void fun2() {
  int randomInterval = getRandomIntBetween(10, 90);
  std::this_thread::sleep_for(std::chrono::milliseconds(randomInterval));
  std::cout << "fun2 done in " << randomInterval << "ms" << std::endl;
}

void fun3() {
  int randomInterval = getRandomIntBetween(10, 200);
  std::this_thread::sleep_for(std::chrono::milliseconds(randomInterval));
  std::cout << "fun3 done in " << randomInterval << "ms" << std::endl;
}

void fun4() {
  int randomInterval = getRandomIntBetween(3, 300);
  std::this_thread::sleep_for(std::chrono::milliseconds(randomInterval));
  std::cout << "fun4 done in " << randomInterval << "ms" << std::endl;

}


int main(int argc, char* argv[]) {
  const int64_t frameDurationInUs = 1.0e6 / 1;
  std::cout << "Parallel looping testing" << std::endl;
  std::condition_variable cv;
  std::mutex  mut;
  bool stop = false;
  size_t counter{ 0 };
  using delta = std::chrono::duration<int64_t, std::ratio<1, 1000000>>;
  auto next = std::chrono::steady_clock::now() + delta{ frameDurationInUs };
  std::unique_lock<std::mutex> lk(mut);
  while (!stop) {
    lk.unlock();  // release through the unique_lock so it tracks ownership

    if (counter % 10 == 0) {
      std::cout << counter << " frames..." << std::endl;
    }
    std::thread t1{ &fun1 };
    std::thread t2{ &fun2 };
    std::thread t3{ &fun3 };
    std::thread t4{ &fun4 };

    counter++;
    t1.join();
    t2.join();
    t3.join();
    t4.join();
    lk.lock();  // re-acquire before waiting on the condition variable
    cv.wait_until(lk, next);
    next += delta{ frameDurationInUs };
  }

  return 0;
}

It works, but it's inefficient because I create and destroy four thread objects on every iteration.

Instead, I'd like to keep the threads alive, trigger the functions from inside the loop, and use some locking mechanism (mutex, semaphore) to wait until all functions have finished before starting the next iteration.

How can I achieve this?

Jepessen
  • _"it's inefficient"_ Have you proved that? AFAIK, implementations may reuse threads. You can use an explicit thread pool, but you may end up with the same or even worse efficiency. – Daniel Langr Dec 20 '22 at 10:44
  • Does this answer your question? [Thread pooling in C++11](https://stackoverflow.com/questions/15752659/thread-pooling-in-c11) – dewaffled Dec 20 '22 at 10:47
  • Live demo of thread reusing: https://godbolt.org/z/14M6cb4c5. – Daniel Langr Dec 20 '22 at 10:52
  • I don't know if you can use C++20 features, where atomic variables are waitable, but if not, you could have each thread wait using a condition variable which the main loop triggers and notifies on (e.g., a sequential counter). Threads can drop the lock while doing work. – Hasturkun Dec 20 '22 at 11:36

1 Answer

If you do not want to rely on thread reuse, you still don't have to resort to pooling.

In your very specific case, you probably don't need a fully developed thread pool, because you want each function to be run exactly once per iteration by its corresponding thread.

Your joins therefore become queries for whether each thread is done with one particular job:

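// requires <algorithm>, <array>, <atomic>, <cstddef>
std::array<std::atomic<bool>, 4> done;
// in each loop iteration:
std::fill(begin(done), end(done), false);
// ... signal the worker threads to run their functions
for (std::size_t i = 0; i < 4; ++i) {
  while (!done[i]) {} // busy-wait until thread i has finished
}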

Thread i then sets done[i] = true once the function it was supposed to run has finished.

You would distribute the work packages in much the same way, e.g. with a second array of flags that tells each thread when to start.
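Put together, the idea might look like the sketch below. It is an illustration under assumptions rather than a definitive implementation: the class name `FrameWorkers` and the `start_` flag array are hypothetical additions (the answer only shows `done`), and the waits use `std::this_thread::yield()` instead of an empty spin so idle threads give up their time slice.

```cpp
#include <array>
#include <atomic>
#include <cstddef>
#include <functional>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical helper: N persistent threads, each running its own job once per frame.
template <std::size_t N>
class FrameWorkers {
public:
  explicit FrameWorkers(std::array<std::function<void()>, N> jobs)
      : jobs_(std::move(jobs)) {
    for (std::size_t i = 0; i < N; ++i) {
      threads_.emplace_back([this, i] { workerLoop(i); });
    }
  }

  ~FrameWorkers() {
    quit_.store(true);
    for (auto& t : threads_) t.join();
  }

  // Replaces the create/join pairs: start all jobs, then wait for all of them.
  void runFrame() {
    for (auto& f : done_) f.store(false);
    for (auto& f : start_) f.store(true);  // distribute the work packages
    for (auto& f : done_) {
      while (!f.load()) std::this_thread::yield();  // wait for this worker
    }
  }

private:
  void workerLoop(std::size_t i) {
    while (!quit_.load()) {
      if (start_[i].exchange(false)) {  // a job was handed to worker i
        jobs_[i]();
        done_[i].store(true);
      } else {
        std::this_thread::yield();
      }
    }
  }

  std::array<std::function<void()>, N> jobs_;
  std::array<std::atomic<bool>, N> start_{};  // value-initialized to false
  std::array<std::atomic<bool>, N> done_{};
  std::atomic<bool> quit_{false};
  std::vector<std::thread> threads_;
};
```

With this, the loop in the question would create `FrameWorkers<4> workers({fun1, fun2, fun3, fun4});` once before the `while` and call `workers.runFrame();` where the threads were created and joined. Note that the polling keeps the cores busy between frames; if that matters, a blocking primitive such as a condition variable is the usual alternative.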

bitmask