
I want to simulate a scenario multiple times, e.g., repCnt = 100. To speed up the process, I want to use multiple threads that work in parallel and use a mutex when they have to log the results to a file.

The threads should work through the total number of repetitions in groups of NUM_THREADS = 4. The logging part with the mutex is easy, but I cannot figure out what the main loop should look like.

Here is a start:

const int NUM_THREADS = 4;

void simulate(struct argType arg) {

}


int main() {
    // ... Some code here ... 

    vector<thread> vecOfThreads;

    for (int rep = 0; rep < repCnt; rep++) {

        // Here they should work in groups of 4, i.e., rep = 1, 2, 3, 4

        // They need to call the simulate(struct argType) function while they are working

        // Once a thread is done, it should get the next task from the loop, i.e., rep = 5

    } 

    return 0;
}

When I searched for thread pooling in C++, everything I found involved too many classes and methods. There should be a much simpler way of doing what I want to achieve. Can someone help me out with the simplest and shortest C++ code?

OnurA
  • What is in `arg`? Is it the same for every call? – Botje Apr 21 '20 at 07:05
  • All quick simple solutions are going to involve creating and destroying threads on each loop iteration. The standard doesn't support thread pools very well, and won't until the Networking TS or something similar gets merged. Can you use a library like Boost.Asio? Or a platform specific thread pool? – Zuodian Hu Apr 21 '20 at 07:15
  • Does your compiler support OpenMP? There, you can easily create a parallel region and define _tasks_ within it in your loop. Unfortunately, C++ threading does not support such an easy solution. – Daniel Langr Apr 21 '20 at 07:18
  • Does this answer your question? [Thread pooling in C++11](https://stackoverflow.com/questions/15752659/thread-pooling-in-c11) – Daniel Langr Apr 21 '20 at 07:20
  • There is some very simple thread queue code [here](https://stackoverflow.com/a/29742586/721269). You can adapt it as necessary. – David Schwartz Apr 21 '20 at 07:27
  • @DavidSchwartz This was a page I was already aware of. However, it seems that it is indeed not that easy to do in a few lines. It seems to work more reliably for my code than the cxxpool solution below. – OnurA Apr 21 '20 at 09:42

3 Answers

1

Using the header-only cxxpool:

#include "cxxpool.h"
#include <iostream>
#include <mutex>
#include <thread>

std::mutex cout_mutex;

struct some_arg {
    int i;
};

void simulate(const some_arg arg) {
    std::lock_guard<std::mutex> lock(cout_mutex);
    std::cout << "Hello from task #" << arg.i << " and thread " << std::this_thread::get_id() << std::endl;
}

int main() {
    cxxpool::thread_pool pool{4};
    for (int i = 0; i < 100; i++) {
        pool.push(simulate, some_arg{i});
    }
}
Botje
  • Even though I did not want to use additional headers, classes etc., I tried this. But it crashes at run time. I suspected a race condition, but that is not it. I returned before the critical part of the code, but it still crashes. – OnurA Apr 21 '20 at 08:08
  • This exact code? Where can it possibly produce a race condition? – Botje Apr 21 '20 at 08:11
  • No, I adapted it to my code. The region where the mutex is necessary is commented out, so right now the code only simulates and does not log anything. Still, it crashes for me. – OnurA Apr 21 '20 at 08:13
  • Without seeing your code we can only guess what is wrong. Have you attached a debugger? Is it possible your simulate function is (mis)using shared state somewhere? – Botje Apr 21 '20 at 08:14
  • No, I get a segmentation fault but there is no shared object among threads. I have isolated everything with a mutex right now. In the simulate() code, every object is created new and destructed after the execution. – OnurA Apr 21 '20 at 08:31
  • Well, does it also crash if you just create a bunch of threads that call `simulate` manually? If so I would strongly suggest attaching a debugger and inspecting the crash sites. – Botje Apr 21 '20 at 08:34
  • For testing purposes, I modified the line "cxxpool::thread_pool pool{4};" --> "cxxpool::thread_pool pool{1};". It still crashes. – OnurA Apr 21 '20 at 08:36
  • I can only repeat myself: please attach a debugger and find out where it crashes instead of treating your program as a black box. – Botje Apr 21 '20 at 08:39
  • Yes, the debugger shows that at the moment of failure, the code is executing line 367 in the "cxxpool.h" file, thread.join(). But I do not know more unfortunately. – OnurA Apr 21 '20 at 08:48
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/212137/discussion-between-botje-and-onura). – Botje Apr 21 '20 at 08:58
  • I reconfigured my debugging environment, which made it work properly. I was able to detect the reason why it crashes. Thanks. – OnurA Apr 21 '20 at 13:38
0

A simple approach would be to use three for-loops in your main thread and a single while loop in your worker threads:

In main thread:

1) Fill a queue with your units of work

2) Create worker threads and store them in a vector

3) Wait for the threads to complete

In worker threads:

In a loop, try to dequeue a single work unit at a time from your work queue while holding a mutex. If the queue is empty, all work has been handed out and the thread can exit.

#include <functional>  // std::ref
#include <iostream>
#include <string>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>

struct Args {
    int num;

    Args(int num) : num(num) {}
};

void worker(std::queue<Args>& work, std::mutex& workMutex, std::vector<Args>& results, std::mutex& resultMutex) {
    while (true) {
        Args arg(-1);

        // dequeue work unit
        {
            std::lock_guard<std::mutex> lock(workMutex);
            if (work.empty()) break;
            arg = work.front();
            work.pop();
        }

        // process arg
        Args result(arg.num + 1);

        // save your results
        {
            std::lock_guard<std::mutex> lock(resultMutex);
            results.push_back(result);
        }
    }
}

int main() {
    std::vector<std::thread> threads;
    std::queue<Args> work;
    std::mutex workMutex;
    std::vector<Args> results;
    std::mutex resultMutex;

    const int NUMBER_OF_THREADS = 4;
    const int NUMBER_OF_WORK_UNITS = 100;

    // enqueue work units
    for (int i = 0; i < NUMBER_OF_WORK_UNITS; ++i) {
        work.push(Args(i));
    }

    // create worker threads
    for (int i = 0; i < NUMBER_OF_THREADS; ++i) {
        threads.emplace_back(worker, std::ref(work), std::ref(workMutex), std::ref(results), std::ref(resultMutex));
    }

    // wait for threads completing all work units
    for (std::thread& t : threads) {
        t.join();
    }

    // process results
    for (const Args& result : results) {
        std::cout << result.num << "\n";
    }
}
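
If the work units are just the repetition indices 0 to repCnt - 1, as in the question, the queue and its mutex could probably be replaced by a single atomic counter. The following is only a sketch of that variant, not a tested drop-in: `simulate` here is a hypothetical stand-in that returns a number, `runAll` is a name invented for this example, and the "log" is a vector rather than a file.

```cpp
#include <atomic>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for the real simulation; returns a value to log.
int simulate(int rep) {
    return rep * 2;
}

// Runs repCnt repetitions on numThreads threads and returns the logged values
// (in completion order, which is not deterministic).
std::vector<int> runAll(int repCnt, int numThreads) {
    std::atomic<int> next{0};  // index of the next unclaimed repetition
    std::mutex logMutex;       // guards the shared log
    std::vector<int> log;
    log.reserve(static_cast<std::size_t>(repCnt));

    auto worker = [&]() {
        while (true) {
            // Atomically claim one repetition; no two threads get the same index.
            const int rep = next.fetch_add(1);
            if (rep >= repCnt) break;

            const int value = simulate(rep);

            // Only the logging step is serialized.
            std::lock_guard<std::mutex> lock(logMutex);
            log.push_back(value);
        }
    };

    std::vector<std::thread> threads;
    for (int i = 0; i < numThreads; ++i) {
        threads.emplace_back(worker);
    }
    for (std::thread& t : threads) {
        t.join();
    }
    return log;
}
```

Calling `runAll(100, 4)` from `main` would correspond to repCnt = 100 and NUM_THREADS = 4. Each thread picks up the next index as soon as it finishes its current one, so no thread waits for a whole group of four to complete.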
AckderIII
0

IMHO the simplest solution would be to use OpenMP - no need for external libraries, and it is supported by virtually all compilers.

This page provides a good introduction to OpenMP: http://jakascorner.com/blog/2016/04/omp-introduction.html More details about loop parallelism are provided here: https://pages.tacc.utexas.edu/~eijkhout/pcse/html/omp-loop.html
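
As a rough illustration of what this might look like for the loop in the question, here is a minimal sketch. It assumes the compiler is invoked with OpenMP enabled (for example `-fopenmp` on GCC and Clang, or `/openmp` on MSVC); `simulate` is a hypothetical stand-in that returns a number, `runAllOmp` is a name invented for this example, and the "log" is a vector rather than a file.

```cpp
#include <vector>

// Hypothetical stand-in for the real simulation; returns a value to log.
int simulate(int rep) {
    return rep + 1;
}

// Runs repCnt repetitions on up to 4 threads and returns the logged values.
std::vector<int> runAllOmp(int repCnt) {
    std::vector<int> log;

    // schedule(dynamic) hands the next iteration to whichever thread is free,
    // instead of splitting the range into fixed chunks up front.
    #pragma omp parallel for schedule(dynamic) num_threads(4)
    for (int rep = 0; rep < repCnt; ++rep) {
        const int value = simulate(rep);

        // Only one thread at a time may execute the critical section,
        // which plays the role of the mutex around the logging code.
        #pragma omp critical
        {
            log.push_back(value);
        }
    }
    return log;
}
```

If OpenMP is not enabled, the pragmas are ignored and the loop simply runs serially with the same result, which can be handy when debugging.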

mpoeter