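I have a task to compute Pi with the following formula:
$$\pi \approx \frac{1}{N}\sum_{i=0}^{N-1}\frac{4}{1+x_i^{2}},\qquad x_i=\frac{i+0.5}{N}$$
(i ranges from 0 to N − 1, i.e. N terms in total; N = 10^8)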
The computation must be performed in multiple threads with the following requirement: each thread receives only a small, fixed amount of work at a time (in my case, 40 terms of the sum), and there must be a "task pool" that hands a thread a new set of computations once it reports completion of the previous set. Until a thread receives a new task, it must wait. All of this has to be done with WinAPI.
My solution is this class:
#include "ThreadManager.h"
#include <string>
HANDLE ThreadManager::mutex = (CreateMutexA(nullptr, true, "m"));
ThreadManager::ThreadManager(size_t threadCount)
{
threads.reserve(threadCount);
for (int i = 0; i < threadCount; i++)
{
threadInfo.push_back(new ThreadStruct(i * OP_COUNT));
HANDLE event = CreateEventA(nullptr, false, true, std::to_string(i).c_str());
if (event)
{
threadEvents.push_back(event);
DuplicateHandle(GetCurrentProcess(), event, GetCurrentProcess(),
&(threadInfo[i]->threadEvent), 0, false, DUPLICATE_SAME_ACCESS);
}
else std::cout << "Unknown error: " << GetLastError() << std::endl;
HANDLE thread = CreateThread(nullptr, 0,
reinterpret_cast<LPTHREAD_START_ROUTINE>(&ThreadManager::threadFunc),
threadInfo[i],
CREATE_SUSPENDED, nullptr);
if (thread) threads.push_back(thread);
else std::cout << "Unknown error: " << GetLastError() << std::endl;
}
}
double ThreadManager::run()
{
size_t operations_done = threads.size() * OP_COUNT;
for (HANDLE t : threads) ResumeThread(t);
DWORD index;
Sleep(10);
while (operations_done < ThreadManager::N)
{
ReleaseMutex(ThreadManager::mutex);
index = WaitForMultipleObjects(this->threadEvents.size(), this->threadEvents.data(), false, 10000);
WaitForSingleObject(ThreadManager::mutex, 1000);
threadInfo[index] -> operationIndex = operations_done + OP_COUNT;
SetEvent(threadEvents[index]);
//std::cout << "Operations completed: " << operations_done << "/1000" << std::endl;
operations_done += OP_COUNT;
}
long double res_pi = 0;
for (auto&& ts: this->threadInfo)
{
res_pi += ts->pi;
ts->operationIndex = N;
}
res_pi /= N;
WaitForMultipleObjects(this->threads.size(), this->threads.data(), true, 10000);
std::cout.precision(10);
std::cout << "Pi value for " << threads.size() << " threads: " << res_pi;
threads.clear();
return 0;
}
ThreadManager::~ThreadManager()
{
if (!threads.empty())
for (HANDLE t: threads)
{
TerminateThread(t, -1);
CloseHandle(t);
}
std::destroy(threadInfo.begin(), threadInfo.end());
}
long double ThreadManager::calc(size_t startIndex)
{
long double xi = 0;
long double pi = 0;
for (size_t i = startIndex; i < startIndex + OP_COUNT; i++)
{
const long double ld_i = i;
const long double half = 0.5f;
xi = (ld_i + half) * (1.0 / N);
pi += ((4.0 / (1.0 + xi * xi)));
}
return pi;
}
DWORD WINAPI ThreadManager::threadFunc(ThreadStruct *ts)
{
while (ts->operationIndex < N)
{
WaitForSingleObject(ts->threadEvent, 1000);
ts->pi += calc(ts->operationIndex);
WaitForSingleObject(ThreadManager::mutex, 1000);
SetEvent(ts->threadEvent);
ReleaseMutex(ThreadManager::mutex);
}
return 0;
}
/// Starts a worker record at chunk `opIndex` with an empty partial sum.
ThreadStruct::ThreadStruct(size_t opIndex)
    : operationIndex(opIndex), pi(0)
{
}
My idea was to have an auto-reset event for each thread, which is set to signaled when the thread finishes its computation. The main thread waits for any of the thread events to be signaled, and after modifying some values in a shared ThreadStruct (to let the thread start another portion of computations) it sets that same event to signaled, which is received by that exact same thread, and the process repeats. But this doesn't work even for one thread: as a result I see values which are pretty random and not close to Pi (like 0.0001776328265).
Though my GDB debugger was working poorly (not displaying some variables and sometimes even crashing), I noticed that too many computations were happening (I scaled N down to 1000. Therefore, I should have seen threads printing out "computing" 1000/40 = 25 times, but actually it happened hundreds of times).
Then I tried adding a mutex so threads wait until main thread is not busy before signaling the event. That made computation much slower, and still inaccurate and random (example: 50.26492171 in case of 16 threads).
What can be the problem? Or, if it's completely wrong, how do I organize multithread calculation then? Was creating a class a bad idea?
If you want to reproduce the problem, here is the header file content (I am using C++20, MinGW 6.0):
#ifndef MULTITHREADPI_THREADMANAGER_H
#define MULTITHREADPI_THREADMANAGER_H
#include <iostream>
#include <vector>
#include <list>
#include <windows.h>
#include <memory>
// State shared between the manager and one worker thread; a pointer to it is
// passed to the worker as its thread argument.
struct ThreadStruct
{
// Start index of the chunk currently assigned to the worker. The worker loop
// ends once this value reaches ThreadManager::N.
size_t operationIndex;
// Running total of the partial sums the worker has computed so far
// (not yet divided by N).
long double pi;
// Event the worker waits on before it processes its assigned chunk.
HANDLE threadEvent = nullptr;
// Sets operationIndex to opIndex and pi to 0.
explicit ThreadStruct(size_t opIndex);
};
// Owns a set of WinAPI worker threads and hands them chunks of OP_COUNT terms
// of the Pi series until all N terms have been distributed.
class ThreadManager
{
public:
    // Creates the worker threads (suspended) and their events.
    explicit ThreadManager(size_t threadCount);

    // The class owns raw thread/event handles and per-worker allocations that
    // the destructor releases; a copy would release the same resources twice.
    ThreadManager(const ThreadManager&) = delete;
    ThreadManager& operator=(const ThreadManager&) = delete;

    // Runs the computation on the worker threads and prints the result.
    double run();

    // Stops any remaining worker threads and releases owned resources.
    ~ThreadManager();

private:
    // Index i refers to the same worker in all three vectors.
    std::vector<ThreadStruct*> threadInfo;  // per-worker shared state
    std::vector<HANDLE> threads;            // worker thread handles
    std::vector<HANDLE> threadEvents;       // events run() waits on for worker reports

    // Process-wide handle; defined in the implementation file.
    static HANDLE mutex;

    // Partial sum of the series for the chunk that begins at startIndex.
    static long double calc(size_t startIndex);

    // constexpr (implicitly inline since C++17) so the constants have a
    // definition even when they are odr-used, e.g. bound to a const reference.
    static constexpr int OP_COUNT = 40;     // terms handed to a worker per task
    static constexpr int N = 100000000;     // total number of terms in the series

    // Entry point of every worker thread; ts is that worker's shared state.
    static DWORD WINAPI threadFunc(ThreadStruct* ts);
};
#endif //MULTITHREADPI_THREADMANAGER_H
To execute code, just construct ThreadManager
with desired number of threads as argument and call run() on it.