1

test_euclid_ask.h (only need to read 2 functions: euclid_slow, euclid_fast)

#pragma once
#include "included.h"

double
euclid_slow(int n, double* data1, double* data2, int* mask1, int* mask2, const double weight[])
{
    double result = 0.0;
    double totalWeight = 0;    
   
    for (int i = 0; i < n; i++) {
        if (mask1[i] && mask2[i]) {
            double term = data1[i] - data2[i];
            result += weight[i] * term * term;
            totalWeight += weight[i];
        }
    }
        
    if (totalWeight==0) return 0; 
    return result / totalWeight;
}

double
euclid_fast(int n, double* data1, double* data2, int* mask1, int* mask2, const double weight[])
{
    double result = 0.0;
    double totalWeight = 0;
    double subResult[4] = { 0. };
    double subTweight[4] = { 0. };
    double subDiff[4] = { 0. };
    double subWeight[4] = { 0. };
    double subMask[4] = { 0. };
    int nstep4 = n - n % 4;

    for (int i = 0; i < nstep4; i += 4) {
        subMask[0] = mask1[i] && mask2[i];
        subMask[1] = mask1[i + 1] && mask2[i + 1];
        subMask[2] = mask1[i + 2] && mask2[i + 2];
        subMask[3] = mask1[i + 3] && mask2[i + 3];
        if (!(subMask[0] || subMask[1] || subMask[2] || subMask[3])) continue;            

        subDiff[0] = data1[i] - data2[i];
        subDiff[1] = data1[i + 1] - data2[i + 1];
        subDiff[2] = data1[i + 2] - data2[i + 2];
        subDiff[3] = data1[i + 3] - data2[i + 3];

        subDiff[0] *= subDiff[0];
        subDiff[1] *= subDiff[1];
        subDiff[2] *= subDiff[2];
        subDiff[3] *= subDiff[3];

        subWeight[0] = weight[i] * subMask[0];
        subWeight[1] = weight[i + 1] * subMask[1];
        subWeight[2] = weight[i + 2] * subMask[2];
        subWeight[3] = weight[i + 3] * subMask[3];

        subTweight[0] += subWeight[0];
        subTweight[1] += subWeight[1];
        subTweight[2] += subWeight[2];
        subTweight[3] += subWeight[3];

        subResult[0] += subWeight[0] * subDiff[0];
        subResult[1] += subWeight[1] * subDiff[1];
        subResult[2] += subWeight[2] * subDiff[2];
        subResult[3] += subWeight[3] * subDiff[3];
    }

    for (int i = nstep4; i < n; i++) {
        if (mask1[i] && mask2[i]) {
            double term = data1[i] - data2[i];
            result += weight[i] * term * term;
            totalWeight += weight[i];
        }
    }
        
    result += subResult[0] + subResult[1] + subResult[2] + subResult[3];
    totalWeight += subTweight[0] + subTweight[1] + subTweight[2] + subTweight[3];
        
    //cout << "end fast\n";
    if (!totalWeight) return 0; 
    return result / totalWeight;
}

void test_euclid_ask()
{   
    const int MAXN = 10000000, MINN = 1000000;
    double* data1, * data2;
    int* mask1, * mask2;
    double* dataPro1, * dataPro2;
    int* maskPro1, * maskPro2;
    double *weight, * weightPro;

    //***********
    data1 = new double[MAXN + MINN + 1];
    data2 = new double[MAXN + MINN + 1];
    mask1 = new int[MAXN + MINN + 1];
    mask2 = new int[MAXN + MINN + 1];
    dataPro1 = new double[MAXN + MINN + 1];
    dataPro2 = new double[MAXN + MINN + 1];
    maskPro1 = new int[MAXN + MINN + 1];
    maskPro2 = new int[MAXN + MINN + 1];

    // ******
    weight = new double[MAXN + MINN + 1];
    weightPro = new double[MAXN + MINN + 1];
    MyTimer timer;
    int n;
    double guess1, guess2, tmp, total1 = 0, total2 = 0, prev1 = 0, prev2 = 0;
    
    for (int t = 5000; t < 6000; t++) {
        if (t <= 5000) n = t;
        else n = MINN + rand() % (MAXN - MINN);        
        cout << n << "\n";
        
        int index = 0;
        for (int i = 0; i < n; i++) {
            weight[i] = int64(randomed()) % 100;
            data1[i] = int64(randomed()) % 100;
            data2[i] = int64(randomed()) % 100;
            mask1[i] = rand() % 10;
            mask2[i] = rand() % 10;
        }
        memcpy(weightPro, weight, n * sizeof(double));
        memcpy(dataPro1, data1, n * sizeof(double));
        memcpy(dataPro2, data2, n * sizeof(double));        
        memcpy(maskPro1, mask1, n * sizeof(int));
        memcpy(maskPro2, mask2, n * sizeof(int));
                    
        //****
        int tmp = flush_cache();    // do something to ensure the cache does not contain test data
        cout << "ignore this " << tmp << "\n";
        
        timer.startCounter();
        guess1 = euclid_slow(n, data1, data2, mask1, mask2, weight);
        tmp = timer.getCounterMicro();
        total1 += tmp;
        cout << "time slow = " << tmp << " us\n";

        timer.startCounter();
        guess2 = euclid_fast(n, dataPro1, dataPro2, maskPro1, maskPro2, weightPro);
        tmp = timer.getCounterMicro();
        total2 += tmp;
        cout << "time fast = " << tmp << " us\n";

        bool ok = fabs(guess1 - guess2) <= 0.1;
        if (!ok) {
            cout << "error at N = " << n << "\n";
            exit(-1);
        }        
        cout << "\n";
    }

    cout << "slow speed = " << (total1 / 1000) << " ms\n";
    cout << "fast speed = " << (total2 / 1000) << " ms\n";
}

Basically, the function computes a kind-of Euclidean distance between 2 arrays:

result = sum(weight[i] * (data1[i] - data2[i])^2)

but only in positions where both values are available (mask1[i]==0 means it's ignored, same with mask2). The normal code is in function euclid_slow.

So I tried to improve the code by processing 4 elements at once, hoping that SSE/AVX can speed this up. However, the result stays the same or slower(using g++ -O3 -march=native) or becomes 40% slower (using Visual Studio 2019 compiler, release mode (x64), -O2, AVX2 enabled). I tried both -O2 and -O3, same result.

The compiler made better optimizations than what I wrote. But how can I make it actually faster?

Edit: code to test the programs here

Peter Cordes
  • 328,167
  • 45
  • 605
  • 847
Duke Le
  • 332
  • 3
  • 14
  • 2
    Processing 4 elements at a time doesn't magically vectorize, you would need at least -O3 for that... – Marc Glisse Aug 30 '20 at 16:26
  • I tried both -O2 and -O3, same result :\ – Duke Le Aug 30 '20 at 16:29
  • I haven't tried using intrinsics because I don't know how to implement them yet. But checking the assembly, the compiler doesn't use vector instruction at all! For example: subDiff[i] *= subDiff[i] is compiled into 4 separate instructions. – Duke Le Aug 30 '20 at 16:36
  • 1
    Try clang? It seems to produce vector instructions. – Marc Glisse Aug 30 '20 at 16:39
  • 3
    Manual unrolling typically makes auto-vectorization *harder* for GCC; the compiler's vectorizer is looking for simpler loop structure. Also, you'll might need `-ffast-math` or an OpenMP pragma to auto-vectorize the reductions into `result` and `totalWeight`. – Peter Cordes Aug 30 '20 at 16:39
  • @MarcGlisse oh... I thought gcc is smart enough to recognize all these 256-bit aligned operations as AVX. Is there a solution other than changing the compiler? – Duke Le Aug 30 '20 at 16:39
  • 1
    Another solution: yes, manual vectorization with intrinsics, or fast-math to let the rolled-up version vectorize. (But GCC doesn't always do a good job using multiple vector accumulators to hide FP latency; clang usually does.) – Peter Cordes Aug 30 '20 at 16:41
  • 2
    Things like `subMask[0] = mask1[i] && mask2[i];` are hard to vectorize, since `mask2[i]` must not be accessed if `mask1[i] == 0`. Also, `subWeight[0] = weight[i] * subMask[0];` could result in a NaN if `weight` contains infinities, so the compiler can't replace it by a more efficient bit-and operation. As @Peter said: Either don't manually unroll anything but let the compiler do it (usually requires `-ffast-math`, or a subset from that), or completely vectorize manually with intrinsics. – chtz Aug 30 '20 at 16:56
  • Also, do you really need to waste a whole 32-bit `int` on a boolean mask? With AVX2, packed 8-bit elements are as cheap to unpack to 64-bit double width with `vpmovzxbq`. And packed bits would require even less memory bandwidth. See [is there an inverse instruction to the movemask instruction in intel avx2?](https://stackoverflow.com/a/36491672) – Peter Cordes Aug 30 '20 at 17:17
  • The input comes from another part of the project. So I have to make-do with 32 bit mask – Duke Le Aug 30 '20 at 17:26
  • Also, can you recommend a book/resource to learn assembly from scratch? It's hard for me to understand what intrinsics are actually doing because I haven't used assembly – Duke Le Aug 30 '20 at 17:34
  • Do the input masks have to be checked for `mask[i] !=0` or can you assume that all/certain bits are set? There are many resources linked in the [sse](https://stackoverflow.com/tags/sse/info) and [x86](https://stackoverflow.com/tags/x86/info) tag wikis. – chtz Aug 30 '20 at 17:55
  • Mask[i] can be 0 or not. If it's 0 then that sum is ignored – Duke Le Aug 30 '20 at 18:03
  • I still need to compare 2 arrays of 4 int and convert the result into double. subMask[i] = double(mask1[i] && mask2[i]). Could anyone write this one for me using intrinsic functions in the answer? – Duke Le Aug 31 '20 at 04:15
  • 1
    If you want to make it faster, vectorize yourself, it’s not terribly hard. Use `_mm_loadu_si128` to load 4 integers, `_mm_cmpeq_epi32` to compare them with zeroes, `_mm_and_si128` to combine, `_mm_unpacklo_epi32` / `_mm_unpackhi_epi32` / `_mm256_set_m128i` to expand 32-bit lanes into 64-bit lanes. Then deal with doubles, `_mm256_loadu_pd`, `_mm256_mul_pd`, `_mm256_fmadd_pd`, etc. The fastest way to implement conditions when you have source data made that way is not `_mm256_blend_pd`, it’s bitwise instructions. – Soonts Aug 31 '20 at 13:32
  • @Soonts thanks! I'll try the boolean instruction as well. I have dealt with the doubles (except that I used _mm256_load_pd) but performance doesn't improve. Also, what instruction should I look for in the assembled code (.s file) to know whether the program is actually using 256-bit 4-double addition/multiply? Is it 4 "vaddsd" in a row? – Duke Le Aug 31 '20 at 13:46
  • @DukeLe `vaddsd` is a scalar addition. AVX-vectorized equivalent is a single `vaddpd` with `ymm[0-9]` registers. If you see `vaddpd` with xmm[0-9] arguments, that’s 2-wide vectorized code, SSE2. – Soonts Aug 31 '20 at 18:02
  • @Soonts I see "vaddsd" with xmm[0..4] register. So even with -O3 -mavx -ffast-math, the gcc compiler is not generating AVX code. Thanks, I'll try using manual intrinsic – Duke Le Sep 01 '20 at 04:44

0 Answers0