I've written a function which operates on a matrix of 3D vectors. Essentially, for each iterated point (the red cross in the diagram) it computes the difference between the vectors represented by the blue crosses, and accumulates the dot products of these differences with themselves in an output variable. To be precise, the vector at the iterated point, scaled by a constant, is subtracted from the vector in the cell below. The value that is accumulated is that difference's dot product with itself, multiplied by another constant.
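In scalar terms, both variants below accumulate the following for every iterated point (i1, i2) (illustrative notation only; k1 and k2 are the constants passed to the functions):
lambda = position(i1 + 1, i2) - k1 * position(i1, i2)
accumulator += (k2 / 2) * (lambda(0)^2 + lambda(1)^2 + lambda(2)^2)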
The following is a minimal reproducer. There is a naive variant which relies on auto-vectorization, and an optimized one which uses AVX2 intrinsics and additionally performs loop blocking for better cache locality.
#include <cmath> // std::floor, std::fabs
#include <iostream>
#include <iomanip>
#include <memory>
#include <random>
#include <immintrin.h>
//+//////////////////////////////////////////////
// data structures
//+//////////////////////////////////////////////
class v3
{
double data_[3] = {};
public:
double& operator()(const std::size_t index)
{
return data_[index];
}
const double& operator()(const std::size_t index) const
{
return data_[index];
}
v3 operator-(const v3& op) const
{
auto ret_val = v3{};
ret_val(0) = (*this)(0) - op(0);
ret_val(1) = (*this)(1) - op(1);
ret_val(2) = (*this)(2) - op(2);
return ret_val;
}
v3 operator*(const double& op) const
{
auto ret_val = v3{};
ret_val(0) = (*this)(0) * op;
ret_val(1) = (*this)(1) * op;
ret_val(2) = (*this)(2) * op;
return ret_val;
}
};
class matrix_v3
{
std::size_t n1_;
std::size_t n2_;
std::unique_ptr<v3[]> positions_; // array form so that delete[] is used
public:
matrix_v3(const std::size_t& n1, const std::size_t& n2)
: n1_(n1), n2_(n2), positions_(new v3[n1 * n2])
{
if (n1_ < 1 || n2_ < 1)
{
throw std::invalid_argument("matrix_v3_ctor_invalid_sizes_error");
}
}
std::size_t n1() const
{
return n1_;
}
std::size_t n2() const
{
return n2_;
}
v3& position(const std::size_t& i1, const std::size_t& i2)
{
if (n1_ <= i1 || n2_ <= i2)
{
throw std::invalid_argument("matrix_v3_position_invalid_indices_error");
}
return positions_.get()[i1 * n2_ + i2];
}
const v3& position(const std::size_t& i1, const std::size_t& i2) const
{
return const_cast<matrix_v3*>(this)->position(i1, i2);
}
double* stream(const std::size_t& i1)
{
if (n1_ <= i1)
{
throw std::invalid_argument("matrix_v3_stream_invalid_index_error");
}
return reinterpret_cast<double*>(positions_.get() + i1 * n2_);
}
const double* stream(const std::size_t& i1) const
{
return const_cast<matrix_v3*>(this)->stream(i1);
}
};
//+//////////////////////////////////////////////
// functions
//+//////////////////////////////////////////////
double auto_vec_variant(const matrix_v3& mat, const double& k1, const double& k2)
{
// output
auto accumulator = double{};
// loop
const auto k2_b_2 = k2 * 0.5;
for (auto i1 = std::size_t{}; i1 < mat.n1() - 1; ++i1)
{
for (auto i2 = std::size_t{}; i2 < mat.n2(); ++i2)
{
auto lambda = mat.position(i1 + 1, i2) - mat.position(i1, i2) * k1;
accumulator += k2_b_2 * (lambda(0) * lambda(0) + lambda(1) * lambda(1) + lambda(2) * lambda(2));
}
}
return accumulator;
}
// 9 MiB L3 cache
#define VVI_LLD_CACHE_SIZE_MB 9
double intrinsics_variant(const matrix_v3& mat, const double& k1, const double& k2)
{
// inner loop constants
static constexpr auto unroll_count = 3;
static constexpr auto register_width = 4;
static constexpr auto increment = register_width * unroll_count;
// loop blocking details
constexpr auto lld_cache_size = VVI_LLD_CACHE_SIZE_MB * 1024 * 1024 / (sizeof(double) * 4); // 4 comes from actual application
const auto block_size = lld_cache_size / (mat.n1() - 1) / increment * increment; // must be divisible by increment!
const auto block_size_in_v3 = block_size / 3;
const auto rstream_size_div = mat.n2() * 3 / block_size * block_size;
// packed constants
const auto k2_b_2 = k2 * 0.5;
const auto k1_p = _mm256_broadcast_sd(&k1);
const auto k2_b_2_p = _mm256_broadcast_sd(&k2_b_2);
// output
auto accumulator = double{};
// let's go!
auto particle_index = std::size_t{};
for (auto start_index = std::size_t{}; start_index < rstream_size_div; start_index += block_size,
particle_index += block_size_in_v3)
{
for (auto i1 = std::size_t{}; i1 < mat.n1() - 1; ++i1)
{
// real valued streams
const auto stream1 = mat.stream(i1);
const auto stream2 = mat.stream(i1 + 1);
// loop
const auto end_index = start_index + block_size;
for (auto index = start_index; index < end_index; index += increment)
{
// load x(i) and x(i)_r
const auto x_0 = _mm256_loadu_pd(stream1 + index);
const auto x_r_0 = _mm256_loadu_pd(stream2 + index);
const auto x_1 = _mm256_loadu_pd(stream1 + index + 4);
const auto x_r_1 = _mm256_loadu_pd(stream2 + index + 4);
const auto x_2 = _mm256_loadu_pd(stream1 + index + 8);
const auto x_r_2 = _mm256_loadu_pd(stream2 + index + 8);
// lambda = x_r - k1 * x, one FNMADD per register
const auto l0_0 = _mm256_fnmadd_pd(x_0, k1_p, x_r_0);
const auto l0_1 = _mm256_fnmadd_pd(x_1, k1_p, x_r_1);
const auto l0_2 = _mm256_fnmadd_pd(x_2, k1_p, x_r_2);
// 5 in-lane shuffles: begin transposing the four interleaved xyz vectors into per-component registers
const auto l1_0 = _mm256_shuffle_pd(l0_0, l0_1, 0b00001011);
const auto l1_1 = _mm256_shuffle_pd(l0_1, l0_2, 0b00000010);
const auto l2_0 = _mm256_shuffle_pd(l0_0, l1_1, 0b00000110);
const auto y = _mm256_shuffle_pd(l1_0, l1_1, 0b00001100);
const auto l2_2 = _mm256_shuffle_pd(l1_0, l0_2, 0b00001001);
// cross-lane permutes (expensive ops) finish the transpose: x, y, z each hold one component of four lambdas
const auto x = _mm256_permute4x64_pd(l2_0, 0b01111000); // 1,3,2,0
const auto z = _mm256_permute4x64_pd(l2_2, 0b11010010); // 3,1,0,2
// arithmetics
const auto xx = _mm256_mul_pd(x, x);
const auto xx_p_yy = _mm256_fmadd_pd(y, y, xx);
const auto dot_product = _mm256_fmadd_pd(z, z, xx_p_yy);
// accumulate (FMA)
auto pe = __m256d{};
pe = _mm256_fmadd_pd(dot_product, k2_b_2_p, pe);
// horizontal sum of packed dot product
const auto pe_high = _mm256_extractf128_pd(pe, 1);
const auto pe_low = _mm256_castpd256_pd128(pe);
const auto sum_128 = _mm_add_pd(pe_low, pe_high);
const auto sum_128_1 = _mm_shuffle_pd(sum_128, sum_128, 0b00000001); // 0, 1
const auto sum_64 = _mm_add_pd(sum_128, sum_128_1);
accumulator += _mm_cvtsd_f64(sum_64);
}
}
}
// outer remainder loop (no optimization necessary here)
for (auto i1 = std::size_t{}; i1 < mat.n1() - 1; ++i1)
{
for (auto i2 = particle_index; i2 < mat.n2(); ++i2)
{
auto lambda = mat.position(i1 + 1, i2) - mat.position(i1, i2) * k1;
accumulator += k2_b_2 * (lambda(0) * lambda(0) + lambda(1) * lambda(1) + lambda(2) * lambda(2));
}
}
return accumulator;
}
//+//////////////////////////////////////////////
// tester
//+//////////////////////////////////////////////
bool test_functionality()
{
const auto L = std::size_t{30};
const auto N = std::size_t{999460};
auto mat = matrix_v3(L, N);
// init
auto engine = std::mt19937_64(std::random_device()());
auto dist = std::uniform_real_distribution<double>(-5.0, 5.0);
for (auto i1 = std::size_t{}; i1 < mat.n1(); ++i1)
{
for (auto i2 = std::size_t{}; i2 < mat.n2(); ++i2)
{
auto& position = mat.position(i1, i2);
position(0) = std::floor(dist(engine));
position(1) = std::floor(dist(engine));
position(2) = std::floor(dist(engine));
}
}
// compute reference
auto accumulator_ref = auto_vec_variant(mat, 0.1, 0.4);
// compute test
auto accumulator_test = intrinsics_variant(mat, 0.1, 0.4);
// test
const auto error = std::fabs(accumulator_ref - accumulator_test);
if (error > 1E-15)
{
std::cout << std::fixed << std::setprecision(15) << "Test failed. Error: " << error << "." << std::endl;
return false;
}
return true;
}
int main()
{
// conduct the test
if (test_functionality())
{
return 0;
}
return 1;
}
I'm compiling the above with the GNU compiler v11.1.0 and the flags "-O3 -mfma -mavx2". The test fails with an error of ~0.000006318092346. As can be seen in the code, the use of std::floor ensures that the matrix is initialized with integer-valued floating-point numbers only.
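For reference, the equivalent plain command line would look roughly like this (the source file name is just a placeholder):
g++ -O3 -mfma -mavx2 reproducer.cpp -o reproducer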
Questions:
- Why is there such a large difference between the results of the two variants?
- Which variant should be more accurate?
- If the intrinsics variant is less accurate, how can its accuracy be improved?
Additional information:
Using a simple counter, I measured how many additions are performed for the accumulation in each case: the naive variant performs 28984340 additions and the optimized one 21738255.
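(For context on why the number and order of additions can matter at all: double addition is not associative, so different accumulation orders generally round differently. A tiny standalone snippet, with arbitrary values, illustrates this:)
#include <cstdio>
int main()
{
    const double a = 0.1, b = 0.2, c = 0.3;
    // the same three values, two association orders, two different results
    std::printf("%.17g\n", (a + b) + c); // 0.60000000000000009
    std::printf("%.17g\n", a + (b + c)); // 0.59999999999999998
    return 0;
}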
Thanks in advance, looking forward to hearing your thoughts.
Update:
Following optimization suggestions by @PeterCordes, I implemented a more efficient version of the intrinsics variant:
double intrinsics_variant(const matrix_v3& mat, const double& k1, const double& k2)
{
// inner loop constants
static constexpr auto unroll_count = 3;
static constexpr auto register_width = 4;
static constexpr auto increment = register_width * unroll_count;
// loop blocking details
constexpr auto lld_cache_size = VVI_LLD_CACHE_SIZE_MB * 1024 * 1024 / (sizeof(double) * 4); // 4 comes from actual application
const auto block_size = lld_cache_size / (mat.n1() - 1) / increment * increment; // must be divisible by increment!
const auto block_size_in_v3 = block_size / 3;
const auto rstream_size_div = mat.n2() * 3 / block_size * block_size;
// packed constants
const auto k2_b_2 = k2 * 0.5;
const auto k1_p = _mm256_broadcast_sd(&k1);
const auto k2_b_2_p = _mm256_broadcast_sd(&k2_b_2);
// packed accumulators
auto accumulator_p_0 = __m256d{};
auto accumulator_p_1 = __m256d{};
auto accumulator_p_2 = __m256d{};
// let's go!
auto particle_index = std::size_t{};
for (auto start_index = std::size_t{}; start_index < rstream_size_div; start_index += block_size,
particle_index += block_size_in_v3)
{
for (auto i1 = std::size_t{}; i1 < mat.n1() - 1; ++i1)
{
// real valued streams
const auto stream1 = mat.stream(i1);
const auto stream2 = mat.stream(i1 + 1);
// loop
const auto end_index = start_index + block_size;
for (auto index = start_index; index < end_index; index += increment)
{
// load x(i) and x(i)_r
const auto x_0 = _mm256_loadu_pd(stream1 + index);
const auto x_r_0 = _mm256_loadu_pd(stream2 + index);
const auto x_1 = _mm256_loadu_pd(stream1 + index + 4);
const auto x_r_1 = _mm256_loadu_pd(stream2 + index + 4);
const auto x_2 = _mm256_loadu_pd(stream1 + index + 8);
const auto x_r_2 = _mm256_loadu_pd(stream2 + index + 8);
// lambda = x_r - k1 * x, one FNMADD per register
const auto lambda_0 = _mm256_fnmadd_pd(x_0, k1_p, x_r_0);
const auto lambda_1 = _mm256_fnmadd_pd(x_1, k1_p, x_r_1);
const auto lambda_2 = _mm256_fnmadd_pd(x_2, k1_p, x_r_2);
// per-component squares (the dot products are only formed implicitly by the final reduction)
const auto lambda_0_sq = _mm256_mul_pd(lambda_0, lambda_0);
const auto lambda_1_sq = _mm256_mul_pd(lambda_1, lambda_1);
const auto lambda_2_sq = _mm256_mul_pd(lambda_2, lambda_2);
// accumulate in packed register
accumulator_p_0 = _mm256_fmadd_pd(lambda_0_sq, k2_b_2_p, accumulator_p_0);
accumulator_p_1 = _mm256_fmadd_pd(lambda_1_sq, k2_b_2_p, accumulator_p_1);
accumulator_p_2 = _mm256_fmadd_pd(lambda_2_sq, k2_b_2_p, accumulator_p_2);
}
}
}
// sum up the unrolled accumulators
const auto accumulator_p_intermediate = _mm256_add_pd(accumulator_p_0, accumulator_p_1);
const auto accumulator_p = _mm256_add_pd(accumulator_p_intermediate, accumulator_p_2);
// horizontal sum of packed dot product
const auto high = _mm256_extractf128_pd(accumulator_p, 1);
const auto low = _mm256_castpd256_pd128(accumulator_p);
const auto sum_128 = _mm_add_pd(low, high);
const auto sum_128_1 = _mm_shuffle_pd(sum_128, sum_128, 0b00000001); // 0, 1
const auto sum_64 = _mm_add_pd(sum_128, sum_128_1);
auto accumulator = _mm_cvtsd_f64(sum_64);
// outer remainder loop (no optimization necessary here)
for (auto i1 = std::size_t{}; i1 < mat.n1() - 1; ++i1)
{
for (auto i2 = particle_index; i2 < mat.n2(); ++i2)
{
auto lambda = mat.position(i1 + 1, i2) - mat.position(i1, i2) * k1;
accumulator += k2_b_2 * (lambda(0) * lambda(0) + lambda(1) * lambda(1) + lambda(2) * lambda(2));
}
}
return accumulator;
}
The above consistently shows an even greater deviation of ~0.001716434955597.