Consider this case:

for (...)
{
    const size_t count = ...

    for (size_t i = 0; i < count; ++i)
    {
        calculate(i); // thread-safe function
    }
}

What is the most elegant solution to maximize performance using C++17 and/or boost?

Repeatedly creating and joining threads makes no sense because of the huge overhead (which in my case exactly equals the possible gain).

So I have to create N threads only once and keep them synchronized with the main one (using mutex, shared_mutex, condition_variable, atomic, etc.). This turned out to be quite a difficult task for such a common and clear situation (if everything is to be really safe and fast). After struggling with it for days, I have the feeling that I am reinventing the wheel...

  • Update 1: calculate(x) and calculate(y) can (and should) run in parallel
  • Update 2: std::atomic::fetch_add (or similar) is preferable to a queue (or similar)
  • Update 3: extreme computations (i.e. millions of "outer" calls and hundreds of "inner" ones)
  • Update 4: calculate() changes the object's internal data without returning a value

Intermediate solution

For some reason "async + wait" is much faster than "create + join" threads. So these two examples give a 100% speed increase:

Example 1

for (...)
{
    const size_t count = ...

    future<void> execution[cpu_cores];

    for (size_t x = 0; x < cpu_cores; ++x)
    {
        execution[x] = async(launch::async, ref(*this), x, count);
    }

    for (size_t x = 0; x < cpu_cores; ++x)
    {
        execution[x].wait();
    }
}

void operator()(const size_t x, const size_t count)
{
    for (size_t i = x; i < count; i += cpu_cores)
    {
        calculate(i);
    }
}

Example 2

for (...)
{
    index = 0;

    const size_t count = ...

    future<void> execution[cpu_cores];

    for (size_t x = 0; x < cpu_cores; ++x)
    {
        execution[x] = async(launch::async, ref(*this), count);
    }

    for (size_t x = 0; x < cpu_cores; ++x)
    {
        execution[x].wait();
    }
}

atomic<size_t> index;

void operator()(const size_t count)
{
    for (size_t i = index.fetch_add(1); i < count; i = index.fetch_add(1))
    {
        calculate(i);
    }
}

Is it possible to make it even faster by creating threads only once and then synchronizing them with a small overhead?
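For reference, here is a minimal sketch of that idea under stated assumptions: the workers are created once, woken per batch through a condition_variable, and claim indices with fetch_add exactly as in Example 2. The class name IndexPool and all of its members are hypothetical, and the mutex-based wake-up is chosen for simplicity, not for minimal latency.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical helper: a fixed set of workers that is reused for every batch.
class IndexPool {
public:
    explicit IndexPool(std::size_t workers) {
        assert(workers > 0);
        for (std::size_t w = 0; w < workers; ++w)
            threads_.emplace_back([this] { worker_loop(); });
    }

    ~IndexPool() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stop_ = true;
        }
        wake_.notify_all();
        for (auto& t : threads_) t.join();
    }

    // Calls fn(i) for every i in [0, count); returns once all calls are done.
    // fn must be safe to call concurrently for different i.
    void run(std::size_t count, std::function<void(std::size_t)> fn) {
        std::unique_lock<std::mutex> lock(mutex_);
        fn_ = std::move(fn);
        count_ = count;
        next_.store(0);
        active_ = threads_.size();
        ++generation_;  // tells every worker that a new batch exists
        wake_.notify_all();
        done_.wait(lock, [this] { return active_ == 0; });
    }

private:
    void worker_loop() {
        std::size_t seen = 0;  // last batch this worker has processed
        for (;;) {
            {
                std::unique_lock<std::mutex> lock(mutex_);
                wake_.wait(lock, [&] { return stop_ || generation_ != seen; });
                if (stop_) return;
                seen = generation_;
            }
            // Same claiming loop as in Example 2.
            for (std::size_t i = next_.fetch_add(1); i < count_; i = next_.fetch_add(1))
                fn_(i);
            std::lock_guard<std::mutex> lock(mutex_);
            if (--active_ == 0) done_.notify_one();  // last worker wakes run()
        }
    }

    std::mutex mutex_;
    std::condition_variable wake_, done_;
    std::function<void(std::size_t)> fn_;
    std::atomic<std::size_t> next_{0};
    std::size_t count_ = 0, active_ = 0, generation_ = 0;
    bool stop_ = false;
    std::vector<std::thread> threads_;
};
```

The pool would be constructed once outside the outer loop, with each iteration calling something like pool.run(count, [&](std::size_t i) { calculate(i); }). Whether the wake-up cost is small enough for hundreds of short inner calls has to be measured; spinning on an atomic instead of sleeping on the condition_variable trades CPU time for lower latency.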

Final solution

An additional 20% speed increase in comparison to std::async!

for (size_t i = 0; i < _countof(index); ++i) { index[i] = i; }

for_each_n(par_unseq, index, count, [&](const size_t i) { calculate(i); });

Is it possible to avoid the redundant "index" array?

Yes:

for_each_n(par_unseq, counting_iterator<size_t>(0), count,

    [&](const size_t i)
    {
        calculate(i);
    });
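If Boost is not available, a counting iterator can be written by hand. The following is a minimal sketch, not the Boost implementation: CountingIterator and squares are hypothetical names, and the demo uses the sequential for_each_n overload so that it does not depend on a parallel backend.

```cpp
#include <algorithm>
#include <cstddef>
#include <iterator>
#include <vector>

// Hypothetical minimal iterator: dereferencing yields the current index,
// so no backing array of indices is needed.
class CountingIterator {
public:
    using iterator_category = std::random_access_iterator_tag;
    using value_type = std::size_t;
    using difference_type = std::ptrdiff_t;
    using pointer = const std::size_t*;
    using reference = std::size_t;  // returned by value

    CountingIterator() = default;
    explicit CountingIterator(std::size_t v) : value_(v) {}

    reference operator*() const { return value_; }
    reference operator[](difference_type n) const { return value_ + n; }

    CountingIterator& operator++() { ++value_; return *this; }
    CountingIterator operator++(int) { auto old = *this; ++value_; return old; }
    CountingIterator& operator--() { --value_; return *this; }
    CountingIterator operator--(int) { auto old = *this; --value_; return old; }
    CountingIterator& operator+=(difference_type n) { value_ += n; return *this; }
    CountingIterator& operator-=(difference_type n) { value_ -= n; return *this; }

    friend CountingIterator operator+(CountingIterator it, difference_type n) { return it += n; }
    friend CountingIterator operator+(difference_type n, CountingIterator it) { return it += n; }
    friend CountingIterator operator-(CountingIterator it, difference_type n) { return it -= n; }
    friend difference_type operator-(CountingIterator a, CountingIterator b) {
        return static_cast<difference_type>(a.value_ - b.value_);
    }

    friend bool operator==(CountingIterator a, CountingIterator b) { return a.value_ == b.value_; }
    friend bool operator!=(CountingIterator a, CountingIterator b) { return a.value_ != b.value_; }
    friend bool operator<(CountingIterator a, CountingIterator b) { return a.value_ < b.value_; }
    friend bool operator>(CountingIterator a, CountingIterator b) { return a.value_ > b.value_; }
    friend bool operator<=(CountingIterator a, CountingIterator b) { return a.value_ <= b.value_; }
    friend bool operator>=(CountingIterator a, CountingIterator b) { return a.value_ >= b.value_; }

private:
    std::size_t value_ = 0;
};

// Demo: fills out[i] = i * i by iterating over indices only.
// Each call writes a distinct element, so the body has no shared state.
std::vector<std::size_t> squares(std::size_t count) {
    std::vector<std::size_t> out(count);
    std::for_each_n(CountingIterator(0), count,
                    [&](std::size_t i) { out[i] = i * i; });
    return out;
}
```

Strictly speaking, an iterator that returns by value does not meet the C++17 forward iterator requirements, so using it with the policy overloads (as in the snippet above) is something to verify on your toolchain.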
Collapse
  • Each calculation is independent of the others? Have one feeder queue with a std::condition_variable guarding it, have your producer thread feed items into the queue, and notify one of the consumer threads when it pushes a new piece of data to the queue. – JohnFilleau May 29 '20 at 17:38
  • If you're on Windows (which you're probably not) then you could use the Windows [threadpool API](https://learn.microsoft.com/en-us/windows/win32/procthread/thread-pool-api). It's designed to do exactly this. There's also [something similar](https://linux.die.net/man/3/cp_thread_pool) for Linux, don't know how good it is. – Paul Sanders May 29 '20 at 17:47
  • @JohnFilleau, calculate(x) and calculate(y) can run in parallel. A queue is overhead as well for such a trivial case. – Collapse May 29 '20 at 17:48
  • What's an acceptable level of overhead? – JohnFilleau May 29 '20 at 17:52
  • @PaulSanders, Windows. I will take a look. – Collapse May 29 '20 at 17:53
  • If the producer pushing data into the queue is too much overhead, perhaps the resource then isn't a queue but a sequential generator object. Each thread attempts to lock it and get the next number when it's done with its current number. Or if a single number is too fiddly, have it generate a batch of numbers at a time. – JohnFilleau May 29 '20 at 17:53
  • @JohnFilleau, std::atomic::fetch_add (or something) instead of a queue. But this is just a small piece of the whole solution. – Collapse May 29 '20 at 17:59
  • Sounds like you know a little more than me on the subject. Good luck! – JohnFilleau May 29 '20 at 18:01
  • `std::async` returning a `std::future` could work. But your requirements are still a little bit unclear. What about the outer for loop? How often does it run? Is the number of needed threads a const or a constexpr? How long would one thread run? Can you give some more, and more understandable, requirements? Thank you – A M May 29 '20 at 18:17
  • @ArminMontigny, "outer loop" is just a simplification. In general there will be millions of "outer" calls and hundreds of "inner". N is a constexpr. – Collapse May 29 '20 at 18:35
  • There are engineering tradeoffs everywhere you look here. Lots of independent short-path-length calculations (since you say each calculation is about on par with the cost of creating a thread) are probably best done with one thread per CPU (or one per cache, depending on the memory footprints) and batched assignments. I'd start by just assigning equal ranges up front and letting them run independently, then chopping down any tent poles that become troublesome. – jthill May 29 '20 at 19:14
  • Depending on the calculations involved, since you say there's millions, you might be in SIMD-handroll territory, memory bandwidth could easily be the dominating factor. – jthill May 29 '20 at 19:16
  • @ArminMontigny, thank you for reminding me about std::async. I made an intermediate solution with it! – Collapse May 30 '20 at 04:27
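The batching idea from the comments (claim several indices per synchronization step instead of one) can be sketched as follows. This is an illustration under assumptions: chunked_for is a hypothetical helper, and it creates its threads per call only to keep the example short. The claiming loop is the point and could run inside persistent workers just as well.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical helper: runs fn(i) for i in [0, count) on `workers` threads.
// Each thread claims `chunk` consecutive indices per atomic operation, so
// contention on the shared counter drops by roughly a factor of `chunk`.
// fn must be safe to call concurrently for different i.
template <class Fn>
void chunked_for(std::size_t count, std::size_t workers, std::size_t chunk, Fn fn) {
    assert(workers > 0 && chunk > 0);
    std::atomic<std::size_t> next{0};

    auto body = [&] {
        for (;;) {
            // Claim the half-open range [begin, begin + chunk).
            const std::size_t begin = next.fetch_add(chunk, std::memory_order_relaxed);
            if (begin >= count) return;
            const std::size_t end = std::min(begin + chunk, count);
            for (std::size_t i = begin; i < end; ++i) fn(i);
        }
    };

    std::vector<std::thread> threads;
    for (std::size_t w = 1; w < workers; ++w) threads.emplace_back(body);
    body();  // the calling thread takes part in the work as well
    for (auto& t : threads) t.join();
}
```

A larger chunk means fewer atomic operations but coarser load balancing. With only hundreds of inner calls, a chunk of a few dozen indices already leaves just a handful of claims per thread, so the right value is something to measure.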

1 Answer


In the past, you'd use OpenMP, GNU Parallel or Intel TBB.¹

If you have C++17², I'd suggest using execution policies with standard algorithms.

This will generally do better than you can expect to do by hand, although it

  • requires some forethought to choose types that are amenable to standard algorithms
  • still helps if you know what happens under the hood

Here's a simple example without further ado:

Live On Compiler Explorer

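#include <algorithm>
#include <chrono>
#include <execution>
#include <functional>  // std::bit_xor, std::mem_fn
#include <iostream>
#include <iterator>    // std::back_inserter
#include <numeric>     // std::transform_reduce
#include <random>
#include <string>      // std::stoull
#include <thread>
#include <vector>
using namespace std::chrono_literals;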

static size_t s_random_seed = std::random_device{}();

static auto generate_param() {
    static std::mt19937 prng {s_random_seed};
    static std::uniform_int_distribution<> dist;
    return dist(prng);
}

struct Task {
    Task(int p = generate_param()) : param(p), output(0) {}

    int param;
    int output;

    struct ByParam  { bool operator()(Task const& a, Task const& b) const { return a.param < b.param; } };
    struct ByOutput { bool operator()(Task const& a, Task const& b) const { return a.output < b.output; } };
};

static void calculate(Task& task) {
    //std::this_thread::sleep_for(1us);
    task.output = task.param ^ 0xf0f0f0f0;
}

int main(int argc, char** argv) {
    if (argc>1) {
        s_random_seed = std::stoull(argv[1]);
    }

    std::vector<Task> jobs;

    auto now = std::chrono::high_resolution_clock::now;
    auto start = now();

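    // Sequential on purpose: the parallel overloads require forward iterators
    // (back_inserter is output-only) and generate_param is not thread-safe.
    std::generate_n(
            back_inserter(jobs),
            1ull << 28, // reduce for small RAM!
            generate_param);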

    auto laptime = [&](auto caption) {
        std::cout << caption << " in " << (now() - start)/1.0s << "s" << std::endl;
        start = now();
    };
    laptime("generate randum input");

    std::sort(
        std::execution::par_unseq,
        begin(jobs), end(jobs),
        Task::ByParam{});

    laptime("sort by param");

    std::for_each(
        std::execution::par_unseq,
        begin(jobs), end(jobs),
        calculate);

    laptime("calculate");

    std::sort(
        std::execution::par_unseq,
        begin(jobs), end(jobs),
        Task::ByOutput{});

    laptime("sort by output");

    auto const checksum = std::transform_reduce(
        std::execution::par_unseq,
        begin(jobs), end(jobs),
        0, std::bit_xor<>{},
        std::mem_fn(&Task::output)
    );

    laptime("reduce");
    std::cout << "Checksum: " << checksum << "\n";
}

When run with the seed 42, it prints:

generate randum input in 10.8819s
sort by param in 8.29467s
calculate in 0.22513s
sort by output in 5.64708s
reduce in 0.108768s
Checksum: 683872090

CPU utilization is 100% on all cores except for the first (random-generation) step.


¹ (I think I have answers demoing all of these on this site).

² See "Are C++17 Parallel Algorithms implemented already?"

sehe
  • Excellent! This is exactly what I was looking for! Thank you. One minor thing... Is it possible to avoid redundant array? Just look at the recent revision of this question, please. – Collapse May 31 '20 at 19:02
  • @Collapse You'd usually use an indexed [iterator adaptor](https://www.boost.org/doc/libs/1_73_0/libs/iterator/doc/index.html) or indeed, if you don't have a container to iterate, a [counting_iterator](https://www.boost.org/doc/libs/1_73_0/libs/iterator/doc/counting_iterator.html). Here, I define my own [c++17 counting_iterator](https://godbolt.org/z/sNYqMS) which you can see works really well with the simd-enhanced pstl implementation on GNU. – sehe May 31 '20 at 23:08
  • The [complete disassembly of `main`](https://i.imgur.com/8RDY2ff.png) fits on one page and contains the hard-coded results of the first two accumulations (55 and 120). – sehe May 31 '20 at 23:08