
I am given an array of lowercase characters (up to 1.5 GB) and a character `c`, and I want to count how many occurrences of the character `c` there are, using AVX instructions.

unsigned long long char_count_AVX2(char * vector, int size, char c){
    unsigned long long sum = 0;
    int i, j;
    const int con = 3;
    __m256i ans[con];   // ans[0] holds per-byte counts; higher indices are meant to catch carries
    for(i = 0; i < con; i++)
        ans[i] = _mm256_setzero_si256();

    __m256i Zer = _mm256_setzero_si256();
    __m256i C = _mm256_set1_epi8(c);
    __m256i Assos = _mm256_set1_epi8(0x01);
    __m256i FF = _mm256_set1_epi8(0xFF);
    __m256i shield = _mm256_set1_epi8(0xFF);
    __m256i temp;
    int counter = 0;
    for(i = 0; i < size; i += 32){
        counter++;
        shield = _mm256_xor_si256(_mm256_cmpeq_epi8(ans[0], Zer), FF);
        temp = _mm256_cmpeq_epi8(C, *((__m256i*)(vector + i)));  // 0xFF where a byte matches c
        temp = _mm256_xor_si256(temp, FF);      // invert: 0x00 on match, 0xFF otherwise
        temp = _mm256_add_epi8(temp, Assos);    // +1 wraps 0xFF to 0, so matches contribute 1
        ans[0] = _mm256_add_epi8(temp, ans[0]);
        for(j = 1; j < con; j++){               // try to push overflow into the next counter
            temp = _mm256_cmpeq_epi8(ans[j-1], Zer);
            shield = _mm256_and_si256(shield, temp);
            temp = _mm256_xor_si256(shield, FF);
            temp = _mm256_add_epi8(temp, Assos);
            ans[j] = _mm256_add_epi8(temp, ans[j]);
        }
    }
    for(j = con - 1; j >= 0; j--){              // combine the per-byte counters
        sum <<= 8;
        unsigned char *ptr = (unsigned char*)&(ans[j]);
        for(i = 0; i < 32; i++){
            sum += *(ptr + i);
        }
    }
    return sum;
}
Adamos2468
  • What is your character format? ASCII or some kind of Unicode? – zx485 Feb 05 '19 at 18:54
  • The format is ASCII – Adamos2468 Feb 05 '19 at 18:59
  • AVX1 or AVX2? What have you tried? Hint: Check `_mm256_cmpeq_epi8` and `_mm256_sub_epi8` for the innermost loop. After 255 iterations you need to start combining two bytes into one `uint16`, and so on – chtz Feb 05 '19 at 19:01
  • AVX2; so far I have an array of 4 `__m256i` variables and I am pushing the overflow from index 0 up to 3. – Adamos2468 Feb 05 '19 at 19:05
  • Can you show some code of what you did? For adding 8-bit integers to larger integers, see this question: https://stackoverflow.com/questions/54541127/ – chtz Feb 05 '19 at 19:32
  • `_mm256_cmpeq_epi8` will get you a `-1` in each byte. If you subtract that from a counter (using `_mm256_sub_epi8`) you can directly count up to 255 or 128, i.e., your innermost loop should just contain these two intrinsics. – chtz Feb 05 '19 at 19:49
  • @Adamos2468: I added efficient horizontal-sum code to chtz's answer, using `vpsadbw` against zero for byte to qword. And efficient shuffles for 256-bit down to 64-bit like [Fastest way to do horizontal float vector sum on x86](//stackoverflow.com/q/6996764). – Peter Cordes Feb 06 '19 at 05:44
  • One core can't usually saturate DRAM bandwidth, so for *large* inputs it might be worth using multiple threads (especially if you already have a worker thread started and can just send it a function pointer and args). You tagged this [tag:parallel-processing], are you asking for OpenMP or something as well? – Peter Cordes Feb 06 '19 at 05:47
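
A minimal sketch of that multi-threading idea (the wrapper name is hypothetical; it assumes a single-threaded counter with the `(const char*, size_t, char)` signature used in the answer below):

#include <omp.h>
#include <stddef.h>

// Hypothetical wrapper: partition the buffer, count each chunk in parallel,
// and reduce the per-thread partial sums.
unsigned long long char_count_parallel(const char *vector, size_t size, char c)
{
    unsigned long long total = 0;
    #pragma omp parallel reduction(+:total)
    {
        int t = omp_get_thread_num();
        int n = omp_get_num_threads();
        size_t chunk = size / n;
        size_t begin = (size_t)t * chunk;
        size_t len = (t == n - 1) ? size - begin : chunk;  // last thread takes the remainder
        total += char_count_AVX2(vector + begin, len, c);
    }
    return total;
}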

3 Answers


I'm intentionally leaving out some parts, which you need to figure out yourself (e.g. handling lengths that aren't a multiple of 4*255*32 bytes), but your innermost loop should look something like the one starting with `for(int i...)`:

`_mm256_cmpeq_epi8` will get you a `-1` in each byte, which you can use as an integer. If you subtract that from a counter (using `_mm256_sub_epi8`) you can directly count up to 255 or 128, so the inner loop contains just these two intrinsics. You have to stop and widen the 8-bit counters before they can overflow, i.e. after at most 255 iterations:

#include <immintrin.h>
#include <stdint.h>

static inline
__m256i hsum_epu8_epu64(__m256i v) {
    return _mm256_sad_epu8(v, _mm256_setzero_si256());  // SAD against zero is a handy trick
}

static inline
uint64_t hsum_epu64_scalar(__m256i v) {
    __m128i lo = _mm256_castsi256_si128(v);
    __m128i hi = _mm256_extracti128_si256(v, 1);
    __m128i sum2x64 = _mm_add_epi64(lo, hi);   // narrow to 128

    hi = _mm_unpackhi_epi64(sum2x64, sum2x64);
    __m128i sum = _mm_add_epi64(hi, sum2x64);  // narrow to 64
    return _mm_cvtsi128_si64(sum);
}


unsigned long long char_count_AVX2(char const* vector, size_t size, char c)
{
    __m256i C=_mm256_set1_epi8(c);

    // todo: count elements and increment `vector` until it is aligned to 256bits (=32 bytes)
    __m256i const * simd_vector = (__m256i const *) vector;
     // *simd_vector is an alignment-required load, unlike _mm256_loadu_si256()

    __m256i sum64 = _mm256_setzero_si256();
    size_t unrolled_size_limit = size - 4*255*32 + 1;  // note: assumes size >= 4*255*32, else this underflows
    for(size_t k=0; k<unrolled_size_limit ; k+=4*255*32) // outer loop: TODO
    {
        __m256i counter[4]; // multiple counter registers to hide latencies
        for(int j=0; j<4; j++)
            counter[j]=_mm256_setzero_si256();
        // inner loop: make sure that you don't go beyond the data you can read
        for(int i=0; i<255; ++i)
        {   // or limit this inner loop to ~22 to avoid branch mispredicts
            for(int j=0; j<4; ++j)
            {
                counter[j]=_mm256_sub_epi8(counter[j],           // count -= 0 or -1
                                           _mm256_cmpeq_epi8(*simd_vector, C));
                ++simd_vector;
            }
        }

        // only need one outer accumulator: OoO exec hides the latency of adding into it
        sum64 = _mm256_add_epi64(sum64, hsum_epu8_epu64(counter[0]));
        sum64 = _mm256_add_epi64(sum64, hsum_epu8_epu64(counter[1]));
        sum64 = _mm256_add_epi64(sum64, hsum_epu8_epu64(counter[2]));
        sum64 = _mm256_add_epi64(sum64, hsum_epu8_epu64(counter[3]));
    }

    uint64_t sum = hsum_epu64_scalar(sum64);

    // TODO add up remaining bytes with sum.
    // Including a rolled-up vector loop before going scalar
    //  because we're potentially a *long* way from the end

    // Maybe put some logic into the main loop to shorten the 255 inner iterations
    // if we're close to the end.  A little bit of scalar work there shouldn't hurt every 255 iters.

    return sum;
}

Godbolt link: https://godbolt.org/z/do5e3- (clang is slightly better than gcc at unrolling the innermost loop: gcc includes some useless vmovdqa instructions that will bottleneck the front-end if the data is hot in L1d cache, preventing us from running close to 2x 32-byte loads per clock)
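
To make the cleanup TODOs concrete, here is a minimal sketch (my addition, not part of the original answer), reusing `vector`, `size`, `simd_vector`, `C`, `sum`, and `hsum_epu64_scalar` from the function above; it would sit just before the `return sum;`:

// Rolled-up vector loop for the remaining whole vectors: widening on every
// iteration via vpsadbw is fine here because this tail runs few iterations.
const char *p = (const char *)simd_vector;
const char *end = vector + size;
__m256i tail64 = _mm256_setzero_si256();
for (; p + 32 <= end; p += 32) {
    __m256i eq = _mm256_cmpeq_epi8(_mm256_loadu_si256((const __m256i *)p), C);
    __m256i ones = _mm256_sub_epi8(_mm256_setzero_si256(), eq);   // 0/-1 -> 0/1 per byte
    tail64 = _mm256_add_epi64(tail64, _mm256_sad_epu8(ones, _mm256_setzero_si256()));
}
sum += hsum_epu64_scalar(tail64);
for (; p < end; ++p)        // last few scalar bytes
    sum += (*p == c);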

chtz
  • The widening to epu64 can and should be done with `_mm256_sad_epu8(counter, _mm256_setzero_si256())`, then `_mm256_add_epi64` into one vector which you hsum at the very end. – Peter Cordes Feb 06 '19 at 04:37
  • I added code that does that hsum, and an outer loop size limit. Note that clang uses indexed addressing modes, so it's no closer than gcc to running at 2 loads per clock on Haswell/Skylake. :( They'll unlaminate from `vpcmpeqb` into separate uops in the issue stage. Writing it with the loop bounds as pointer-comparison might be a better bet, and get clang to just do pure pointer increments instead of silly indexing. e.g. `const char *endp = min(buf + size, buf + 4*255*32)` or something. – Peter Cordes Feb 06 '19 at 05:41
  • Thanks @PeterCordes for improving this! I guess for gcc it would be better to manually unroll the inner loop (i.e., create 4 variables instead of an array). Nice trick with `vpsadbw`. – chtz Feb 06 '19 at 07:31
  • Yeah, that might help GCC avoid the stupid `vmovdqa` instructions. Worth trying if you're curious. Or file a missed-optimization bug; it's already optimized away any store/reload of the array of 4 vectors, and this is clearly something that it *should* be able to optimize. Anyway, gcc's `-funroll-loops` is only enabled manually or as part of `-fprofile-use`; for large codebases unrolling every loop hurts more than it helps, but profile-use will identify the hot loops and unroll them. I'd assume unrolling would let it avoid extra movdqa as well. – Peter Cordes Feb 06 '19 at 07:35
  • The `vpsadbw` trick is relatively well known for hsumming 8-bit data, and worth using even for signed by XORing to range-shift and then subtracting the `16 or 32 * 128` bias at the end. I think Agner Fog's optimization guide mentions it, or at least his VectorClass library uses it. – Peter Cordes Feb 06 '19 at 07:37
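
A sketch of that signed variant (my illustration; the helper name is made up): XOR range-shifts each byte by +128, `vpsadbw` sums groups of 8 bytes into each 64-bit lane, so the bias to subtract is 8 * 128 per lane:

static inline
__m256i hsum_epi8_epi64(__m256i v) {
    __m256i shifted = _mm256_xor_si256(v, _mm256_set1_epi8((char)0x80)); // range-shift to unsigned
    __m256i sums = _mm256_sad_epu8(shifted, _mm256_setzero_si256());     // 4x sums of 8 bytes
    return _mm256_sub_epi64(sums, _mm256_set1_epi64x(8 * 128));          // undo the +128 bias
}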

If you don't insist on using only SIMD instructions, you can make use of the `VPMOVMSKB` instruction in combination with the `POPCNT` instruction. The former combines the highest bits of each byte into a 32-bit integer mask, and the latter counts the 1 bits in this integer (= the count of char matches).

int counter = 0;
for(i = 0; i < size; i += 32) {
  ...
  counter +=
    _mm_popcnt_u32(
      (unsigned int)_mm256_movemask_epi8(
        _mm256_cmpeq_epi8( C, *((__m256i*)(vector+i)) )
      )
    );
  ...
}

I haven't tested this solution, but you should get the gist.
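
For completeness, a self-contained version of this approach might look like the following sketch (the function name and the scalar tail are mine):

#include <immintrin.h>
#include <stddef.h>

// Sketch: count matching bytes with vpcmpeqb + vpmovmskb + popcnt,
// one popcnt per 32-byte vector, plus a scalar tail.
size_t char_count_popcnt(const char *vector, size_t size, char c)
{
    __m256i C = _mm256_set1_epi8(c);
    size_t count = 0, i = 0;
    for (; i + 32 <= size; i += 32) {
        __m256i v = _mm256_loadu_si256((const __m256i *)(vector + i));
        unsigned mask = (unsigned)_mm256_movemask_epi8(_mm256_cmpeq_epi8(C, v));
        count += _mm_popcnt_u32(mask);          // one bit per matching byte
    }
    for (; i < size; ++i)                       // scalar tail
        count += (vector[i] == c);
    return count;
}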

zx485
  • I had the same idea in OP's other, now-deleted question. GMTA. – Shawn Feb 05 '19 at 20:15
  • Doing `_mm256_movemask_epi8` and `_mm_popcnt_u32` in the inner loop is far less efficient than `_mm256_sub_epi8` – chtz Feb 05 '19 at 20:33
  • I guess so. But it is an alternative that is worth mentioning for its simplicity. – zx485 Feb 05 '19 at 20:41
  • Possibly useful as part of a cleanup loop, or **for an unaligned start/end where you shift out some of the bits before you popcnt, using a shift count calculated from the overlap**. Otherwise the more reasonable "simple" version is putting the `psadbw` epu8->epu64 hsum inside the inner loop and using `_mm256_add_epi64`. That's only 1 extra instruction per vector vs. the efficient way, vs. 2 (`vpcmpeqb` + `vpmovmskb` + `popcnt` + `add` vs. `vpcmpeqb` (+`vpsadbw`) + `vpsubb` / `q`). – Peter Cordes Feb 06 '19 at 05:35
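
A sketch of that bolded start-handling idea (my illustration; the helper is hypothetical): round the pointer down to a 32-byte boundary, do one aligned load, and shift the bytes that precede the buffer out of the movemask result before popcounting:

#include <immintrin.h>
#include <stddef.h>
#include <stdint.h>

// Hypothetical helper: count matches in the unaligned head and return the
// first 32-byte-aligned position. Assumes the buffer extends at least to the
// end of the aligned vector (size >= 32 is sufficient).
static const char *count_head(const char *p, __m256i C, size_t *count)
{
    uintptr_t off = (uintptr_t)p & 31;          // bytes before p in this 32-byte line
    const char *aligned = p - off;
    __m256i v = _mm256_load_si256((const __m256i *)aligned);
    unsigned mask = (unsigned)_mm256_movemask_epi8(_mm256_cmpeq_epi8(v, C));
    *count += _mm_popcnt_u32(mask >> off);      // drop bits for bytes before the buffer
    return aligned + 32;                        // the main loop can continue with aligned loads
}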

Probably the fastest: `memcount_avx2` (and an analogous `memcount_sse2`, not shown here):

size_t memcount_avx2(const void *s, int c, size_t n)
{
  __m256i cv = _mm256_set1_epi8(c),
          zv = _mm256_setzero_si256(),
         sum = zv, acr0, acr1, acr2, acr3;
  const char *p, *pe;

  // main loop: 252*32-byte blocks; each accumulator takes at most 63 increments
  // per byte, so the 8-bit counters stay far below overflow before widening
  for(p = s; p != (const char *)s + (n - (n % (252*32)));)
  {
    for(acr0 = acr1 = acr2 = acr3 = zv, pe = p + 252*32; p != pe; p += 128)
    {
      acr0 = _mm256_sub_epi8(acr0, _mm256_cmpeq_epi8(cv, _mm256_lddqu_si256((const __m256i *)p)));
      acr1 = _mm256_sub_epi8(acr1, _mm256_cmpeq_epi8(cv, _mm256_lddqu_si256((const __m256i *)(p+32))));
      acr2 = _mm256_sub_epi8(acr2, _mm256_cmpeq_epi8(cv, _mm256_lddqu_si256((const __m256i *)(p+64))));
      acr3 = _mm256_sub_epi8(acr3, _mm256_cmpeq_epi8(cv, _mm256_lddqu_si256((const __m256i *)(p+96))));
      __builtin_prefetch(p+1024);
    }
    sum = _mm256_add_epi64(sum, _mm256_sad_epu8(acr0, zv));
    sum = _mm256_add_epi64(sum, _mm256_sad_epu8(acr1, zv));
    sum = _mm256_add_epi64(sum, _mm256_sad_epu8(acr2, zv));
    sum = _mm256_add_epi64(sum, _mm256_sad_epu8(acr3, zv));
  }

  // whole remaining vectors
  for(acr0 = zv; p + 32 < (const char *)s + n; p += 32)
    acr0 = _mm256_sub_epi8(acr0, _mm256_cmpeq_epi8(cv, _mm256_lddqu_si256((const __m256i *)p)));
  sum = _mm256_add_epi64(sum, _mm256_sad_epu8(acr0, zv));

  size_t count = _mm256_extract_epi64(sum, 0)
               + _mm256_extract_epi64(sum, 1)
               + _mm256_extract_epi64(sum, 2)
               + _mm256_extract_epi64(sum, 3);

  // scalar tail
  while(p != (const char *)s + n)
      count += *p++ == c;
  return count;
}

Benchmark: Skylake i7-6700 @ 3.4 GHz, gcc 8.3:

memcount_avx2:   28 GB/s
memcount_sse2:   23 GB/s
char_count_AVX2: 23 GB/s (from the post above)

powturbo
  • You can use `_mm256_sub_epi8` to accumulate the cmpeq results instead of wasting instructions in the outer loop. Also, this code is too compact for its own good and not indented properly. (Maybe a tab vs. space problem in the SO markdown?) It took me a while to find where you were zeroing `acr0..3` between inner-loop iterations; would make more sense to declare them *inside* the outer loop. I don't think there are any compilers that support AVX2 but not C99. I'd also do the end-pointer calculations on separate source lines. – Peter Cordes Sep 17 '19 at 03:59
  • I don't see what you mean by wasting instructions – powturbo Sep 17 '19 at 06:01
  • If you use `acr0 = _mm256_sub_epi8(acr0, cmp(...))` then the outer loop can just use `acr0` instead of `_mm256_sub_epi8(zv, acr0)`. Use `x -= -1` instead of `x += -1` inside the inner loop. That `sub` is a wasted instruction in your version. – Peter Cordes Sep 17 '19 at 06:17
  • Thanks Peter, I've made the changes and added a new benchmark – powturbo Sep 17 '19 at 07:56