
I'd like to implement a fast correlation coefficient computation using SSE/AVX2. The operands are two unsigned char vectors. The function should be mostly equivalent to this:

float correlate_simple(const unsigned char* vec1, const unsigned char* vec2, size_t length)
{
    int sum1 = 0;
    int sum2 = 0;
    int sum11 = 0;
    int sum22 = 0;
    int sum12 = 0;

    for (size_t i = length; i > 0; --i, ++vec1, ++vec2) {
        sum1 += *vec1;
        sum2 += *vec2;
        sum11 += *vec1  * *vec1;
        sum22 += *vec2  * *vec2;
        sum12 += *vec1  * *vec2;
    }
    double mean1 = double(sum1) / double(length);
    double mean2 = double(sum2) / double(length);
    double mean11 = double(sum11) / double(length);
    double mean22 = double(sum22) / double(length);
    double mean12 = double(sum12) / double(length);

    double b = (mean11 - mean1 * mean1) * (mean22 - mean2 * mean2);
    if (b <= 0.0)
        return 0.0f;
    double a = (mean12 - mean1 * mean2);

    return float(a / sqrt(b));
}

The parameter length ranges from 1 to less than 1000.

To do this, I researched how to implement an inner product of two unsigned byte arrays. However, I could not come up with a solution that does not involve converting all the unsigned 8-bit values to signed 16-bit values.

The intrinsic _mm256_maddubs_epi16(a, b) expects b to be a signed byte. This would not be a problem in this case since subtracting some constant (here: 127) from b does not change the correlation coefficient. Unfortunately I could not find an intrinsic that would allow me to subtract 127 from unsigned bytes producing signed bytes (not relying on some two's complement magic).
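
The shift is harmless because the correlation coefficient is invariant under adding a constant to either input:

    Cov(X, Y - c) = Cov(X, Y),   Var(Y - c) = Var(Y)
    =>  r(X, Y - c) = Cov(X, Y - c) / sqrt(Var(X) * Var(Y - c)) = r(X, Y)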

// vec: const unsigned char*
auto x = _mm256_load_si256((const __m256i*) vec);
auto v = _mm256_set1_epi8(127);

// wrong if vec[i] is less than 127:
auto x_centered = _mm256_sub_epi8 (x, v);

What would be the best approach here to compute inner products (and finally a correlation coefficient)?

Addendum:

Below is my current implementation of a pure inner product. I decided to convert to 16-bit integers to avoid overflow errors. Update: I changed from reading 128 bits to 256 bits at a time.
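
(For reference: with length < 1000 the 32-bit accumulators below cannot overflow. Each 8-bit product is at most 255 * 255 = 65025, _mm256_madd_epi16 adds two of those into one 32-bit lane (at most 130050), and each accumulator lane sees at most 31 packs, i.e. roughly 4.0 million in total, far below the int32 limit of about 2.1 billion.)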

int accumulate_i32(__m256i x)
{
    // fold the upper 64 bits of each 128-bit lane onto the lower 64 bits
    auto tmp1 = _mm256_srli_si256(x, 8);
    x = _mm256_add_epi32(x, tmp1);

    // add the upper 128-bit lane onto the lower one
    auto tmp2 = _mm256_extractf128_si256(x, 1);
    tmp2 = _mm_add_epi32(tmp2, _mm256_castsi256_si128(x));

    // the total is now spread across the two lowest 32-bit elements
    return _mm_cvtsi128_si32(tmp2) + _mm_extract_epi32(tmp2, 1);
}

int inner_product_avx(const unsigned char* vec1, const unsigned char* vec2, unsigned int length)
{    
    constexpr unsigned int memoryAlignmentBytes = 32;
    constexpr unsigned int bytesPerPack = 256 / 8;

    assert((reinterpret_cast<std::uintptr_t>(vec1) % memoryAlignmentBytes) == 0);
    assert((reinterpret_cast<std::uintptr_t>(vec2) % memoryAlignmentBytes) == 0);    
    

    // compute middle part via AVX2    
    unsigned int packCount = length / bytesPerPack;
    const __m256i zeros = _mm256_setzero_si256();
    auto sumlo = _mm256_setzero_si256();
    auto sumhi = _mm256_setzero_si256();

    for (unsigned int packIdx = 0; packIdx < packCount; ++packIdx) {
        auto x1 = _mm256_load_si256((const __m256i*)vec1);
        auto x2 = _mm256_load_si256((const __m256i*)vec2);

        auto x1lo = _mm256_unpacklo_epi8(x1, zeros);
        auto x1hi = _mm256_unpackhi_epi8(x1, zeros);
        auto x2lo = _mm256_unpacklo_epi8(x2, zeros);
        auto x2hi = _mm256_unpackhi_epi8(x2, zeros);
        
        auto tmplo = _mm256_madd_epi16(x1lo, x2lo);
        auto tmphi = _mm256_madd_epi16(x1hi, x2hi);

        sumlo = _mm256_add_epi32(sumlo, tmplo);
        sumhi = _mm256_add_epi32(sumhi, tmphi);

        vec1 += bytesPerPack;
        vec2 += bytesPerPack;
    }

    int sum = accumulate_i32(sumlo) + accumulate_i32(sumhi);

    
    // compute remaining part that cannot be represented as a 
    // whole packed integer    
    unsigned int packRestCount = length % bytesPerPack;
    for (size_t i = packRestCount; i > 0; --i, ++vec1, ++vec2)
        sum += int(*vec1) * int(*vec2);

    
    return sum;
}

This takes roughly 20% of the time of the simple C++ implementation (see below). Considering that the AVX code works on sixteen 16-bit integers simultaneously, I would have expected a higher gain. Is this reasonable, or did I miss something?

Unrolling the last loop in the AVX code did not decrease computation time.

int inner_product_simple(const unsigned char* vec1, const unsigned char* vec2, size_t length)
{
    int sum = 0;

    for (size_t i = length; i > 0; --i, ++vec1, ++vec2)
        sum += int(*vec1) * int(*vec2);

    return sum;
}
cataclysm
  • What is the typical/maximum value of `length`? – Paul R Oct 21 '20 at 16:10
  • You need to subtract 128 (which can be done by xor-ing with `0x80`) from e.g., each `vec1` and at the end add `128*sum2` (when calculating `sum12`). – chtz Oct 21 '20 at 16:25
  • Also, be aware that `_mm256_maddubs_epi16` saturates. So converting to 16 bit might actually be a simpler solution. – chtz Oct 21 '20 at 16:33
  • Shouldn't `if (b == 0.0)` actually be `if (b <= 0.0)`, to account for possible rounding error making a result negative, and thus also invalid for sqrt? – Peter Cordes Oct 22 '20 at 00:01
  • `length` ranges from 1 to less than 1000. I added this to my question. Additionally, I changed the code to check whether `b` is less than or equal to 0.0. – cataclysm Oct 22 '20 at 06:04

2 Answers


Unfortunately I could not find an intrinsic that would allow me to subtract 127 from unsigned bytes producing signed bytes (not relying on some two's complement magic).

Intel x86 CPUs use two's-complement representation of signed numbers; this is why there are no separate signed/unsigned versions of the packed add and subtract intrinsics: the bit-level operation is identical. Intel SIMD intrinsics are outside the scope of the C++ standard and have specific, well-defined behaviour (wraparound rather than undefined overflow).
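
To make that concrete, here is a minimal sketch (not from this answer, building on chtz's comments under the question): XOR-ing with `0x80` flips the sign bit, which produces the same bit pattern as subtracting 128 with wraparound, and maps the unsigned range [0, 255] onto the signed range [-128, 127] expected by the signed-byte operand of `_mm256_maddubs_epi16`. The helper names below are made up.

// Sketch only (assumes AVX2); helper names are hypothetical.
#include <immintrin.h>

// Re-center unsigned bytes: XOR with 0x80 gives the same bits as subtracting 128
// modulo 256, so byte value v becomes the signed byte v - 128 in [-128, 127].
inline __m256i center_u8(__m256i v)
{
    return _mm256_xor_si256(v, _mm256_set1_epi8((char)0x80));
}

// Cross products for sum12: vec1 stays unsigned, vec2 is re-centered to signed.
// Caveat from chtz's comment under the question: _mm256_maddubs_epi16 saturates
// its 16-bit pair sums, so widening to 16 bit and using _mm256_madd_epi16 may be
// the safer route. At the end, sum12 needs the correction + 128 * sum1 to undo
// the shift.
inline __m256i madd_cross(__m256i vec1_u8, __m256i vec2_u8)
{
    return _mm256_maddubs_epi16(vec1_u8, center_u8(vec2_u8));
}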

Maxim Egorushkin

I would start from something like this. It uses 32-bit accumulators, just like your current code. Untested.

namespace
{
    // Compute sum of the 64-bit lanes, convert to double
    inline double hadd_epi64( const __m256i i64 )
    {
        __m128i res = _mm256_castsi256_si128( i64 );
        res = _mm_add_epi64( res, _mm256_extractf128_si256( i64, 1 ) );
        res = _mm_add_epi64( res, _mm_unpackhi_epi64( res, res ) );
        return (double)_mm_cvtsi128_si64( res );
    }

    // Convert 32-bit lanes into 64-bit, compute sum of the 8, convert to double
    inline double hadd_epu32( __m256i x )
    {
        const __m256i zero = _mm256_setzero_si256();
        __m256i i64 = _mm256_unpacklo_epi32( x, zero );
        i64 = _mm256_add_epi64( i64, _mm256_unpackhi_epi32( x, zero ) );
        return hadd_epi64( i64 );
    };
}

class InnerProduct
{
    // These fields are interpreted as 64-bit integers
    __m256i a, b;
    // These fields are interpreted as 32-bit integers
    __m256i aa, bb, ab;

    // Accumulate products of 16-bit numbers, 16 of them at once
    inline void add16( __m256i x, __m256i y )
    {
        const __m256i x2 = _mm256_madd_epi16( x, x );
        const __m256i y2 = _mm256_madd_epi16( y, y );
        const __m256i prod = _mm256_madd_epi16( x, y );

        aa = _mm256_add_epi32( aa, x2 );
        bb = _mm256_add_epi32( bb, y2 );
        ab = _mm256_add_epi32( ab, prod );
    }

public:

    InnerProduct()
    {
        a = b = aa = bb = ab = _mm256_setzero_si256();
    }

    // Handle 32 bytes
    inline void addBytes( __m256i x, __m256i y )
    {
        // Accumulate values
        const __m256i zero = _mm256_setzero_si256();
        a = _mm256_add_epi64( a, _mm256_sad_epu8( x, zero ) );
        b = _mm256_add_epi64( b, _mm256_sad_epu8( y, zero ) );

        // Split the vectors into 2 sets of 16-bit numbers, accumulate products
        const __m256i z = _mm256_unpacklo_epi8( x, zero );
        const __m256i w = _mm256_unpacklo_epi8( y, zero );
        add16( z, w );

        x = _mm256_unpackhi_epi8( x, zero );
        y = _mm256_unpackhi_epi8( y, zero );
        add16( x, y );
    }

    // Compute the result
    float compute( size_t count ) const
    {
        const double div = (double)count;
        const double mean1 = hadd_epi64( a ) / div;
        const double mean2 = hadd_epi64( b ) / div;
        const double mean11 = hadd_epu32( aa ) / div;
        const double mean22 = hadd_epu32( bb ) / div;
        const double mean12 = hadd_epu32( ab ) / div;

        const double b = ( mean11 - mean1 * mean1 ) * ( mean22 - mean2 * mean2 );
        if( b <= 0 )
            return 0;
        const double a = ( mean12 - mean1 * mean2 );
        return float( a / sqrt( b ) );
    }
};

// Load 1-31 bytes into AVX register, zero out unused higher bytes
inline __m256i loadPartial( const uint8_t* p, size_t length )
{
    alignas( 32 ) std::array<uint8_t, 32> arr{};
    memcpy( arr.data(), p, length );
    return _mm256_load_si256( ( const __m256i* )arr.data() );
}

float correlate_simple( const uint8_t* vec1, const uint8_t* vec2, const size_t length )
{
    InnerProduct ip;
    const uint8_t* const vec1End = vec1 + ( ( length / 32 ) * 32 );
    for( ; vec1 < vec1End; vec1 += 32, vec2 += 32 )
    {
        const __m256i x = _mm256_loadu_si256( ( const __m256i * )vec1 );
        const __m256i y = _mm256_loadu_si256( ( const __m256i * )vec2 );
        ip.addBytes( x, y );
    }
    const size_t remainder = length % 32;
    if( remainder > 0 )
    {
        const __m256i x = loadPartial( vec1, remainder );
        const __m256i y = loadPartial( vec2, remainder );
        ip.addBytes( x, y );
    }
    return ip.compute( length );
}
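
For completeness, a hypothetical usage sketch (not part of the answer) showing how the vectorized routine above would be called; the test data is arbitrary.

#include <cstdint>
#include <cstdio>
#include <vector>

int main()
{
    std::vector<uint8_t> a(999), b(999);
    for (std::size_t i = 0; i < a.size(); ++i) {
        a[i] = uint8_t(i);               // arbitrary test data
        b[i] = uint8_t(255 - (i % 256));
    }
    // calls the AVX2 correlate_simple defined in this answer
    const float r = correlate_simple(a.data(), b.data(), a.size());
    std::printf("correlation = %f\n", r);
    return 0;
}
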
Soonts
  • Why would you need `_mm256_mullo_epi32`? The original inputs are only 8-bit wide, huge waste to widen them all the way to 32-bit for a slow `pmulld` (2 uops for the SIMD-multiply port(s) on Intel). `pmaddwd` is faster (single-uop) and takes 16-bit inputs. Zero-extending your 8-bit inputs can give you (positive) 16-bit signed inputs for https://www.felixcloutier.com/x86/pmaddwd – Peter Cordes Oct 22 '20 at 05:29
  • @PeterCordes The accumulators need to be at 32 bit or integers will overflow. `pmulld` is a single µop on modern processors (Zen2, Intel doesn’t have any). Updated for lulz, but I think the measured performance on modern CPUs gonna be pretty close due to RAM bottleneck in most cases. – Soonts Oct 22 '20 at 16:33
  • Yeah, of course you need 32-bit accumulators eventually, that's why you should use `pmaddwd` to multiply and horizontal-add, widening from 16 to 32-bit in the process, getting twice as many element multiplies done per instruction, and saving unpacking / adding steps. Just considering multiply throughput as a back of the envelope estimate, that's about 2x as fast on Zen 2, and 4x as fast on Haswell and later. – Peter Cordes Oct 22 '20 at 20:29
  • There is a large installed base of Haswell and later CPUs that most code normally cares about running well on. And I'd consider Ice Lake / Tiger Lake cores competitive with Zen 2. (They're only available in laptops, so Intel is not currently a great option for anyone who wants to buy anything else.) My standards for "modern" are not that restrictive. It depends on context, but usually I'd include anything in the same microarchitecture family, like Sandybridge-family and Zen-family. Or for this, the relevant cutoff is having AVX2, and perhaps having full-width 256-bit vector mem/ALU. – Peter Cordes Oct 22 '20 at 20:42
  • Re: memory bottleneck argument: that's a lame excuse for inefficient code. Caches work, and are much faster than RAM, especially L1d cache. For the OP's use case: "The parameter length ranges from 1 to less than 1000." so yes, we really are talking about arrays with max size of under 1kiB, and it's very reasonable for them to be hot in L1d cache if the caller has good memory locality. – Peter Cordes Oct 22 '20 at 20:44
  • BTW, the non-multiply hsum from 16->32 can also be done with `pmaddwd` against a vector of `set1_epi16(1)`. That might bottleneck on the execution ports for pmaddwd since we're already using it in the loop, but it's worth considering accumulating at least one of `a` or `b` that way, with shift / mask / add / add for only one of them. But even better, **the `a` and `b` vectors were *originally* unsigned bytes, so we can hsum them with `psadbw` against zero before unpacking.** – Peter Cordes Oct 22 '20 at 20:49
  • Nice! Your `hadd_epi64` could be more efficient by taking advantage of the fact the integers are < 2^31. (Which is definitely true for the sum of a and b, and also for the sum of products for lengths less than 2^15). For short vectors, minimizing this overhead might be relevant. Use packed `_mm_cvtepi32_pd` conversion instead of extract to scalar int64 and then scalar to double. (And the `_mm_cvtsd_f64` from `__m128d` to `double` is free). There is no packed int64->double until AVX512, so compilers couldn't optimize away the `movq` + `cvtsi2sd`. – Peter Cordes Oct 23 '20 at 00:17
  • While you're at it, remember that Zen 1 only has 128-bit vector width, so narrowing to 128-bit as the first step is best, if you're not worried about overflowing a uint32 or even int32. (Your way of unpacking with zeros does extend the max size you can handle without overflow by a factor of 4 when you consider signedness. Maybe not worth branching on length in the cleanup, but if there is a known max like the OP's 1000, always using the short version is probably good, especially given that we're doing it multiple times.) [SSE/AVX hsums canonical answer](https://stackoverflow.com/a/35270026) – Peter Cordes Oct 23 '20 at 00:21
  • Thanks, for this nice complete solution. I still wonder why there is no direct way to accumulate all 32 bit integers in an m256i array. – cataclysm Oct 23 '20 at 09:53
  • A question: loadPartial() shall zero out unused bytes. How is this done? `arr` is not automatically filled with zeros, right? – cataclysm Oct 23 '20 at 09:56
  • @cataclysm It’s called value initialization, note the `{}`: https://stackoverflow.com/a/18304935/126995 – Soonts Oct 23 '20 at 10:37
  • @cataclysm About accumulators, it’s possible to use less of them but it’s gonna be slower. In every iteration of the `for`, you’d need many extra instructions to compute horizontal sums and pack values together. It’s often more efficient to postpone these horizontal operations till the very end. CPUs have 16 vector registers, enough for the 5 accumulators and all these temporary vectors. If you look at disassembly of release build, the class should be missing from there, these 5 accumulators should become 5 of these ymm0..ymm15 vector registers. – Soonts Oct 23 '20 at 10:51
  • @Soonts: Sorry for the misunderstanding, I actually meant the sum of the packed integers. There seems to be no instruction to just sum over a final __m256i containing 32 bit values. – cataclysm Oct 23 '20 at 14:02
  • I updated my own code (see above) to load 256 bit values (instead of 128 bit values) and afterwards splitting it into 128 values as shown in the code of this answer. Thank you again! – cataclysm Oct 23 '20 at 14:04
  • @cataclysm “There seems to be no instruction to just sum over a final __m256i containing 32 bit values” `phaddd` is the closest one, but I think what I did instead is more efficient. These hadd instructions are relatively slow even on modern CPUs, several µops. – Soonts Oct 23 '20 at 20:16
  • @cataclysm If you gonna call that a lot, for relatively small vectors, what Peter suggested is a good idea, i.e. use `_mm256_cvtepi32_pd` in the `compute` method. – Soonts Oct 23 '20 at 21:15
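
Picking up the `_mm256_cvtepi32_pd` idea from the last two comments, here is a rough, untested sketch (not from either answer) of what the 32-bit horizontal sums in `compute` could look like; `hadd_epi32_via_pd` is a made-up name.

#include <immintrin.h>

// Collapse eight 32-bit lanes to four, convert to packed doubles, and finish the
// horizontal sum in the double domain. Assumes the lanes are non-negative and
// their pairwise sums stay below 2^31, which holds here for length < 1000.
inline double hadd_epi32_via_pd( __m256i x )
{
    // 8 lanes -> 4 lanes: add the upper 128-bit half onto the lower half
    const __m128i sum4 = _mm_add_epi32( _mm256_castsi256_si128( x ),
                                        _mm256_extracti128_si256( x, 1 ) );

    // convert the remaining 4 int32 lanes to doubles and reduce them
    const __m256d d = _mm256_cvtepi32_pd( sum4 );
    __m128d lo = _mm_add_pd( _mm256_castpd256_pd128( d ),
                             _mm256_extractf128_pd( d, 1 ) );
    lo = _mm_add_pd( lo, _mm_unpackhi_pd( lo, lo ) );
    return _mm_cvtsd_f64( lo );
}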