
I am trying to use multi-threading for encoding with Random Linear Network Coding (RLNC) to increase performance. However, my multi-threaded solution is much slower than the current non-threaded version. I suspect that the atomic access on m_completed and the std::mutex used for inserting elements into m_result are killing my performance, but I do not know how to confirm this.

Some more information: the function completed() is called in a while-loop in the main thread, while(!encoder.completed()){}, which results in a huge number of atomic accesses, but I cannot find a proper way to do it without the atomic or the mutex lock. You can find the code below.

So please, if someone can see a mistake or guide me towards a better way of doing this, I would be very grateful. I have now spent 1.5 weeks trying to figure out what is wrong, and my only idea is the atomic or std::mutex locks.

#include <cstdint>
#include <vector>
#include <mutex>

#include <memory>
#include <atomic>
...
namespace master_thesis
{
namespace encoder
{
class smart_encoder
{
...   
    void start()
    {
    ... 
    // In case there is an uneven number
    // of symbols, we adjust for this above

            else
            {

                m_pool.enqueue([this, encoder](){
                        std::vector<std::vector<uint8_t>> total_payload(this->m_coefficients,
                                                                        std::vector<uint8_t>(encoder->payload_size()));

                        std::vector<uint8_t> payload(encoder->payload_size());

                        for (uint32_t j = 0; j < this->m_coefficients; ++j)
                        {
                            encoder->write_payload(payload.data());
                            total_payload[j] = payload; //.insert(total_payload.begin() + j, payload);
                        }

                        this->m_mutex.lock();

                        this->m_result.insert(std::end(this->m_result),
                                              std::begin(total_payload),
                                              std::end(total_payload));
                        ++(this->m_completed);

                        this->m_mutex.unlock();
                    });
            }

        }
    }

    bool completed()
    {
        return m_completed.load() >= (m_threads - 1);
    }

    std::vector<std::vector<uint8_t>> result()
    {
        return m_result;
    }

private:
    uint32_t m_symbols;
    uint32_t m_symbol_size;
    std::atomic<uint32_t> m_completed;
    unsigned int m_threads;
    uint32_t m_coefficients;

    std::mutex m_mutex;

    std::vector<uint8_t> m_data;
    std::vector<std::vector<uint8_t>> m_result;

    ThreadPool m_pool;

    std::vector<std::shared_ptr<rlnc_encoder>> m_encoders;
};
}
}
Lars Nielsen
  • Both are fast when there is no contention. – stark May 09 '18 at 11:15
  • Rather than loop like that - use a condition variable. – UKMonkey May 09 '18 at 11:15
  • @stark but the problem is that there can be contention for both, at least from what I can tell – Lars Nielsen May 09 '18 at 11:25
  • Have you tried profiling? Depending on your profiler (and its settings) it might show whether or not the application spends a significant amount of time in the synchronization. – Max Langhof May 09 '18 at 11:28
  • @MaxLanghof what tool would you use for that ? :) – Lars Nielsen May 09 '18 at 11:29
  • Depends on your environment. I wouldn't recommend the visual studio profiler for a gcc toolchain :). I can recommend Intel VTune if you have access to it. – Max Langhof May 09 '18 at 11:30
  • @MaxLanghof sitting on linux so no visual studio :) – Lars Nielsen May 09 '18 at 11:36
  • @UKMonkey you mean instead of the `for`-loop? – Lars Nielsen May 09 '18 at 11:37
  • I am guessing here... Are you making a copy of the whole payload object inside the mutex? Try doing an rval/swap instead! You could split your mutex block into 2 mutexes, one to create or find an entry, and one to swap in the payload. – Gem Taylor May 09 '18 at 11:42
  • Did the encoders already encode things? As in, is this parallelization solely for writing out the encoder data? Does `write_payload` do the same thing every time (do you actually write out the same data `m_coefficient` times for every encoder)? I am under the impression that all your threads do the same work... – Max Langhof May 09 '18 at 11:42
  • @GemTaylor That could be it. Lars, how big is `N = encoder->payload_size()` and `M = this->m_coefficients`? You are copying `M*N` bytes within the mutex... – Max Langhof May 09 '18 at 11:46
  • @MaxLanghof `write_payload` is the encoding operation (poorly named, not my decision :) ) – Lars Nielsen May 09 '18 at 11:47
  • `m_coefficient` varies `n/8` where `n` in `[8,16,32,128,256]` and `N` also varies but never below `4194304` bytes – Lars Nielsen May 09 '18 at 11:49
  • hmmm how to avoid the copy.... – Lars Nielsen May 09 '18 at 11:49
  • To avoid the copy, try `m_result.push_back(std::move(total_payload));`. This still might reallocate `m_result` so if you know the number of results upfront, then use [`resize`](http://en.cppreference.com/w/cpp/container/vector/resize)/[`reserve`](http://en.cppreference.com/w/cpp/container/vector/reserve) so it won't reallocate while holding the mutex. – Sean Cline May 09 '18 at 11:57
  • And for moving instead of copying while keeping the `insert` within the mutex region, see [here](https://stackoverflow.com/questions/10720122/is-there-a-standard-way-of-moving-a-range-into-a-vector). – Max Langhof May 09 '18 at 12:07
  • @LarsNielsen instead of the `while(!encoder.completed()){}`. It will relieve the contention on the atomic variable and give you an extra core to do work with. – UKMonkey May 09 '18 at 12:53
  • Yes, the trick is to use a swap or move of the content in the insert. Also, once the index has been introduced to the map, you don't need to hold the main mutex. If you can guarantee that there won't be another insert to the same index, you don't even need to protect against overwrites at the same index, but if you do, then you could have a mutex per entry, so they only block when the entries collide. – Gem Taylor May 09 '18 at 13:01
  • @SeanCline when I try using `m_result.push_back(total_payloads)` I get `error: no matching function for call to ‘std::vector<std::vector<unsigned char> >::push_back(std::remove_reference<std::vector<std::vector<unsigned char> >&>::type)’` on the line `m_result.push_back(std::move(total_payloads));` – Lars Nielsen May 10 '18 at 07:34
  • and I initialise `m_result`: `m_result = std::vector<std::vector<uint8_t>>(symbols, std::vector<uint8_t>(encoder->payload_size()));` shouldn't that be enough? – Lars Nielsen May 10 '18 at 07:35

1 Answer


The bottleneck is probably not the call to completed().

On x86, a load from a naturally aligned uint32_t is already an atomic operation, std::atomic or not. For a uint32_t on x86, the only things std::atomic does are ensure that the variable is properly aligned and that the compiler neither reorders the access nor optimizes it out.

A tight loop of loads isn't by itself a cause of bus contention. There will be a cache miss on the first read, but subsequent loads will be cache hits until the cache line is invalidated by a write to the address from another thread. There's one caveat: accidental sharing of cache lines ("false sharing"), where writes to unrelated data on the same cache line invalidate it as well. One way to eliminate this possibility is to switch to an array, so that there are 60 bytes of unused padding on both sides of your atomic (only use the middle element): std::atomic<uint32_t> m_buffered[31]; std::atomic<uint32_t>& m_completed = m_buffered[15];
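As an alternative to the manual padding array, the counter can be given its own cache line through alignment. This is only a sketch: the 64-byte line size is an assumption (typical for x86, but not guaranteed), and `padded_counter` and `cache_line_size` are hypothetical names, not part of the original code.

```cpp
#include <atomic>
#include <cstddef>
#include <cstdint>

// Assumption: 64-byte cache lines, which is typical for current x86 CPUs.
constexpr std::size_t cache_line_size = 64;

// Hypothetical wrapper: the alignment forces the counter onto its own
// cache line, so writes to neighbouring members cannot invalidate it.
struct alignas(cache_line_size) padded_counter
{
    std::atomic<std::uint32_t> value{0};
};

// sizeof is always a multiple of the alignment, so the struct is padded
// out to one full cache line.
static_assert(sizeof(padded_counter) == cache_line_size,
              "the counter should occupy exactly one cache line");
static_assert(alignof(padded_counter) == cache_line_size,
              "the counter should start on a cache-line boundary");
```

In the class from the question, the member would then be declared as `padded_counter m_completed;` and accessed through `m_completed.value`.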

Keep in mind that a tight loop will tie up one of your cores doing nothing but looking in its cache. That's a waste of money... ;) That could very well be the cause of your issue. You could change your code to something like this:

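int m_completed = 0;          // no longer atomic; only touched while holding m_mutex
std::condition_variable cv;   // needs #include <condition_variable>

// in the main thread, instead of while(!encoder.completed()){}
std::unique_lock<std::mutex> lock(m_mutex);  // the m_mutex from the class
while (!completed())
    cv.wait(lock);            // releases m_mutex while sleeping, re-acquires on wake-up

// in the worker thread
bool to_signal = false;
{
    std::lock_guard<std::mutex> guard(this->m_mutex);
    this->m_result.insert(std::end(this->m_result),
                          std::begin(total_payload),
                          std::end(total_payload));
    ++m_completed;
    to_signal = completed();
}
// notify after the lock has been released
if (to_signal)
    cv.notify_one();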
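For reference, here is a self-contained sketch of the same pattern that compiles on its own. It is not the asker's class: `completion_tracker` and `run_workers` are hypothetical names, plain std::thread workers stand in for the ThreadPool, and each "result" is just a small byte vector.

```cpp
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for the encoder's bookkeeping.
class completion_tracker
{
public:
    explicit completion_tracker(unsigned int expected) : m_expected(expected) {}

    // Called by a worker once its payload is ready.
    void report(std::vector<std::uint8_t> payload)
    {
        bool to_signal = false;
        {
            std::lock_guard<std::mutex> guard(m_mutex);
            m_result.push_back(std::move(payload));
            ++m_completed;
            to_signal = (m_completed >= m_expected);
        }
        // Notify after releasing the lock so the woken thread can take it.
        if (to_signal)
            m_cv.notify_one();
    }

    // Called by the main thread; sleeps instead of spinning.
    std::vector<std::vector<std::uint8_t>> wait_for_results()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_cv.wait(lock, [this] { return m_completed >= m_expected; });
        return std::move(m_result);
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_cv;
    unsigned int m_completed = 0; // guarded by m_mutex
    unsigned int m_expected;
    std::vector<std::vector<std::uint8_t>> m_result;
};

// Starts `workers` threads that each report one payload, then waits for all.
std::vector<std::vector<std::uint8_t>> run_workers(unsigned int workers)
{
    completion_tracker tracker(workers);
    std::vector<std::thread> threads;
    for (unsigned int i = 0; i < workers; ++i)
        threads.emplace_back([&tracker, i] {
            tracker.report(std::vector<std::uint8_t>(16, static_cast<std::uint8_t>(i)));
        });

    auto results = tracker.wait_for_results();
    for (auto& t : threads)
        t.join();
    return results;
}
```

The predicate overload of `wait` handles spurious wake-ups, so the explicit `while` loop is not needed in this form.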

It could also be that your performance loss has to do with the mutex critical section. This critical section has the potential to be many orders of magnitude longer than a cache miss. I would recommend comparing the time for 1 thread, 2 threads, and 4 threads in the thread pool. If 2 threads aren't faster than 1 thread, then your code is essentially running sequentially.

How to measure? Profiling tools are good when you don't know what to optimize. I don't have a lot of experience with them, but I know that (at least some of the older ones) can get a little sketchy when it comes to multithreading. You can also use a good old-fashioned timer. C++11 has std::chrono::high_resolution_clock (in <chrono>), which probably has a resolution on the order of microseconds if you have decent hardware.
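A rough sketch of such a timer-based comparison follows. It uses std::chrono::steady_clock rather than high_resolution_clock, because steady_clock is guaranteed to be monotonic. `busy_work` is a hypothetical CPU-bound stand-in for one encoding task, and `time_with_threads` and `timing` are made-up names for illustration.

```cpp
#include <chrono>
#include <cstdint>
#include <thread>
#include <vector>

// Hypothetical CPU-bound stand-in for one encoding task.
std::uint64_t busy_work(std::uint64_t iterations)
{
    std::uint64_t acc = 0;
    for (std::uint64_t i = 0; i < iterations; ++i)
        acc += i * i;
    return acc;
}

struct timing
{
    std::int64_t microseconds; // elapsed wall-clock time
    std::uint64_t checksum;    // keeps the work observable to the compiler
};

// Spreads `tasks` calls to busy_work over `thread_count` threads and
// measures how long it takes until all of them have finished.
timing time_with_threads(unsigned int thread_count, unsigned int tasks,
                         std::uint64_t iterations)
{
    std::vector<std::uint64_t> sums(tasks, 0);
    const auto start = std::chrono::steady_clock::now();

    std::vector<std::thread> threads;
    for (unsigned int t = 0; t < thread_count; ++t)
        threads.emplace_back([&sums, t, thread_count, tasks, iterations] {
            // Thread t handles tasks t, t + thread_count, t + 2 * thread_count, ...
            for (unsigned int task = t; task < tasks; task += thread_count)
                sums[task] = busy_work(iterations + task);
        });
    for (auto& th : threads)
        th.join();

    const auto stop = std::chrono::steady_clock::now();
    const auto elapsed =
        std::chrono::duration_cast<std::chrono::microseconds>(stop - start);

    std::uint64_t checksum = 0;
    for (std::uint64_t s : sums)
        checksum += s;
    return timing{static_cast<std::int64_t>(elapsed.count()), checksum};
}
```

Calling `time_with_threads` with 1, 2, and 4 threads and the same `tasks` and `iterations` gives the comparison described above. If the measured time does not drop as threads are added, the work is effectively serialized.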

Lastly, I see a lot of opportunity for algorithmic/scalar optimization. Pre-allocate vectors instead of allocating them every time. Use pointers or std::move to avoid unnecessary deep copies. Pre-allocate m_result and have the threads write to specific index offsets.
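The last two suggestions can be sketched together as follows. This is an illustration under assumptions, not the asker's encoder: `fake_write_payload` is a hypothetical stand-in for `encoder->write_payload()`, `encode_all` is a made-up name, and plain std::thread workers replace the ThreadPool.

```cpp
#include <cstddef>
#include <cstdint>
#include <thread>
#include <utility>
#include <vector>

using payload_list = std::vector<std::vector<std::uint8_t>>;

// Hypothetical stand-in for encoder->write_payload(): fills the buffer
// with a value derived from the slot index.
void fake_write_payload(std::vector<std::uint8_t>& buffer, std::size_t slot)
{
    for (auto& byte : buffer)
        byte = static_cast<std::uint8_t>(slot);
}

payload_list encode_all(std::size_t workers, std::size_t per_worker,
                        std::size_t payload_size)
{
    // Size the result once, before any thread starts, so it never reallocates.
    payload_list result(workers * per_worker);

    std::vector<std::thread> threads;
    for (std::size_t w = 0; w < workers; ++w)
        threads.emplace_back([&result, w, per_worker, payload_size] {
            for (std::size_t j = 0; j < per_worker; ++j)
            {
                const std::size_t slot = w * per_worker + j;
                std::vector<std::uint8_t> payload(payload_size);
                fake_write_payload(payload, slot);
                // Each slot has exactly one writer, so no mutex is needed;
                // the move hands over the buffer without copying the bytes.
                result[slot] = std::move(payload);
            }
        });

    for (auto& t : threads)
        t.join();
    return result;
}
```

Because every worker owns a disjoint range of slots, the critical section disappears entirely. The only synchronization left is the `join` at the end.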

Humphrey Winnebago