
I have a task to compute Pi with the following formula:

$$\pi \approx \frac{1}{N}\sum_{i=0}^{N-1}\frac{4}{1+x_i^2}, \qquad x_i = \frac{i+0.5}{N}$$

(i ranges from 0 to N - 1, where N = 10^8)

The computation must run in multiple threads with the following requirement: each thread receives only a small, fixed amount of work at a time (in my case, 40 terms of the sum), and a "task pool" hands a new batch to a thread when it reports that it has finished the previous one. A thread must wait until it receives its next batch. All of this has to be done with WinAPI.

My solution is this class:

#include "ThreadManager.h"
#include <string>

HANDLE ThreadManager::mutex = (CreateMutexA(nullptr, true, "m"));

ThreadManager::ThreadManager(size_t threadCount)
{
    threads.reserve(threadCount);
    for (int i = 0; i < threadCount; i++)
    {
        threadInfo.push_back(new ThreadStruct(i * OP_COUNT));
        HANDLE event = CreateEventA(nullptr, false, true, std::to_string(i).c_str());

        if (event)
        {
            threadEvents.push_back(event);
            DuplicateHandle(GetCurrentProcess(), event, GetCurrentProcess(),
                            &(threadInfo[i]->threadEvent), 0, false, DUPLICATE_SAME_ACCESS);
        }
        else std::cout << "Unknown error: " << GetLastError() << std::endl;
        HANDLE thread = CreateThread(nullptr, 0,
                                     reinterpret_cast<LPTHREAD_START_ROUTINE>(&ThreadManager::threadFunc),
                                     threadInfo[i],
                                     CREATE_SUSPENDED, nullptr);
        if (thread) threads.push_back(thread);
        else std::cout << "Unknown error: " << GetLastError() << std::endl;
    }
}

double ThreadManager::run()
{
    size_t operations_done = threads.size() * OP_COUNT;
    for (HANDLE t : threads) ResumeThread(t);
    DWORD index;
    Sleep(10);
    while (operations_done < ThreadManager::N)
    {
        ReleaseMutex(ThreadManager::mutex);
        index = WaitForMultipleObjects(this->threadEvents.size(), this->threadEvents.data(), false, 10000);
        WaitForSingleObject(ThreadManager::mutex, 1000);
        threadInfo[index] -> operationIndex = operations_done + OP_COUNT;
        SetEvent(threadEvents[index]);
        //std::cout << "Operations completed: " << operations_done << "/1000" << std::endl;
        operations_done += OP_COUNT;
    }
    long double res_pi = 0;
    for (auto&& ts: this->threadInfo)
    {
        res_pi += ts->pi;
        ts->operationIndex = N;
    }
    res_pi /= N;
    WaitForMultipleObjects(this->threads.size(), this->threads.data(), true, 10000);
    std::cout.precision(10);
    std::cout << "Pi value for " << threads.size() << " threads: " << res_pi;
    threads.clear();
    return 0;
}

ThreadManager::~ThreadManager()
{
    if (!threads.empty())
        for (HANDLE t: threads)
        {
            TerminateThread(t, -1);
            CloseHandle(t);
        }
    std::destroy(threadInfo.begin(), threadInfo.end());
}

long double ThreadManager::calc(size_t startIndex)
{
    long double xi = 0;
    long double pi = 0;
    for (size_t i = startIndex; i < startIndex + OP_COUNT; i++)
    {
        const long double ld_i = i;
        const long double half = 0.5f;
        xi = (ld_i + half) * (1.0 / N);
        pi += ((4.0 / (1.0 + xi * xi)));
    }
    return pi;
}

DWORD WINAPI ThreadManager::threadFunc(ThreadStruct *ts)
{
    while (ts->operationIndex < N)
    {
        WaitForSingleObject(ts->threadEvent, 1000);
        ts->pi += calc(ts->operationIndex);
        WaitForSingleObject(ThreadManager::mutex, 1000);
        SetEvent(ts->threadEvent);
        ReleaseMutex(ThreadManager::mutex);
    }
    return 0;
}

ThreadStruct::ThreadStruct(size_t opIndex)
{
    this -> pi = 0;
    this -> operationIndex = opIndex;
}

My idea was to have an auto-reset event for each thread, which is set to signaled when the thread finishes its computation. The main thread waits for any of the thread events to become signaled, and after modifying some values in the shared ThreadStruct (so the thread can start another portion of the computation) it sets that same event to signaled; the same thread then receives it and the process repeats. But this doesn't work even for one thread: the results I see are pretty random and not close to Pi (like 0.0001776328265).

Although my GDB debugger was working poorly (not displaying some variables and sometimes even crashing), I noticed that far too many computations were happening. I scaled N down to 1000, so I should have seen the threads print "computing" 1000/40 = 25 times, but it actually happened hundreds of times.

Then I tried adding a mutex so that threads wait until the main thread is no longer busy before signaling the event. That made the computation much slower, and it was still inaccurate and random (example: 50.26492171 with 16 threads).

What can be the problem? Or, if the approach is completely wrong, how should I organize the multithreaded calculation instead? Was creating a class a bad idea?

If you want to reproduce the problem, here is the header file (I am using C++20, MinGW 6.0):

#ifndef MULTITHREADPI_THREADMANAGER_H
#define MULTITHREADPI_THREADMANAGER_H
#include <iostream>
#include <vector>
#include <list>
#include <windows.h>
#include <memory>

struct ThreadStruct
{
    size_t operationIndex;
    long double pi;
    HANDLE threadEvent = nullptr;
    explicit ThreadStruct(size_t opIndex);
};


class ThreadManager
{
public:
    explicit ThreadManager(size_t threadCount);
    double run();
    ~ThreadManager();

private:
    std::vector<ThreadStruct*> threadInfo;
    std::vector<HANDLE> threads;
    std::vector<HANDLE> threadEvents;
    static HANDLE mutex;
    static long double calc(size_t startIndex);
    static const int OP_COUNT = 40;
    static const int N = 100000000;
    static DWORD WINAPI threadFunc(ThreadStruct* ts);
};


#endif //MULTITHREADPI_THREADMANAGER_H

To run the code, construct a ThreadManager with the desired number of threads as the argument and call run() on it.

GaussGun
  • You should probably start with a single-threaded implementation without any WinAPI: just check whether you get the correct result with a straightforward sum. Usually such problems are about floating-point behavior: you can lose too much precision when adding many very small values to a big value. Maybe this is the case here. If so, you need to think about how to regroup the values so that you only add values close to each other, again without any threads or WinAPI. Once you see that the single-threaded algorithm is correct, you can think about how to implement it properly with multiple threads. – dewaffled Dec 11 '22 at 16:11
  • This is reinventing what the system already provides for you: A [thread pool](https://learn.microsoft.com/en-us/archive/msdn-magazine/2011/august/windows-with-c-the-windows-thread-pool-and-work). Also, consider using a real debugger in place of GDB (any of the [Windows debuggers](https://learn.microsoft.com/en-us/windows-hardware/drivers/debugger/) will do). – IInspectable Dec 11 '22 at 16:15
  • @dewaffled `ThreadManager::calc()` works fine on its own. Something goes wrong in the WinAPI implementation. – GaussGun Dec 11 '22 at 17:28
  • I don't know why you're protecting a call to SetEvent with a mutex. But in any case the logic of your threadFunc seems wrong. Once it's set the event, it doesn't wait to loop back around through the while loop again, and the value of `operationIndex` that it sees at the top of the loop is indeterminate. – Jonathan Potter Dec 11 '22 at 20:37
  • @JonathanPotter so you suggest removing the mutex and placing `WaitForSingleObject` at the end of the loop? – GaussGun Dec 11 '22 at 20:59

2 Answers

1

Even with everything below changed, it doesn't give consistent values close to Pi. There must be more to fix. I think it has to do with the events. If I understand it correctly, the mutex protects two different things, and the event is also used for two different things, so both change their meaning during execution. This makes it very hard to reason about.

1. Timeouts

WaitForMultipleObjects may run into a timeout. In that case it returns WAIT_TIMEOUT, which is defined as 0x102 or 258. You access the threadInfo vector with that value without bounds checking. You can use at(n) for a bounds-checked version of [n].

You can easily run into a 10 second timeout when debugging or when setting OP_COUNT to high numbers. So, maybe you want to set it to INFINITE instead.

This leads to all sorts of misbehavior:

  1. The thread's information (operationIndex) is updated while the thread might still be working on it.
  2. operations_done is updated although those operations may not have been done.
  3. The mutex is probably released more often than it was acquired.
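The return-value check described in this section could look roughly like the sketch below. `signaled_index` is a hypothetical helper, not part of the original class; the stand-in constants in the `#else` branch exist only so the snippet also compiles off Windows, and they mirror the values from the Windows headers (WAIT_TIMEOUT is 0x102, as noted above).

```cpp
#include <cstddef>

#ifdef _WIN32
#include <windows.h>
#else
// Stand-ins so the sketch compiles without the Windows headers.
using DWORD = unsigned long;
constexpr DWORD WAIT_OBJECT_0 = 0x00000000UL;
constexpr DWORD WAIT_TIMEOUT  = 0x00000102UL;
constexpr DWORD WAIT_FAILED   = 0xFFFFFFFFUL;
#endif

// Hypothetical helper: converts the value returned by
// WaitForMultipleObjects (called with bWaitAll = false) into an index
// into the handle array. Returns false for WAIT_TIMEOUT, WAIT_FAILED
// and any other value that is not a valid index.
bool signaled_index(DWORD waitResult, std::size_t handleCount, std::size_t& index)
{
    const std::size_t offset = static_cast<std::size_t>(waitResult - WAIT_OBJECT_0);
    if (offset < handleCount)
    {
        index = offset;
        return true;
    }
    return false; // do not touch threadInfo[...] in this case
}
```

In run(), the loop body would then skip the update of threadInfo and operations_done whenever the helper returns false, instead of indexing the vector with 258.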

2. Limit the number of threads

The thread manager should also check the number of threads, since you can't set it to a number higher than MAXIMUM_WAIT_OBJECTS, otherwise WaitForMultipleObjects() won't work reliably.

3. Off by 1 error

Should be

size_t operations_done = (threads.size()-1) * OP_COUNT;

or

threadInfo[index] -> operationIndex = operations_done; // was + OP_COUNT

otherwise it'll skip one batch
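The index bookkeeping can be checked in isolation, without any threads. The sketch below simulates only the order in which batch start indices are handed out, using the second variant of the fix; `batch_starts` is a hypothetical helper, and it assumes n is a multiple of opCount and at least threadCount * opCount.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical helper: lists the start index of every batch in the
// order the manager would hand them out.
std::vector<std::size_t> batch_starts(std::size_t threadCount,
                                      std::size_t opCount,
                                      std::size_t n)
{
    std::vector<std::size_t> starts;

    // The constructor gives thread i the batch starting at i * opCount.
    for (std::size_t i = 0; i < threadCount; ++i)
        starts.push_back(i * opCount);

    // First index that has not been scheduled yet.
    std::size_t scheduled = threadCount * opCount;
    while (scheduled < n)
    {
        // The next free batch starts exactly here; adding opCount
        // at this point would skip one batch.
        starts.push_back(scheduled);
        scheduled += opCount;
    }
    return starts;
}
```

With 3 threads, 40 operations per batch and N = 1000, this yields the 25 starts 0, 40, ..., 960 with no gaps.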

4. Ending the threads

Ending the threads relies on the timeouts.

When you replace all timeouts by INFINITE, you'll notice that your threads never end. You need another ReleaseMutex(mutex); before res_pi /= N;

Thomas Weller
  • I don't quite understand the 3rd point. At first, the number of threads multiplied by the operation count is "scheduled". Did you mean to subtract 1 from the whole thing? – GaussGun Dec 11 '22 at 17:19
  • With one thread, that thread should start at 0. Next iteration it should start at 40. Your implementation calculates `threads.size()*OP_COUNT`, which is 40 and then adds `OP_COUNT` to a total of 80 for the second iteration. That's why I'm subtracting 1. – Thomas Weller Dec 11 '22 at 17:22
  • Right after the threads are resumed, since I create the events in the signaled state, they should start the first batch of calculations, shouldn't they? So all `threads.size()*OP_COUNT` operations should start. – GaussGun Dec 11 '22 at 17:34
  • That's true, but the next thread should start there and not at + OP_COUNT. – Thomas Weller Dec 11 '22 at 17:41
  • I should delete `+OP_COUNT` from the loop then. By the way, it gives somewhat accurate (but still random at 0.001 level) results if I leave it as `threads.size() * OP_COUNT - 1` (-1 here is because we start with 0 and end at 99999999) with 1 thread. – GaussGun Dec 11 '22 at 17:46
  • Why do you think `threads.size() * OP_COUNT - 1` should work? – Thomas Weller Dec 11 '22 at 17:54
  • Actually it shouldn't, it's wrong. I ended up removing `+ OP_COUNT` from the loop – GaussGun Dec 11 '22 at 17:58
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/250326/discussion-between-gaussgun-and-thomas-weller). – GaussGun Dec 11 '22 at 18:01
0
struct CommonData;

struct ThreadData 
{
    CommonData* pData;
    ULONG i, k;

    ThreadData(CommonData* pData, ULONG i, ULONG k) : pData(pData), i(i), k(k) {}

    static ULONG CALLBACK Work(void* p);
};

struct CommonData
{
    HANDLE hEvent = 0;
    LONG dwActiveThreadCount = 1;
    ULONG N;
    union {
        double res = 0;
        __int64 i64;
    };

    CommonData(ULONG N) : N(N) {}

    ~CommonData()
    {
        if (HANDLE h = hEvent)
        {
            CloseHandle(h);
        }
    }

    void DecThread()
    {
        if (!InterlockedDecrement(&dwActiveThreadCount))
        {
            if (!SetEvent(hEvent)) __debugbreak();
        }
    }

    BOOL AddThread(ULONG i, ULONG k)
    {
        InterlockedIncrementNoFence(&dwActiveThreadCount);

        if (ThreadData* ptd = new ThreadData(this, i, k))
        {
            if (HANDLE hThread = CreateThread(0, 0, ThreadData::Work, ptd, 0, 0))
            {
                CloseHandle(hThread);

                return TRUE;
            }

            delete ptd;
        }

        DecThread();

        return FALSE;
    }

    BOOL Init()
    {
        return 0 != (hEvent = CreateEvent(0, 0, 0, 0));
    }

    void Wait()
    {
        DecThread();
        if (WaitForSingleObject(hEvent, INFINITE) != WAIT_OBJECT_0) __debugbreak();
    }
};

ULONG CALLBACK ThreadData::Work(void* p)
{
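    // Copy the parameters out, then free the block allocated in AddThread.
    ThreadData* ptd = static_cast<ThreadData*>(p);
    CommonData* pData = ptd->pData;
    ULONG i = ptd->i;
    ULONG k = ptd->k;
    delete ptd; // delete through the real type; deleting a void* is undefined behavior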

    ULONG N = pData->N;
    double pi = 0;
    do 
    {
        double xi = (i++ + 0.5) / N;
        pi += 4 / (1 + xi * xi);
    } while (--k);

    union {
        double d;
        __int64 i64;
    };

    i64 = pData->i64;

    for (;;)
    {
        union {
            double d_compare;
            __int64 i64_compare;
        };

        i64_compare = i64;

        d += pi;

        if (i64_compare == (i64 = InterlockedCompareExchange64(
                           &pData->i64, i64, i64_compare)))
        {
            break;
        }
    }

    pData->DecThread();

    return 0;
}

double calc_pi(ULONG N)
{
    SYSTEM_INFO si;
    GetSystemInfo(&si);
    if (si.dwNumberOfProcessors)
    {
        CommonData cd(N);
        if (cd.Init())
        {
            ULONG k = (N + si.dwNumberOfProcessors - 1) / si.dwNumberOfProcessors, i = 0;
            do 
            {
                if (!cd.AddThread(i, k))
                {
                    break;
                }
            } while (i += k, --si.dwNumberOfProcessors);

            cd.Wait();

            if (!si.dwNumberOfProcessors)
            {
                return cd.res/ N;
            }
        }
    }

    return 0;
}

When I call calc_pi(100000000) on an 8-core machine, I get 3.1415926535898153.
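The unnamed unions and the InterlockedCompareExchange64 loop above amount to an atomic add on a double: read the current bits, compute the new sum, and publish it only if nobody changed the value in the meantime. The same idea can be sketched in standard C++, assuming `std::atomic<double>` is acceptable in place of the WinAPI call; `atomic_add` is a hypothetical name.

```cpp
#include <atomic>

// Hypothetical helper: adds `value` to `target` with a
// compare-and-swap retry loop.
void atomic_add(std::atomic<double>& target, double value)
{
    double expected = target.load();
    // On failure compare_exchange_weak stores the value it actually
    // found into `expected`, so the sum is recomputed from fresh data
    // on the next iteration.
    while (!target.compare_exchange_weak(expected, expected + value))
    {
        // retry
    }
}
```

Each worker calls this once with its finished partial sum, so contention on the shared total stays low.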

RbMm
  • Could you please provide some description, since that is completely different implementation? – GaussGun Dec 12 '22 at 14:30
  • @GaussGun What questions do you have, and what is unclear? (I have modified the code a bit now.) – RbMm Dec 12 '22 at 21:37
  • I'd say I don't understand how it works at all, but here are concrete questions: For me it says `InterlockedIncrementNoFence` is undefined; is a header missing (I included "windows.h")? What is the purpose of the unnamed unions? Why is ThreadData deleted after the thread is launched? Also, there's a requirement in my task that I didn't specify clearly: I have to create threads as suspended and resume them as soon as they get their first tasks. And I can't change the code myself because I don't understand how and why it works in the first place. – GaussGun Dec 13 '22 at 16:17
  • @GaussGun [*InterlockedIncrementNoFence*](https://learn.microsoft.com/en-us/previous-versions/windows/desktop/legacy/hh972667(v=vs.85)) is defined in *winnt.h* (and it is only a macro). You can replace it with `InterlockedIncrement`; on x86/x64 there is no difference. – RbMm Dec 13 '22 at 17:17
  • *What is the purpose of the unnamed unions?* - an interlocked add for a double. – RbMm Dec 13 '22 at 17:17
  • *Why is ThreadData deleted after the thread is launched?* - That is not true: it is not deleted after the thread is launched. You misread the code. – RbMm Dec 13 '22 at 17:18
  • *I have to create threads as suspended and resume them as soon as they get their first tasks.* - I don't see the sense in such a requirement. The thread is already created with its task. – RbMm Dec 13 '22 at 17:19
  • For me, having to create the threads as suspended doesn't make sense either, but that's the task. Thank you for answering. – GaussGun Dec 13 '22 at 17:26
  • @GaussGun In that case you need to use a thread pool. In any case, my answer is not at your level. – RbMm Dec 13 '22 at 17:42