6

I'm trying to create a simple pool object, which I would like to more-or-less fairly allocate access to a set of shared resources to any threads that ask for it. In windows, I would typically have an array of Mutexes and do a WaitForMultipleObjects, with bWaitAll=FALSE (see windows_pool_of_n_t below). But I'm hoping to someday be able to port this to other OSes, so I'd like to stick with the standard. A deque of resources, with a condition_variable on size()!=0 seemed like the obvious solution (see pool_of_n_t below).

But for reasons I don't understand, that code serializes thread access. I'm not expecting strict fairness, but this is pretty much the worst possible case - the thread that had the lock last time always gets the lock the next time. It's not that std::mutex doesn't conform to Windows more-or-less fair scheduling, since using just a mutex without the condition variable works as expected, although only for a pool of one, of course (see pool_of_one_t below).

Can anyone explain this? Is there a way around this?

the results:

C:\temp\stdpool>bin\stdpool.exe
pool:pool_of_one_t
thread 0:19826 ms
thread 1:19846 ms
thread 2:19866 ms
thread 3:19886 ms
thread 4:19906 ms
thread 5:19926 ms
thread 6:19946 ms
thread 7:19965 ms
thread 8:19985 ms
thread 9:20004 ms
pool:windows_pool_of_n_t(1)
thread 0:19819 ms
thread 1:19838 ms
thread 2:19858 ms
thread 3:19878 ms
thread 4:19898 ms
thread 5:19918 ms
thread 6:19938 ms
thread 7:19958 ms
thread 8:19978 ms
thread 9:19997 ms
pool:pool_of_n_t(1)
thread 9:3637 ms
thread 0:4538 ms
thread 6:7558 ms
thread 4:9779 ms
thread 8:9997 ms
thread 2:13058 ms
thread 1:13997 ms
thread 3:17076 ms
thread 5:17995 ms
thread 7:19994 ms
pool:windows_pool_of_n_t(2)
thread 1:9919 ms
thread 0:9919 ms
thread 2:9939 ms
thread 3:9939 ms
thread 5:9958 ms
thread 4:9959 ms
thread 6:9978 ms
thread 7:9978 ms
thread 9:9997 ms
thread 8:9997 ms
pool:pool_of_n_t(2)
thread 2:6019 ms
thread 0:7882 ms
thread 4:8102 ms
thread 5:8182 ms
thread 1:8382 ms
thread 8:8742 ms
thread 7:9162 ms
thread 9:9641 ms
thread 3:9802 ms
thread 6:10201 ms
pool:windows_pool_of_n_t(5)
thread 4:3978 ms
thread 3:3978 ms
thread 2:3979 ms
thread 0:3980 ms
thread 1:3980 ms
thread 9:3997 ms
thread 7:3999 ms
thread 6:3999 ms
thread 5:4000 ms
thread 8:4001 ms
pool:pool_of_n_t(5)
thread 2:3080 ms
thread 0:3498 ms
thread 8:3697 ms
thread 3:3699 ms
thread 6:3797 ms
thread 7:3857 ms
thread 1:3978 ms
thread 4:4039 ms
thread 9:4057 ms
thread 5:4059 ms

the code:

#include <iostream>
#include <deque>
#include <vector>
#include <mutex>
#include <thread>
#include <sstream>
#include <chrono>
#include <iomanip>
#include <cassert>
#include <condition_variable>
#include <windows.h>

using namespace std;

class pool_t {
    public:
        virtual void check_in(int size) = 0;
        virtual int check_out() = 0;
        virtual string pool_name() = 0;
};

class pool_of_one_t : public pool_t {
    mutex lock;

public:
    virtual void check_in(int resource) {
        lock.unlock();
    }

    virtual int check_out() {
        lock.lock();
        return 0;
    }

    virtual string pool_name() {
        return "pool_of_one_t";
    }

};


class windows_pool_of_n_t : public pool_t {
    vector<HANDLE> resources;

public:
    windows_pool_of_n_t(int size) {
        for (int i=0; i < size; ++i)
            resources.push_back(CreateMutex(NULL, FALSE, NULL));
    }

    ~windows_pool_of_n_t() {
        for (auto resource : resources)
            CloseHandle(resource);
    }

    virtual void check_in(int resource) {
        ReleaseMutex(resources[resource]);
    }

    virtual int check_out() {
        DWORD result = WaitForMultipleObjects(resources.size(),
                resources.data(), FALSE, INFINITE);
        assert(result >= WAIT_OBJECT_0 
                && result < WAIT_OBJECT_0+resources.size());

        return result - WAIT_OBJECT_0;
    }

    virtual string pool_name() {
        ostringstream name;
        name << "windows_pool_of_n_t(" << resources.size() << ")";
        return name.str();
    }
};

class pool_of_n_t : public pool_t {
    deque<int> resources;
    mutex lock;
    condition_variable not_empty;

public:
    pool_of_n_t(int size) {
        for (int i=0; i < size; ++i)
            check_in(i);
    }

    virtual void check_in(int resource) {
        unique_lock<mutex> resources_guard(lock);
        resources.push_back(resource);
        resources_guard.unlock();
        not_empty.notify_one();
    }

    virtual int check_out() {
        unique_lock<mutex> resources_guard(lock);
        not_empty.wait(resources_guard,
                [this](){return resources.size() > 0;});
        auto resource = resources.front();
        resources.pop_front();
        bool notify_others = resources.size() > 0;
        resources_guard.unlock();
        if (notify_others)
            not_empty.notify_one();

        return resource;
    }

    virtual string pool_name() {
        ostringstream name;
        name << "pool_of_n_t(" << resources.size() << ")";
        return name.str();
    }
};


void worker_thread(int id, pool_t& resource_pool)
{
    auto start_time = chrono::system_clock::now();
    for (int i=0; i < 100; ++i) {
        auto resource = resource_pool.check_out();
        this_thread::sleep_for(chrono::milliseconds(20));
        resource_pool.check_in(resource);
        this_thread::yield();
    }

    static mutex cout_lock;
    {
        unique_lock<mutex> cout_guard(cout_lock);
        cout << "thread " << id << ":"
            << chrono::duration_cast<chrono::milliseconds>(
                    chrono::system_clock::now() - start_time).count()
            << " ms" << endl;
    }
}

void test_it(pool_t& resource_pool)
{
    cout << "pool:" << resource_pool.pool_name() << endl;
    vector<thread> threads;
    for (int i=0; i < 10; ++i)
        threads.push_back(thread(worker_thread, i, ref(resource_pool)));
    for (auto& thread : threads)
        thread.join();

}

int main(int argc, char* argv[])
{
    test_it(pool_of_one_t());
    test_it(windows_pool_of_n_t(1));
    test_it(pool_of_n_t(1));
    test_it(windows_pool_of_n_t(2));
    test_it(pool_of_n_t(2));
    test_it(windows_pool_of_n_t(5));
    test_it(pool_of_n_t(5));

    return 0;
}
Mark Wright
  • 705
  • 7
  • 20
  • This could be a hard question. I mean, there is the easy answer: `condition_variable` makes no such guarantees. The hard answer is working out exactly how bad it is, assuming you didn't make an obvious oops in your above code. – Yakk - Adam Nevraumont Jun 15 '15 at 15:10
  • I don't see any obvious oops. I suspect that this is due to slightly different interactions between `this_thread::yield()` and the two different mutex pathways. In Linux, I expect your code would schedule fairly. Note that the Standard speaks of `yield` as *only an opportunity for rescheduling* but the details are OS-specific. It might be interesting as an experiment to try replacing the `this_thread::yield();` with `this_thread::sleep_for( chrono::nanoseconds(1) );`. This would force the thread's loss of priority in the scheduling queue and perhaps eliminate the differences in Windows. – Special Sauce Jun 15 '15 at 15:40
  • Neither sleep_for() nor Win32's ::Sleep() result in better scheduling. It seems like Yaak's comment is the answer - the standard doesn't promise it and I shouldn't try to rely on it. – Mark Wright Jun 15 '15 at 16:50
  • have you tried using a busy wait instead of the `sleep_for`? in other words: are you sure that `sleep_for` is reproducably sleeping for the same amount? it only promises to sleep [at least](http://www.cplusplus.com/reference/thread/this_thread/sleep_for/?kw=sleep_for) for the given amount of time. – BeyelerStudios Jun 15 '15 at 17:07
  • I'm pretty confident that the wait is correct. The last thread finishes in 100*20*10ms (ops * sleep * workers), which is what I would expect. It's the order of thread completion - the fastest thread finishes an order of magnitude faster than the slowest, which indicates that the processing is serial (if I output worker thread progress during execution, its obvious that one thread is getting the resource the whole time). – Mark Wright Jun 15 '15 at 18:07
  • @MarkW yes I ran your code and the wait is not the culprit – BeyelerStudios Jun 15 '15 at 18:21

2 Answers2

7

I did your test pool:pool_of_n_t(2) on Linux and see the problem in

this_thread::yield();

See results on my comp for the test pool:pool_of_n_t(2):

1) this_thread::yield():

$./a.out                                                                       
pool:pool_of_n_t(2)
thread 0, run for:2053 ms
thread 9, run for:3721 ms
thread 5, run for:4830 ms
thread 6, run for:6854 ms
thread 3, run for:8229 ms
thread 4, run for:8353 ms
thread 7, run for:9441 ms
thread 2, run for:9482 ms
thread 1, run for:10127 ms
thread 8, run for:10426 ms

They are similar to yours.

2) And the same test when I replace this_thread::yield() with pthread_yield():

$ ./a.out                                                               
pool:pool_of_n_t(2)
thread 0, run for:7922 ms
thread 3, run for:8853 ms
thread 4, run for:8854 ms
thread 1, run for:9077 ms
thread 5, run for:9364 ms
thread 9, run for:9446 ms
thread 7, run for:9594 ms
thread 2, run for:9615 ms
thread 8, run for:10170 ms
thread 6, run for:10416 ms

It is much fairer. You assume that this_thread::yield() gives indeed CPU to another thread but it is not giving it.

This is disas for this_thread::yield for gcc 4.8:

(gdb) disassembl this_thread::yield
Dump of assembler code for function std::this_thread::yield():
   0x0000000000401fb2 <+0>: push   %rbp
   0x0000000000401fb3 <+1>: mov    %rsp,%rbp
   0x0000000000401fb6 <+4>: pop    %rbp
   0x0000000000401fb7 <+5>: retq   
End of assembler dump.

I don't see any rescheduling

And this is disas for pthread_yield:

(gdb) disassemble pthread_yield
Dump of assembler code for function pthread_yield:
   0x0000003149c084c0 <+0>: jmpq   0x3149c05448 <sched_yield@plt>
End of assembler dump.
(gdb) disassemble sched_yield
Dump of assembler code for function sched_yield:
   0x00000031498cf520 <+0>: mov    $0x18,%eax
   0x00000031498cf525 <+5>: syscall 
   0x00000031498cf527 <+7>: cmp    $0xfffffffffffff001,%rax
   0x00000031498cf52d <+13>:    jae    0x31498cf530 <sched_yield+16>
   0x00000031498cf52f <+15>:    retq   
   0x00000031498cf530 <+16>:    mov    0x2bea71(%rip),%rcx        # 0x3149b8dfa8
   0x00000031498cf537 <+23>:    xor    %edx,%edx
   0x00000031498cf539 <+25>:    sub    %rax,%rdx
   0x00000031498cf53c <+28>:    mov    %edx,%fs:(%rcx)
   0x00000031498cf53f <+31>:    or     $0xffffffffffffffff,%rax
   0x00000031498cf543 <+35>:    jmp    0x31498cf52f <sched_yield+15>
End of assembler dump.
  • I tried the same test with my pool_of_one_t in linux and the mutex behavior was similarly unfair. I think the answer here is that the c++ std concurrency APIs aren't up to the task and I'll have to write this using platform-specific APIs. – Mark Wright Jun 15 '15 at 16:48
  • 2
    Per [this answer](http://stackoverflow.com/questions/12523122/what-is-glibcxx-use-nanosleep-all-about/12961816#12961816), for GCC 4.8 you need `--enable-libstdcxx-time` when building GCC to have `yield()` not be a no-op. – T.C. Jun 15 '15 at 17:56
  • @MarkW In case you where not notified, you need to read T.C's comment above. – Yakk - Adam Nevraumont Jun 15 '15 at 18:15
2

I don't think condition variable is the culprit.

Both the Linux "Completely Fair Queue" and the Windows thread dispatcher assume that the ideal goal is to give each thread it's entire time slice (i.e. to be fair.) They carry this so far as to assume that if a thread yields before it consumes its entire time slice that it goes near the front of the queue [this is a gross simplification] because that is the "fair" thing to do.

I find this very unfortunate. If you have three threads, one of which can do work and the other two of which are blocked waiting on that one, both Windows and Linux dispatchers will bounce back and forth between the blocked threads many times before giving the "correct" thread a chance.

Dale Wilson
  • 9,166
  • 3
  • 34
  • 52
  • I'm blaming the condition variable because the other two pools - one which waits exclusively on a mutex, the other (windows-specific) that waits on a pool of mutexes - both have fair (or fair enough for my needs) behavior. It's only the pool with the condition variable that shows serial thread scheduling. – Mark Wright Jun 15 '15 at 18:09