Here’s an AVX2 version in C++17, untested.
#include <immintrin.h>
#include <stdint.h>
#include <array>
#include <assert.h>
using Accumulators = std::array<__m256d, 4>;
// Conditionally accumulate 4 lanes, with source data from a register
template<int i>
inline void conditionalSum4( Accumulators& acc, __m256d vals, __m256i broadcasted )
{
// Unless this is the first register in the batch, shift the mask values by a multiple of 4 bits
if constexpr( i > 0 )
broadcasted = _mm256_srli_epi64( broadcasted, i * 4 );
// Bits 0-3 in 64-bit lanes of `broadcasted` correspond to the values being processed
// Compute a mask from the lowest 4 bits of `broadcasted`; each lane uses a different bit
const __m256i bits = _mm256_setr_epi64x( 1, 2, 4, 8 );
__m256i mask = _mm256_and_si256( broadcasted, bits );
mask = _mm256_cmpeq_epi64( mask, bits ); // Expand bits into qword-s
// Bitwise AND to zero out masked out lanes: integer zero == double 0.0
// BTW, if your mask has 1 for values you want to ignore, use _mm256_andnot_pd instead
vals = _mm256_and_pd( vals, _mm256_castsi256_pd( mask ) );
// Accumulate the result
acc[ i ] = _mm256_add_pd( acc[ i ], vals );
}
// Conditionally accumulate 4 lanes, with source data from memory
template<int i>
inline void conditionalSum4( Accumulators& acc, const double* source, __m256i broadcasted )
{
constexpr int offset = i * 4;
const __m256d vals = _mm256_loadu_pd( source + offset );
conditionalSum4<i>( acc, vals, broadcasted );
}
// Conditionally accumulate lanes from memory, for the last potentially incomplete vector
template<int i>
inline void conditionalSumPartial( Accumulators& acc, const double* source, __m256i broadcasted, size_t count )
{
constexpr int offset = i * 4;
__m256d vals;
__m128d low, high;
switch( count - offset )
{
case 1:
// Load a scalar, zero out other 3 lanes
vals = _mm256_setr_pd( source[ offset ], 0, 0, 0 );
break;
case 2:
// Load 2 lanes
low = _mm_loadu_pd( source + offset );
high = _mm_setzero_pd();
vals = _mm256_setr_m128d( low, high );
break;
case 3:
// Load 3 lanes
low = _mm_loadu_pd( source + offset );
high = _mm_load_sd( source + offset + 2 );
vals = _mm256_setr_m128d( low, high );
break;
case 4:
// The count of values was a multiple of 4, load the complete vector
vals = _mm256_loadu_pd( source + offset );
break;
default:
assert( false );
return;
}
conditionalSum4<i>( acc, vals, broadcasted );
}
// The main function implementing the algorithm.
// The maskBytes argument is a densely packed mask with 1 bit per double; its size in bytes must be ( ( count + 7 ) / 8 )
// Each byte of the mask packs 8 boolean values, the first value of the byte is in the least significant bit.
double conditionalSum( const double* source, const uint8_t* maskBytes, size_t count )
{
// Zero-initialize all 4 accumulators
Accumulators acc;
acc[ 0 ] = acc[ 1 ] = acc[ 2 ] = acc[ 3 ] = _mm256_setzero_pd();
// The main loop consumes 16 scalars, and 16 bits of the mask, per iteration
for( ; count >= 16; source += 16, maskBytes += 2, count -= 16 )
{
// Broadcast 16 bits of the mask from memory into AVX register
// Technically, the C++ standard says casting pointers like that is undefined behaviour.
// It works fine in practice; alternatives exist, but they may compile into more than 1 instruction.
const __m256i broadcasted = _mm256_set1_epi16( *( (const short*)maskBytes ) );
conditionalSum4<0>( acc, source, broadcasted );
conditionalSum4<1>( acc, source, broadcasted );
conditionalSum4<2>( acc, source, broadcasted );
conditionalSum4<3>( acc, source, broadcasted );
}
// Now the hard part, dealing with the remainder
// The switch argument is the count of vectors in the remainder, including the incomplete one.
switch( ( count + 3 ) / 4 )
{
case 0:
// Best case performance-wise, the count of values was a multiple of 16
break;
case 1:
{
// Note we're loading a single byte of the mask instead of 2 bytes, to avoid reading past the end of the mask buffer. Same for case 2.
const __m256i broadcasted = _mm256_set1_epi8( (char)*maskBytes );
conditionalSumPartial<0>( acc, source, broadcasted, count );
break;
}
case 2:
{
const __m256i broadcasted = _mm256_set1_epi8( (char)*maskBytes );
conditionalSum4<0>( acc, source, broadcasted );
conditionalSumPartial<1>( acc, source, broadcasted, count );
break;
}
case 3:
{
const __m256i broadcasted = _mm256_set1_epi16( *( (const short*)maskBytes ) );
conditionalSum4<0>( acc, source, broadcasted );
conditionalSum4<1>( acc, source, broadcasted );
conditionalSumPartial<2>( acc, source, broadcasted, count );
break;
}
case 4:
{
const __m256i broadcasted = _mm256_set1_epi16( *( (const short*)maskBytes ) );
conditionalSum4<0>( acc, source, broadcasted );
conditionalSum4<1>( acc, source, broadcasted );
conditionalSum4<2>( acc, source, broadcasted );
conditionalSumPartial<3>( acc, source, broadcasted, count );
break;
}
}
// Finally, compute the sum of the 16 accumulated values
const __m256d r01 = _mm256_add_pd( acc[ 0 ], acc[ 1 ] );
const __m256d r23 = _mm256_add_pd( acc[ 2 ], acc[ 3 ] );
const __m256d sum4 = _mm256_add_pd( r01, r23 );
const __m128d sum2 = _mm_add_pd( _mm256_castpd256_pd128( sum4 ), _mm256_extractf128_pd( sum4, 1 ) );
const __m128d sum1 = _mm_add_sd( sum2, _mm_shuffle_pd( sum2, sum2, 0b11 ) );
return _mm_cvtsd_f64( sum1 );
}
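By the way, a strict-aliasing-safe alternative to the pointer cast used for broadcasting the mask could look like the sketch below. The broadcastMask16 helper is hypothetical, not part of the code above; compilers often fold the memcpy into the same broadcast-from-memory instruction, but check the generated assembly.

#include <string.h>
// Hypothetical helper: broadcast 16 mask bits without casting the pointer
inline __m256i broadcastMask16( const uint8_t* maskBytes )
{
    uint16_t bits;
    memcpy( &bits, maskBytes, sizeof( bits ) );
    return _mm256_set1_epi16( (short)bits );
}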
A couple of interesting points.
I unrolled the loop by 16 values and used 4 independent accumulators. This increases throughput because of pipelining, and loop exit checks happen less often, i.e. more instructions are spent computing useful stuff. It also reduces how often the mask values are broadcast; moving data from scalar to vector units has some latency. Note I only load from the mask once per 16 elements, and reuse the vector by bit shifting. It also improves precision: when you add a small float value to a large one, precision is lost, and 16 scalar accumulators help with that.
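For comparison, a non-unrolled version of the main loop could look like the sketch below. It is illustrative only: a single accumulator, 4 values per iteration, remainder handling omitted, and the conditionalSumSimple name is made up. Every addition depends on the previous one, and the mask bits are re-extracted and broadcast every 4 values, which is exactly what the unrolled version avoids.

// Illustrative non-unrolled version: one accumulator, 4 values per iteration, no remainder handling
double conditionalSumSimple( const double* source, const uint8_t* maskBytes, size_t count )
{
    __m256d acc = _mm256_setzero_pd();
    const __m256i bits = _mm256_setr_epi64x( 1, 2, 4, 8 );
    for( size_t i = 0; i + 4 <= count; i += 4 )
    {
        // Extract the 4 mask bits for these lanes; i is a multiple of 4, so they never straddle a byte
        const int64_t nibble = ( maskBytes[ i / 8 ] >> ( i % 8 ) ) & 0xF;
        __m256i mask = _mm256_and_si256( _mm256_set1_epi64x( nibble ), bits );
        mask = _mm256_cmpeq_epi64( mask, bits );
        const __m256d vals = _mm256_and_pd( _mm256_loadu_pd( source + i ), _mm256_castsi256_pd( mask ) );
        acc = _mm256_add_pd( acc, vals ); // every addition waits for the previous one
    }
    // Horizontal sum of the 4 lanes
    const __m128d sum2 = _mm_add_pd( _mm256_castpd256_pd128( acc ), _mm256_extractf128_pd( acc, 1 ) );
    return _mm_cvtsd_f64( _mm_add_sd( sum2, _mm_unpackhi_pd( sum2, sum2 ) ) );
}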
Correctly handling the remainder, without moving data from registers to memory and back, is complicated; it needs partial loads and such.
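An alternative to the switch with partial loads is _mm256_maskload_pd, which loads only the lanes whose mask element has the sign bit set and zeroes the rest. Here's a sketch of how it could replace conditionalSumPartial, using the same setup as above and equally untested:

// Alternative remainder handling with a masked load; count - offset is assumed to be in [ 1 .. 4 ]
template<int i>
inline void conditionalSumPartialMaskLoad( Accumulators& acc, const double* source, __m256i broadcasted, size_t count )
{
    constexpr int offset = i * 4;
    const __m256i lane = _mm256_setr_epi64x( 0, 1, 2, 3 );
    // All-ones (sign bit set) for lanes with index < count - offset, zero for the rest
    const __m256i loadMask = _mm256_cmpgt_epi64( _mm256_set1_epi64x( (int64_t)( count - offset ) ), lane );
    const __m256d vals = _mm256_maskload_pd( source + offset, loadMask );
    conditionalSum4<i>( acc, vals, broadcasted );
}

Masked load performance varies across CPUs, so it's worth benchmarking both versions.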
If you move the integer value from a template argument into a normal integer argument, the code will likely stop compiling; the compiler will say something like “expected constant expression”. The reason is that many SIMD instructions, including _mm256_srli_epi64, encode constants as part of the instruction, so the compiler needs to know these values at compile time. Another thing: the array index needs to be a compile-time constant, or the compiler will evict the array from the 4 registers into RAM, to be able to do pointer math when you access the values. The accumulators need to stay in registers all the time, or the performance will drop by a huge factor; even L1D cache is much slower than registers.
Here’s the output of gcc. The assembly seems reasonable: the compiler has successfully inlined everything, and the only memory access in the main loop is for the source values. The main loop is below the .L3 label there.
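Since the code above is untested, a trivial scalar reference like the sketch below can be used to validate it on random inputs; the conditionalSumScalar name is made up for the example. Keep in mind the results may differ in the last bits because the summation order is different.

// Trivial scalar reference, for validating the vectorized version
double conditionalSumScalar( const double* source, const uint8_t* maskBytes, size_t count )
{
    double sum = 0;
    for( size_t i = 0; i < count; i++ )
        if( ( maskBytes[ i / 8 ] >> ( i % 8 ) ) & 1 )
            sum += source[ i ];
    return sum;
}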