I am trying to implement the Upper/Lower parallel prefix algorithm for a vector with 2^N elements. (The book I took the algorithm from only provides code for vectors of that length.) To run it in parallel I wrote a naive thread pool; this is my first multi-threaded application. The algorithm has a number of levels of prefix calculation that depends on the length of the vector, and all operations on a given level can be carried out in parallel.
    for(int k = 0; k < std::log2(vector.size()); k++) {
        for(int r = 0; r < n / 2; r++) {
            AddInParallel(vector, r, k);
        }
    }
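For reference, the level structure above can be sketched sequentially. The body of AddInParallel is not shown in this post, so the index arithmetic below is an assumption based on the usual upper/lower formulation (each block of 2^(k+1) elements adds the last element of its lower half to every element of its upper half). The names AddOne and UpperLowerPrefix and the int element type are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for AddInParallel: one independent addition of level k.
// Task r adds the last element of a lower half-block to one element of the
// matching upper half-block (assumed indexing, not taken from the post).
void AddOne(std::vector<int> &v, std::size_t r, std::size_t k) {
    const std::size_t half = std::size_t(1) << k;   // 2^k elements per half-block
    const std::size_t block = r / half;             // which block of 2^(k+1) elements
    const std::size_t offset = r % half;            // position inside the upper half
    const std::size_t src = block * 2 * half + half - 1;
    const std::size_t dst = block * 2 * half + half + offset;
    v[dst] += v[src];
}

// Sequential reference: every level finishes before the next one starts.
void UpperLowerPrefix(std::vector<int> &v) {
    const std::size_t n = v.size();
    assert(n != 0 && (n & (n - 1)) == 0);           // length must be a power of two
    for (std::size_t k = 0; (std::size_t(1) << k) < n; k++) {
        for (std::size_t r = 0; r < n / 2; r++) {
            AddOne(v, r, k);
        }
    }
}
```

Within one level the source indices all lie in lower halves and the destination indices in upper halves, so the n/2 additions of a level do not touch each other's data. The only ordering requirement in this sketch is that level k is complete before level k + 1 begins.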
The parallel work here amounts to passing two parameters (r, k) to the threads, so I wrote a ThreadPool class built around a deque. Parameters are pushed onto one end of the deque and the worker threads read them from the other end, then call AddInParallel. However, there seems to be a race condition (I am not sure that is the appropriate term), because my result is not correct.
I tried guarding the scope in which AddInParallel is called with a mutex, but nothing changed.
Here is the ThreadPool class:
    typedef std::lock_guard<std::mutex> Guard;
    class ThreadPool {
    public:
        static ThreadPool &Instance() {
            static ThreadPool instance;
            return instance;
        }
        void SupplyTask(const TaskParameter &param) {
            Guard guard(Mutex());
            tasks.emplace_front(param);
        }
        void Finalize() {
            done = true;
            for(std::size_t i = 0; i < threads.size(); i++) {
                threads[i].join();
            }
        }
        void Synch() {
            while(taskcount) {
                ;
            }
        }
    private:
        ThreadPool(): done(false), taskcount(0), threads(CONCURRENCY) {
            for(std::size_t i = 0; i < threads.size(); i++) {
                threads[i] = std::thread(&ThreadPool::Do, this, i);
            }
        }
        ThreadPool(const ThreadPool &pool) = delete;
        ThreadPool &operator=(const ThreadPool &pool) = delete;
        static std::mutex &Mutex() {
            static std::mutex mutex;
            return mutex;
        }
        bool PollTask(TaskParameter &param) {
            Guard guard(Mutex());
            if(!tasks.empty()) {
                param = tasks.back();
                tasks.pop_back();
                return true;
            }
            return false;
        }
        void Print(const unsigned int id, const unsigned int r, const unsigned int k) {
            Guard guard(Mutex());
            std::cout << "Thread ID: " << id << std::endl;
            std::cout << "r: " << r << std::endl << "k: " << k << std::endl;
            std::cout << "------------" << std::endl;
        }
        void Do(unsigned int id) {
            TaskParameter param;
            bool havetask;
            while(!done) {
                bool havetask = PollTask(param);
                if(havetask) {
                    taskcount++;
                    Print(id, param.r, param.k);
                    AddInParallel(*param.vector, param.r, param.k);
                    taskcount--;
                }
                else {
                    std::this_thread::yield();
                }
            }
        }
        std::atomic_bool done;
        std::atomic_uint taskcount;
        std::vector<std::thread> threads;
        std::deque<TaskParameter> tasks;
        static const std::size_t CONCURRENCY;
    };
    const std::size_t ThreadPool::CONCURRENCY = 7;
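TaskParameter itself is not shown in this post. A minimal sketch that is consistent with how the pool uses it (default-constructed in Do, built from a vector plus r and k, and dereferenced as *param.vector) might look like the following. The int element type and the exact member types are assumptions.

```cpp
#include <vector>

// Hypothetical definition, inferred only from how the pool uses the type.
struct TaskParameter {
    std::vector<int> *vector;   // non-owning pointer; dereferenced by the worker
    unsigned int r;             // index of the operation within the level
    unsigned int k;             // level of the prefix computation

    // Default construction is needed because Do() declares an empty parameter.
    TaskParameter() : vector(nullptr), r(0), k(0) {}

    TaskParameter(std::vector<int> &v, unsigned int r, unsigned int k)
        : vector(&v), r(r), k(k) {}
};
```

Because the struct only stores a pointer, the vector passed in has to outlive every task that refers to it.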
Since each level depends on the results of the previous one, I tried to synchronize all threads before moving on to the next level, like this:
    for (k = 0; k < logn; k++) {
        for (r = 0; r < n / 2; r++) {
            ThreadPool::Instance().SupplyTask(TaskParameter(vector, r, k));
        }
        ThreadPool::Instance().Synch();
    }
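The guarantee this per-level wait is meant to provide is that every task supplied for level k has finished before any task of level k + 1 is supplied. One way to express that guarantee, as a standalone sketch rather than a drop-in part of the pool above, is a counter that covers a task from submission until completion. PendingCounter and its member names are hypothetical.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>

// Hypothetical helper: counts tasks from the moment they are submitted
// until the moment they have finished running.
class PendingCounter {
public:
    void Add(std::size_t n) {          // call when n tasks are submitted
        std::lock_guard<std::mutex> lock(mutex_);
        pending_ += n;
    }
    void Done() {                      // call when one task has finished
        std::lock_guard<std::mutex> lock(mutex_);
        assert(pending_ > 0);
        if (--pending_ == 0) {
            idle_.notify_all();        // wake anyone blocked in Wait()
        }
    }
    void Wait() {                      // blocks until no submitted task is left
        std::unique_lock<std::mutex> lock(mutex_);
        idle_.wait(lock, [this] { return pending_ == 0; });
    }
    std::size_t Pending() {            // snapshot, mainly useful for debugging
        std::lock_guard<std::mutex> lock(mutex_);
        return pending_;
    }
private:
    std::mutex mutex_;
    std::condition_variable idle_;
    std::size_t pending_ = 0;
};
```

In this sketch the producer would call Add before (or while) queueing a level's tasks, each worker would call Done after finishing a task, and the producer would call Wait before starting the next level. The condition variable is a design choice to avoid spinning; an atomic counter with a polling loop would give the same ordering guarantee as long as the count is raised at submission time.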
A sample run with a vector of length 2^4:
Input: 1 ... 16
Output: 1, 3, 6, 10, 15, 21, 28, 46, 55, 65, 76, 88, 101, 115, 130, 146
Expected: 1, 3, 6, 10, 15, 21, 28, 36, 45, 55, 66, 78, 91, 105, 120, 136
I am not sure what the problem is.