
I would like to reuse a vector of threads that call the same function several times with different parameters. There is no writing (except to one atomic variable), so no mutex is needed. To illustrate the idea, I created a basic example of parallelized code that finds the maximum value of a vector. There are clearly better ways to find the max of a vector, but for the sake of the explanation, and to avoid getting into the details of the real code I am writing, I am going with this silly example.

The code finds the maximum element of a vector by calling a function pFind that checks whether the vector contains the value val (initialized with an upper bound). If it does, the execution stops; otherwise val is reduced by one and the process repeats.

The code below generates a vector of threads that parallelize the search for val in the vector. The issue is that, for every value of val, the vector of threads is regenerated and the new threads are joined each time. Generating the vector of threads and joining them every time comes with an overhead that I want to avoid.

I am wondering if there is a way of generating a vector (a pool) of threads only once and reuse them for the new executions. Any other speedup tip will be appreciated.

#include <atomic>
#include <iostream>
#include <thread>
#include <vector>

using namespace std;

// Thread k scans the elements a[k], a[k + numTh], a[k + 2*numTh], ...
// and raises the shared flag if it finds val.
void pFind(
    vector<int>& a,
    int n,
    std::atomic<bool>& flag,
    int k,
    int numTh,
    int val
    ) {
    int i = k;

    while (i < n) {
        if (a[i] == val) {
            flag = true;
            break;
        } else
            i += numTh;
    }
}

int main() {
    // The vector to search and its length; assumed to be filled in
    // elsewhere in the real code.
    vector<int> a;
    int size = a.size();

    std::atomic<bool> flag;
    flag = false;
    int numTh = 8;
    int val = 1000;

    while (!flag) {
        vector<thread> threads;
        for (int i = 0; i < numTh; i++) {
            thread th(&pFind, std::ref(a), size, std::ref(flag), i, numTh, val);
            threads.push_back(std::move(th));
        }
        for (thread& th : threads)
            th.join();

        if (flag)
            break;

        val--;
    }
    cout << val << "\n";
    return 0;
}
  • Usually, you'd write task processors that execute a given task with arguments drawn from a queue; the main thread populates a shared queue of arguments, from which the workers pull new work as needed. You need a mutex around access to the queue, but as long as you make it cheap to insert and extract the work items (using `std::move` of `.front()` to avoid copy construction, followed by `.pop()` to remove the item, with all other work done outside the lock, or, if `std::move` is still too expensive, storing and retrieving `unique_ptr` or the like), the lock contention should be minimal. – ShadowRanger Nov 08 '17 at 03:35
  • Basically, don't "reuse" the thread; have the thread know how to keep doing work instead of exiting. – ShadowRanger Nov 08 '17 at 03:36
  • Can you provide a simple example (or point me to one)? Thanks in advance. – totalUnimodular Nov 08 '17 at 03:39
  • [Thread pooling in C++11](https://stackoverflow.com/q/15752659/364696) has some example code; this isn't directly supported in the standard libraries, so you have to use third party libs, or roll your own code. – ShadowRanger Nov 08 '17 at 03:44
  • None of the examples there address your suggestion. There are some options, but nothing concrete. – totalUnimodular Nov 08 '17 at 04:07
  • Because it's going to differ based on use case. Thread pooling is non-trivial in C++; [this answer is a starting point](https://stackoverflow.com/a/32593825/364696), but you need to write your own code. It's not hard to find more examples if you just search; the "Related" sidebar for this question provided that link, and basic searching can find a bazillion more examples. – ShadowRanger Nov 08 '17 at 04:19

3 Answers


There is no way to assign a different execution function (closure) to a std::thread after construction. This is generally true of all thread abstractions, though implementations often try to memoize or cache lower-level resources internally so that thread creation and joining are fast and simply constructing new threads remains viable. There is a long-standing debate in systems programming circles about whether creating a new thread should be made incredibly lightweight or whether clients should be written so they do not fork threads so frequently. (Given how long this debate has been going on, it should be clear there are a lot of tradeoffs involved.)

There are a lot of other abstractions which try to do what you really want. They have names such as "threadpools," "task executors" (or just "executors"), and "futures." All of them tend to map onto threads by creating some set of threads, often related to the number of hardware cores in the system, and then having each of those threads loop and look for requests.

As the comments indicated, the main way to do this yourself is to give each thread a top-level loop that accepts execution requests, processes them, and then posts the results. To do this you will need other synchronization primitives, such as mutexes and condition variables. This approach is generally faster when there are many requests and each individual request is not very large.
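A minimal sketch of such a worker loop is below, using one mutex and one condition variable around a job queue. The class name Worker and its members are purely illustrative, not from any library; this is a sketch of the pattern, not a production thread pool.

#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>

// One worker thread that loops waiting for jobs instead of exiting.
class Worker {
    public:
    Worker() : stop_(false), th_([this] { run(); }) {}

    ~Worker() {
        {
            std::lock_guard<std::mutex> l(m_);
            stop_ = true;           // tell the loop to finish
        }
        cv_.notify_one();
        th_.join();
    }

    void post(std::function<void()> job) {
        {
            std::lock_guard<std::mutex> l(m_);
            jobs_.push(std::move(job));
        }
        cv_.notify_one();           // wake the worker
    }

    private:
    void run() {
        for (;;) {
            std::function<void()> job;
            {
                std::unique_lock<std::mutex> l(m_);
                cv_.wait(l, [this] { return stop_ || !jobs_.empty(); });
                if (jobs_.empty()) return;        // stopping and nothing left to do
                job = std::move(jobs_.front());   // cheap extraction under the lock
                jobs_.pop();
            }
            job();                                // run the request outside the lock
        }
    }

    std::mutex m_;
    std::condition_variable cv_;
    std::queue<std::function<void()>> jobs_;
    bool stop_;
    std::thread th_;
};

A worker built this way is constructed once and keeps accepting jobs until it is destroyed, which is exactly the reuse the question asks for; the thread-pool answer below uses the same pattern with a single shared queue feeding several threads.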

As much as standard C++ concurrency support is a good thing, it is also significantly lacking for real-world high-performance work. Something like Intel's TBB is far more of an industrial-strength solution.
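As one illustration (a minimal sketch, assuming oneTBB is installed and linked with -ltbb; the function name parallelMax is made up for this example), the toy max-of-a-vector problem maps directly onto TBB's parallel_reduce, which runs on TBB's internally managed worker threads:

#include <tbb/blocked_range.h>
#include <tbb/parallel_reduce.h>

#include <algorithm>
#include <limits>
#include <vector>

// Parallel maximum of a vector using the functional form of parallel_reduce.
int parallelMax(const std::vector<int>& a) {
    return tbb::parallel_reduce(
        tbb::blocked_range<size_t>(0, a.size()),
        std::numeric_limits<int>::min(),                      // identity for max
        [&](const tbb::blocked_range<size_t>& r, int best) {  // reduce one chunk
            for (size_t i = r.begin(); i != r.end(); ++i)
                best = std::max(best, a[i]);
            return best;
        },
        [](int x, int y) { return std::max(x, y); });         // combine chunk results
}

Because TBB sizes and reuses its worker threads internally, there is no per-iteration thread creation or join to pay for.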

Zalman Stern

By piecing together some code from different online searches, I got the following to work, but it is not as fast as the approach that regenerates the threads at each iteration of the while loop.

Perhaps someone can comment on this approach.

The following class describes the thread pool

#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

class ThreadPool {
    public:
    ThreadPool(int threads) : shutdown_(false) {
        threads_.reserve(threads);
        for (int i = 0; i < threads; ++i)
            threads_.emplace_back(std::bind(&ThreadPool::threadEntry, this, i));
    }

    ~ThreadPool() {
        {
            // Unblock any threads and tell them to stop
            std::unique_lock<std::mutex> l(lock_);

            shutdown_ = true;
            condVar_.notify_all();
        }

        // Wait for all threads to stop
        std::cerr << "Joining threads" << std::endl;

        for (auto& thread : threads_) thread.join();
    }

    void doJob(std::function<void(void)> func) {
        // Place a job on the queue and unblock a thread
        std::unique_lock<std::mutex> l(lock_);

        jobs_.emplace(std::move(func));
        condVar_.notify_one();
    }

    void threadEntry(int i) {
        std::function<void(void)> job;

        while (1) {
            {
                std::unique_lock<std::mutex> l(lock_);

                while (!shutdown_ && jobs_.empty()) condVar_.wait(l);

                if (jobs_.empty()) {
                    // No jobs to do and we are shutting down
                    std::cerr << "Thread " << i << " terminates" << std::endl;
                    return;
                }

                std::cerr << "Thread " << i << " does a job" << std::endl;
                job = std::move(jobs_.front());
                jobs_.pop();
            }

            // Do the job without holding any locks
            job();
        }
    }

    private:
    std::mutex lock_;
    std::condition_variable condVar_;
    bool shutdown_;
    std::queue<std::function<void(void)>> jobs_;
    std::vector<std::thread> threads_;
};

Here is the rest of the code

void pFind(
    vector<int>& a,
    int n,
    std::atomic<bool>& flag,
    int k,
    int numTh,
    int val,
    std::atomic<int>& completed
    ) {
    int i = k;

    while (i < n) {
        if (a[i] == val) {
            flag = true;
            break;
        } else 
            i += numTh;
    }
    completed++;
}

int main() {
    // The vector to search and its length; assumed to be filled in
    // elsewhere in the real code.
    vector<int> a;
    int size = a.size();

    std::atomic<bool> flag;
    flag = false;
    int numTh = 8;
    int val = 1000;
    std::atomic<int> completed;
    completed = 0;

    ThreadPool p(numTh);

    while (!flag) {
        for (int i = 0; i < numTh; i++) {
            p.doJob(std::bind(pFind, std::ref(a), size, std::ref(flag), i, numTh, val, std::ref(completed)));
        }

        // Busy-wait until all numTh jobs of this round have finished
        while (completed < numTh) {}

        if (flag) {
            break;
        } else {
            completed = 0;
            val--;
        }
    }
    cout << val << "\n";
    return 0;
}

Your code has a race condition: bool is not an atomic type and is therefore not safe for multiple threads to write to concurrently. You need to use std::atomic_bool or std::atomic_flag.

To answer your question, you're recreating the threads vector each iteration of the loop, which you can avoid by moving its declaration outside the loop body. Reusing the threads themselves is a much more complex topic that's hard to get right or describe concisely.

vector<thread> threads;
threads.reserve(numTh);

while (!flag) {
    for (int i = 0; i < numTh; ++i)  // std::ref: std::thread copies its arguments otherwise
        threads.emplace_back(pFind, std::ref(a), size, std::ref(flag), i, numTh, val);
    for (auto& th : threads)
        th.join();
    threads.clear();

    if (flag)
        break;
    val--;
}
Ray Hamel