
I am trying to implement an upper/lower parallel prefix for any vector with 2^N elements. The algorithm comes from a book, which only provides code for vectors of that restricted length. I am also trying to implement a naive thread pool, and this is my first time writing a multi-threaded application. The algorithm has a number of levels of prefix calculation that depends on the length of the vector, and all operations on a particular level can be carried out in parallel.

const int n = static_cast<int>(vector.size());
const int logn = static_cast<int>(std::log2(n));
for(int k = 0; k < logn; k++) {
    for(int r = 0; r < n / 2; r++) {
        AddInParallel(vector, r, k);
    }
}
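For readers without the book, here is a sequential sketch of what one level of an upper/lower prefix could look like. `AddInParallel` is not shown in this post, so the indexing below is an assumption rather than the book's code, and the names `AddOne` and `UpperLowerPrefix` are hypothetical. Under that assumption, the tasks within one level write to distinct elements and only read elements that no task on that level writes, so the only ordering needed is between levels.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Assumed shape of one task (r, k): at level k the vector is viewed as blocks
// of 2^(k+1) elements, and task r adds the last element of a block's lower
// half to one element of that block's upper half.
void AddOne(std::vector<long> &v, std::size_t r, std::size_t k) {
    const std::size_t half = std::size_t{1} << k;    // 2^k
    const std::size_t base = (r / half) * 2 * half;  // start of the block
    const std::size_t src = base + half - 1;         // last element of lower half
    const std::size_t dst = base + half + (r % half);// one element of upper half
    v[dst] += v[src];
}

// Sequential driver: log2(n) levels, n/2 independent tasks per level.
void UpperLowerPrefix(std::vector<long> &v) {
    const std::size_t n = v.size();
    assert(n != 0 && (n & (n - 1)) == 0);  // length must be a power of two
    for (std::size_t k = 0; (std::size_t{1} << k) < n; k++) {
        for (std::size_t r = 0; r < n / 2; r++) {
            AddOne(v, r, k);
        }
    }
}
```

A parallel version would hand each `(r, k)` pair to a worker and wait for the whole level to finish before starting the next one.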

The parallel work in the loop above amounts to passing two parameters (r, k) to the threads. So I wrote a ThreadPool class built around a deque: parameters are pushed onto one end of the deque and the worker threads read them from the other end, then call AddInParallel. However, there seems to be a race condition (I am not sure that is the appropriate term), because my result is not correct.

I tried guarding the scope in which AddInParallel is called with a lock, but nothing changed.

Here is the ThreadPool class:

typedef std::lock_guard<std::mutex> Guard;
class ThreadPool {
public:
static ThreadPool &Instance() {
    static ThreadPool instance;
    return instance;
}

void SupplyTask(const TaskParameter &param) {
    Guard guard(Mutex());
    tasks.emplace_front(param);
}

void Finalize() {
    done = true;
    for(std::size_t i = 0; i < threads.size(); i++) {
        threads[i].join();
    }
}

void Synch() {
    while(taskcount) {
        ;
    }
}

private:
ThreadPool(): done(false), taskcount(0), threads(CONCURRENCY) {
    for(std::size_t i = 0; i < threads.size(); i++) {
        threads[i] = std::thread(&ThreadPool::Do, this, i);
    }
}

ThreadPool(const ThreadPool &pool) = delete;
ThreadPool &operator=(const ThreadPool &pool) = delete;

static std::mutex &Mutex() {
    static std::mutex mutex;
    return mutex;
}

bool PollTask(TaskParameter &param) {
    Guard guard(Mutex());
    if(!tasks.empty()) {
        param = tasks.back();
        tasks.pop_back();
        return true;
    }
    return false;
}

void Print(const unsigned int id, const unsigned int r, const unsigned int k) {
    Guard guard(Mutex());
    std::cout << "Thread ID: " << id << std::endl;
    std::cout << "r: " << r << std::endl << "k: " << k << std::endl;
    std::cout << "------------" << std::endl;
}

void Do(unsigned int id) {
    TaskParameter param;
    while(!done) {
        bool havetask = PollTask(param);
        if(havetask) {
            taskcount++;
            Print(id, param.r, param.k);
            AddInParallel(*param.vector, param.r, param.k);
            taskcount--;
        }
        else {
            std::this_thread::yield();
        }
    }            
}

std::atomic_bool done;
std::atomic_uint taskcount;
std::vector<std::thread> threads;
std::deque<TaskParameter> tasks;

static const std::size_t CONCURRENCY;
};
const std::size_t ThreadPool::CONCURRENCY = 7;

Since each level depends on the results of the previous one, I tried to synchronize all threads before moving on to the next level, like this:

for (k = 0; k < logn; k++) {
    for (r = 0; r < n / 2; r++) {
        ThreadPool::Instance().SupplyTask(TaskParameter(vector, r, k));
    }
    ThreadPool::Instance().Synch();
}

A sample run with a vector of length 2^4:

Input: 1 ... 16

Output: 1, 3, 6, 10, 15, 21, 28, 46, 55, 65, 76, 88, 101, 115, 130, 146

Expected: 1, 3, 6, 10, 15, 21, 28, 36, 45, 55, 66, 78, 91, 105, 120, 136

I am not sure what the problem is.

Cengiz Kandemir
  • Not a direct answer to your concern, though, you can use [tbb::parallel_scan](http://www.threadingbuildingblocks.org/docs/help/reference/algorithms/parallel_scan_func.htm) for parallel prefix computation. It does not need to sync the data with locks – Anton May 23 '14 at 18:58
  • It seems the second loop should have `n/(2^k)` iterations instead of `n/2`. Also consider a similar question (answer): [http://stackoverflow.com/questions/10053629/parallel-prefix-sum-fastest-implementation/12874227#12874227](http://stackoverflow.com/questions/10053629/parallel-prefix-sum-fastest-implementation/12874227#12874227) – Alex May 24 '14 at 07:44
  • I've already tested the code with continuous thread construction/destruction: `threads.emplace_back(std::thread(AddInParallel, vector, r, k))`. The algorithm is a direct copy/paste from the book and was working. I ended up constructing and destructing 300k threads. That's why I wanted to create a pool – Cengiz Kandemir May 24 '14 at 09:59
  • The `Synch` method is not very safe: it relies on `taskcount`, but `taskcount` is incremented in `Do`. E.g. if the main thread submits all tasks via `SupplyTask` before any thread from `ThreadPool` picks up a task (and increments `taskcount`), then `Synch` will wait for nobody and the main thread will start a new phase. Try moving `taskcount++` from `Do` to `SupplyTask`. – Alex May 24 '14 at 19:53
  • @Alex Your suggestions worked well. Please write it as an answer so that I can accept it. – Cengiz Kandemir May 27 '14 at 12:22

1 Answer


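The Synch method is not very safe: it relies on taskcount, but taskcount is incremented in Do, that is, only once a worker thread has already picked up a task. E.g. if the main thread submits all tasks via SupplyTask before any thread from ThreadPool picks up a task (and increments taskcount), then Synch sees taskcount == 0, waits for nobody, and the main thread starts a new phase while the previous level's tasks are still queued. Try moving taskcount++ from Do to SupplyTask, so that a task is counted from the moment it is queued until it has finished.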
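A minimal sketch of that idea follows. It is not the asker's class: `CountingPool` is a hypothetical simplified pool whose tasks are plain callables, and it swaps the busy-wait and `yield` loops for condition variables. The point it illustrates is the same, though: the pending counter is raised inside `SupplyTask`, under the same lock that publishes the task, and lowered only after the task has run.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical simplified pool, for illustration only.
class CountingPool {
public:
    explicit CountingPool(std::size_t workers) {
        for (std::size_t i = 0; i < workers; i++) {
            threads_.emplace_back([this] { Run(); });
        }
    }

    ~CountingPool() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            done_ = true;
        }
        wake_.notify_all();
        for (auto &t : threads_) t.join();
    }

    void SupplyTask(std::function<void()> task) {
        assert(task);  // reject empty callables
        {
            std::lock_guard<std::mutex> lock(mutex_);
            ++pending_;  // counted before any worker can see the task
            tasks_.push_front(std::move(task));
        }
        wake_.notify_one();
    }

    // Blocks until every task supplied so far has finished running.
    void Synch() {
        std::unique_lock<std::mutex> lock(mutex_);
        idle_.wait(lock, [this] { return pending_ == 0; });
    }

private:
    void Run() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                wake_.wait(lock, [this] { return done_ || !tasks_.empty(); });
                if (tasks_.empty()) return;  // shutting down, queue drained
                task = std::move(tasks_.back());
                tasks_.pop_back();
            }
            task();  // run outside the lock
            {
                std::lock_guard<std::mutex> lock(mutex_);
                if (--pending_ == 0) idle_.notify_all();
            }
        }
    }

    std::mutex mutex_;
    std::condition_variable wake_, idle_;
    std::deque<std::function<void()>> tasks_;
    std::size_t pending_ = 0;
    bool done_ = false;
    std::vector<std::thread> threads_;
};
```

With a pool like this, the per-level loop from the question keeps its shape: supply all tasks for level k, call `Synch()`, then move on to level k + 1. Because the counter can never read zero while a supplied task is still queued or running, `Synch()` cannot return early.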

Alex