1

I am currently working on a scientific simulation (Gravitational nbody). I first wrote it with a naive single-threaded algorithm, and this performed acceptably for a small number of particles. I then multi-threaded this algorithm (it is embarrassingly parallel), and the program took about 3x as long. What follows is a minimum, complete, verifiable example of a trivial algorithm with similar properties and output to a file in /tmp (it is designed to run on Linux, but the C++ is also standard). Be warned that if you decide to run this code, it will produce a 152.62MB file. The data is outputted to prevent the compiler from optimizing the computation out of the program.

#include <iostream>
#include <functional>
#include <thread>
#include <vector>
#include <atomic>
#include <random>
#include <fstream>
#include <chrono>

constexpr unsigned ITERATION_COUNT = 2000;
constexpr unsigned NUMBER_COUNT = 10000;

void runThreaded(unsigned count, unsigned batchSize, std::function<void(unsigned)> callback){
    unsigned threadCount = std::thread::hardware_concurrency();
    std::vector<std::thread> threads;
    threads.reserve(threadCount);

    std::atomic<unsigned> currentIndex(0);

    for(unsigned i=0;i<threadCount;++i){
        threads.emplace_back([&currentIndex, batchSize, count, callback]{
            unsigned startAt = currentIndex.fetch_add(batchSize);

            if(startAt >= count){
                return;
            }else{
                for(unsigned i=0;i<count;++i){
                    unsigned index = startAt+i;
                    if(index >= count){
                        return;
                    }
                    callback(index);
                }
            }
        });
    }

    for(std::thread &thread : threads){
        thread.join();
    }
}

void threadedTest(){
    std::mt19937_64 rnd(0);
    std::vector<double> numbers;

    numbers.reserve(NUMBER_COUNT);
    for(unsigned i=0;i<NUMBER_COUNT;++i){
        numbers.push_back(rnd());
    }

    std::vector<double> newNumbers = numbers;

    std::ofstream fout("/tmp/test-data.bin");

    for(unsigned i=0;i<ITERATION_COUNT;++i) {
        std::cout << "Iteration: " << i << "/" << ITERATION_COUNT << std::endl;
        runThreaded(NUMBER_COUNT, 100, [&numbers, &newNumbers](unsigned x){
            double total = 0;
            for(unsigned y=0;y<NUMBER_COUNT;++y){
                total += numbers[y]*(y-x)*(y-x);
            }
            newNumbers[x] = total;
        });
        fout.write(reinterpret_cast<char*>(newNumbers.data()), newNumbers.size()*sizeof(double));
        std::swap(numbers, newNumbers);
    }
}

void unThreadedTest(){
    std::mt19937_64 rnd(0);
    std::vector<double> numbers;

    numbers.reserve(NUMBER_COUNT);
    for(unsigned i=0;i<NUMBER_COUNT;++i){
        numbers.push_back(rnd());
    }

    std::vector<double> newNumbers = numbers;

    std::ofstream fout("/tmp/test-data.bin");

    for(unsigned i=0;i<ITERATION_COUNT;++i){
        std::cout << "Iteration: " << i << "/" << ITERATION_COUNT << std::endl;
        for(unsigned x=0;x<NUMBER_COUNT;++x){
            double total = 0;
            for(unsigned y=0;y<NUMBER_COUNT;++y){
                total += numbers[y]*(y-x)*(y-x);
            }
            newNumbers[x] = total;
        }
        fout.write(reinterpret_cast<char*>(newNumbers.data()), newNumbers.size()*sizeof(double));
        std::swap(numbers, newNumbers);
    }
}

int main(int argc, char *argv[]) {
    if(argv[1][0] == 't'){
        threadedTest();
    }else{
        unThreadedTest();
    }
    return 0;
}

When I run this (compiled with clang 7.0.1 on Linux), I get the following times from the Linux time command. The difference between these is similar to what I see in my real program. The entry labelled "real" is what is relevant to this question, as this is the clock time that the program takes to run.

Single-threaded:

real    6m27.261s
user    6m27.081s
sys     0m0.051s

Multi-threaded:

real    14m32.856s
user    216m58.063s
sys     0m4.492s

As such, I ask what is causing this massive slowdown when I expect it to speed up significantly (roughly by a factor of 8, as I have an 8 core 16 thread CPU). I am not implementing this on the GPU as the next step is to make some changes to the algorithm to take it from O(n²) to O(nlogn), but that are also not amicable to a GPU. The changed algorithm will have less difference with my currently implemented O(n²) algorithm than the included example. Lastly, I want to observe that the subjective time to run each iteration (judged by the time between the iteration lines appearing) changes significantly in both the threaded and unthreaded runs.

mrflash818
  • 930
  • 13
  • 24
john01dav
  • 1,842
  • 1
  • 21
  • 40
  • If your threads use shared data and atomic variables or locks it's quite possible that all the synchronization costs completely negate any gains from parallelization. For optimal performance share nothing, or as little as possible. Could you approach this with a map/reduce style strategy? – tadman Mar 10 '19 at 01:56
  • 1
    @tadman As I can see, the atomic variable is only accessed once per thread. If there's 8 thread it's only accessed 8 times, which is very little. – user202729 Mar 10 '19 at 01:59
  • That's what I'd expect, but there must be something else that's impeding performance. Why are you using `constexpr` for simple numbers? Why not just `const`? I can't reproduce your problem because when running this code I get a segfault. – tadman Mar 10 '19 at 02:00
  • [c++ - What is the performance overhead of std::function? - Stack Overflow](https://stackoverflow.com/questions/5057382/what-is-the-performance-overhead-of-stdfunction) ? – user202729 Mar 10 '19 at 02:01
  • @tadman I am using constexpr to encourage inlining. I realize that it may not be completely necessary, but my C++ book IIRC claims that it makes it more likely. – john01dav Mar 10 '19 at 02:02
  • @tadman Run with first argument 't' or '' <-> threaded/non-threaded. – user202729 Mar 10 '19 at 02:03
  • Worth mentioning that when posting, but yeah, now it runs at least. – tadman Mar 10 '19 at 02:04
  • 1
    Are you sure you're dividing up the work properly? I can see the offset adjustment, but they all seem to iterate through `size` entries, so it should be a lot slower in your threaded version due to duplication of work. – tadman Mar 10 '19 at 02:08
  • `const` should be sufficient for simple values. `constexpr` can come into play with more complex expressions. In any case if you compile with `-O3` you should be set in either case, just that `const` makes more sense here. Maybe benchmark to see if there's a difference, or dump the compiled assembly output of impacted functions to see if anything changes. – tadman Mar 10 '19 at 02:15
  • Instead of saving to file just do `std::cout << std::accumulate(newNumbers.begin(), newNumbers.end(), 0.) << '\n';`. Since `newNumbers` are derived from random numbers the compiler has to emit code to do the actual calculations and output the sum. – Maxim Egorushkin Mar 10 '19 at 03:04

3 Answers3

5

It's kind of hard to follow this code, but I think you're duplicating work on a massive scale because each thread does nearly all the work, just skipping a small portion of it at the start.

I'm presuming the inner loop of runThreaded should be:

unsigned startAt = currentIndex.fetch_add(batchSize);

while (startAt < count) {
  if (startAt >= count) {
    return;
  } else {
    for(unsigned i=0;i<batchSize;++i){
      unsigned index = startAt+i;

      if(index >= count){
        return;
      }

      callback(index);
    }
  }

  startAt = currentIndex.fetch_add(batchSize);
}

Where i < batchSize is the key here. You should only do as much work as the batch dictates, not count times, which is the whole list minus the initial offset.

With this update the code runs significantly faster. I'm not sure if it does all the required work because it's hard to tell if that's actually happening, the output is very minimal.

tadman
  • 208,517
  • 23
  • 234
  • 262
  • This will need an additional loop so each thread will get another batch or you won't process every index. – 1201ProgramAlarm Mar 10 '19 at 02:18
  • @1201ProgramAlarm That's probably the case, as a thread-pool type structure would help here. I've adjusted the code to loop until it exhausts the pool. – tadman Mar 10 '19 at 02:19
  • 2
    Thank you! This was it :D I've been staring at the code for hours and I didn't see it. – john01dav Mar 10 '19 at 05:11
2

For easy parallelization over multiple CPUs I recommend using tbb::parallel_for. It uses the correct number of CPUs and splits the range for you, completely eliminating the risk of implementing it wrong. Alternatively, there is a parallel for_each in C++17. In other words, this problem has a number of good solutions.

Vectorizing code is a difficult problem and neither clang++-6 not g++-8 auto-vectorize the baseline code. Hence, SIMD version below I used excellent Vc: portable, zero-overhead C++ types for explicitly data-parallel programming library.

Below is a working benchmark that compares:

  • The baseline version.
  • SIMD version.
  • SIMD + multi-threading version.


#include <Vc/Vc>
#include <tbb/parallel_for.h>

#include <algorithm>
#include <chrono>
#include <iomanip>
#include <iostream>
#include <random>
#include <vector>

constexpr int ITERATION_COUNT = 20;
constexpr int NUMBER_COUNT = 20000;

double baseline() {
    double result = 0;

    std::vector<double> newNumbers(NUMBER_COUNT);
    std::vector<double> numbers(NUMBER_COUNT);
    std::mt19937 rnd(0);
    for(auto& n : numbers)
        n = rnd();

    for(int i = 0; i < ITERATION_COUNT; ++i) {
        for(int x = 0; x < NUMBER_COUNT; ++x) {
            double total = 0;
            for(int y = 0; y < NUMBER_COUNT; ++y) {
                auto d = (y - x);
                total += numbers[y] * (d * d);
            }
            newNumbers[x] = total;
        }
        result += std::accumulate(newNumbers.begin(), newNumbers.end(), 0.);
        swap(numbers, newNumbers);
    }

    return result;
}

double simd() {
    double result = 0;

    constexpr int SIMD_NUMBER_COUNT = NUMBER_COUNT / Vc::double_v::Size;
    using vector_double_v = std::vector<Vc::double_v, Vc::Allocator<Vc::double_v>>;
    vector_double_v newNumbers(SIMD_NUMBER_COUNT);
    vector_double_v numbers(SIMD_NUMBER_COUNT);
    std::mt19937 rnd(0);
    for(auto& n : numbers) {
        alignas(Vc::VectorAlignment) double t[Vc::double_v::Size];
        for(double& v : t)
            v = rnd();
        n.load(t, Vc::Aligned);
    }

    Vc::double_v const incv(Vc::double_v::Size);
    for(int i = 0; i < ITERATION_COUNT; ++i) {
        Vc::double_v x(Vc::IndexesFromZero);
        for(auto& new_n : newNumbers) {
            Vc::double_v totals;
            int y = 0;
            for(auto const& n : numbers) {
                for(unsigned j = 0; j < Vc::double_v::Size; ++j) {
                    auto d = y - x;
                    totals += n[j] * (d * d);
                    ++y;
                }
            }
            new_n = totals;
            x += incv;
        }
        result += std::accumulate(newNumbers.begin(), newNumbers.end(), Vc::double_v{}).sum();
        swap(numbers, newNumbers);
    }

    return result;
}

double simd_mt() {
    double result = 0;

    constexpr int SIMD_NUMBER_COUNT = NUMBER_COUNT / Vc::double_v::Size;
    using vector_double_v = std::vector<Vc::double_v, Vc::Allocator<Vc::double_v>>;
    vector_double_v newNumbers(SIMD_NUMBER_COUNT);
    vector_double_v numbers(SIMD_NUMBER_COUNT);
    std::mt19937 rnd(0);
    for(auto& n : numbers) {
        alignas(Vc::VectorAlignment) double t[Vc::double_v::Size];
        for(double& v : t)
            v = rnd();
        n.load(t, Vc::Aligned);
    }

    Vc::double_v const v0123(Vc::IndexesFromZero);
    for(int i = 0; i < ITERATION_COUNT; ++i) {
        constexpr int SIMD_STEP = 4;
        tbb::parallel_for(0, SIMD_NUMBER_COUNT, SIMD_STEP, [&](int ix) {
            Vc::double_v xs[SIMD_STEP];
            for(int is = 0; is < SIMD_STEP; ++is)
                xs[is] = v0123 + (ix + is) * Vc::double_v::Size;
            Vc::double_v totals[SIMD_STEP];
            int y = 0;
            for(auto const& n : numbers) {
                for(unsigned j = 0; j < Vc::double_v::Size; ++j) {
                    for(int is = 0; is < SIMD_STEP; ++is) {
                        auto d = y - xs[is];
                        totals[is] += n[j] * (d * d);
                    }
                    ++y;
                }
            }
            std::copy_n(totals, SIMD_STEP, &newNumbers[ix]);
        });
        result += std::accumulate(newNumbers.begin(), newNumbers.end(), Vc::double_v{}).sum();
        swap(numbers, newNumbers);
    }

    return result;
}

struct Stopwatch {
    using Clock = std::chrono::high_resolution_clock;
    using Seconds = std::chrono::duration<double>;
    Clock::time_point start_ = Clock::now();

    Seconds elapsed() const {
        return std::chrono::duration_cast<Seconds>(Clock::now() - start_);
    }
};


std::ostream& operator<<(std::ostream& s, Stopwatch::Seconds const& a) {
    auto precision = s.precision(9);
    s << std::fixed << a.count() << std::resetiosflags(std::ios_base::floatfield) << 's';
    s.precision(precision);
    return s;
}

void benchmark() {
    Stopwatch::Seconds baseline_time;
    {
        Stopwatch s;
        double result = baseline();
        baseline_time = s.elapsed();
        std::cout << "baseline: " << result << ", " << baseline_time << '\n';
    }

    {
        Stopwatch s;
        double result = simd();
        auto time = s.elapsed();
        std::cout << "    simd: " << result << ", " << time << ", " << (baseline_time / time) << "x speedup\n";
    }

    {
        Stopwatch s;
        double result = simd_mt();
        auto time = s.elapsed();
        std::cout << " simd_mt: " << result << ", " << time << ", " << (baseline_time / time) << "x speedup\n";
    }
}

int main() {
    benchmark();
    benchmark();
    benchmark();
}

Timings:

baseline: 2.76582e+257, 6.399848397s
    simd: 2.76582e+257, 1.600373449s, 3.99897x speedup
 simd_mt: 2.76582e+257, 0.168638435s, 37.9501x speedup

Notes:

  • My machine supports AVX but not AVX-512, so it is roughly 4x speedup when using SIMD.
  • simd_mt version uses 8 threads on my machine and larger SIMD steps. The theoretical speedup is 128x, on practice - 38x.
  • clang++-6 cannot auto-vectorize the baseline code, neither can g++-8.
  • g++-8 generates considerably faster code for SIMD versions than clang++-6 .
Maxim Egorushkin
  • 131,725
  • 17
  • 180
  • 271
1

Your heart is certainly in the right place minus a bug or two.

par_for is a complex issue depending on the payload of your loop. There is no one-size-fits-all solution to this. The payload can be anything from a couple of adds to almost infinite mutex blocks - for example by doing memory allocation.

The atomic variable as a work item pattern has always worked well for me but remember that atomic variables have a high cost on X86 (~400 cycles) and even incur a high cost if they are in an unexecuted branch as I found to my peril.

Some permutation of the following is usually good. Choosing the right chunks_per_thread (as in your batchSize) is critical. If you don't trust your users, you can test execute a few iterations of the loop to guess the best chunking level.

#include <atomic>
#include <future>
#include <thread>
#include <vector>
#include <stdio.h>

template<typename Func>
void par_for(int start, int end, int step, int chunks_per_thread, Func func) {
  using namespace std;
  using namespace chrono;

  atomic<int> work_item{start};
  vector<future<void>> futures(std::thread::hardware_concurrency());

  for (auto &fut : futures) {
    fut = async(std::launch::async, [&work_item, end, step, chunks_per_thread, &func]() {
      for(;;) {
        int wi = work_item.fetch_add(step * chunks_per_thread);
        if (wi > end) break;
        int wi_max = std::min(end, wi+step * chunks_per_thread);
        while (wi < wi_max) {
          func(wi);
          wi += step;
        }
      }
    });
  }

  for (auto &fut : futures) {
    fut.wait();
  }
}

int main() {
  using namespace std;
  using namespace chrono;
  for (int k = 0; k != 2; ++k) {
    auto t0 = high_resolution_clock::now();
    constexpr int loops = 100000000;
    if (k == 0) {
      for (int i = 0; i != loops; ++i ) {
        if (i % 10000000 == 0) printf("%d\n", i);
      }
    } else {
      par_for(0, loops, 1, 100000, [](int i) {
        if (i % 10000000 == 0) printf("%d\n", i);
      });
    }
    auto t1 = high_resolution_clock::now();
    duration<double, milli> ns = t1 - t0;
    printf("k=%d %fms total\n", k, ns.count());
  }
}

results

...
k=0 174.925903ms total
...
k=1 27.924738ms total

About a 6x speedup.

I avoid the term "embarassingly parallel" as it is almost never the case. You pay exponentially higher costs the more resources you use on your journey from level 1 cache (ns latency) to globe spanning cluster (ms latency). But I hope this code snippet is useful as an answer.

Andy Thomason
  • 328
  • 1
  • 9
  • It's not clear to me how this improves on the previous answers; you might want to add something that highlights the differences compared to the accepted answer and the other answer with a 37x speedup. Unrelated, but there are a few classes of problems that are definitely embarrassingly parallel--this old trope exists for a reason. YMMV. – Dave Newton Apr 25 '19 at 00:33