9

I have a function that copies binary data from one area to another, but only if the bytes are different from a specific value. Here is a code sample:

void copy_if(char* src, char* dest, size_t size, char ignore)
{
  for (size_t i = 0; i < size; ++i)
  {
    if (src[i] != ignore)
      dest[i] = src[i];
  }
}

The problem is that this is too slow for my current need. Is there a way to obtain the same result in a faster way?

Update: Based on answers I tried two new implementations:

void copy_if_vectorized(const uint8_t* src, uint8_t* dest, size_t size, char ignore)
{
    for (size_t i = 0; i < size; ++i)
    {
        char temps = src[i];
        char tempd = dest[i];
        dest[i] = temps == ignore ? tempd : temps;
    }
}

void copy_if_SSE(const uint8_t* src, uint8_t* dest, size_t size, uint8_t ignore)
{
    const __m128i vignore = _mm_set1_epi8(ignore);

    size_t i;

    for (i = 0; i + 16 <= size; i += 16)
    {
        __m128i v = _mm_loadu_si128((__m128i *)&src[i]);
        __m128i vmask = _mm_cmpeq_epi8(v, vignore);
        vmask = _mm_xor_si128(vmask, _mm_set1_epi8(-1));
        _mm_maskmoveu_si128(v, vmask, (char *)&dest[i]);
    }
    for (; i < size; ++i)
    {
        if (src[i] != ignore)
            dest[i] = src[i];
    }

}

And I got the following results:

Naive: 
Duration: 2.04844s
Vectorized: 
Pass: PASS
Duration: 3.18553s
SIMD: 
Pass: PASS
Duration: 0.481888s

I guess my compiler failed to vectorized (last MSVC), but the SIMD solution is good enough, thanks!

Update (bis) I managed to vectorized it using some pragma instructions for my compile (MSVC) and indeed it is actually faster that SIMD, here is the final code:

void copy_if_vectorized(const uint8_t* src, uint8_t* dest, size_t size, char ignore)
{

#pragma loop(hint_parallel(0))
#pragma loop(ivdep)

for (int i = 0; i < size; ++i) // Sadly no parallelization if i is unsigned, but more than 2Go of data is very unlikely
{
    char temps = src[i];
    char tempd = dest[i];
    dest[i] = temps == ignore ? tempd : temps;
}
}
J. Edmond
  • 293
  • 1
  • 7
  • What happens to the bytes in `dest` that aren't assigned? – dbush Feb 23 '16 at 13:14
  • 1
    Paralelization? A couple of threads each handling a part of the copying? – Some programmer dude Feb 23 '16 at 13:15
  • They are already assigned to some default value. – J. Edmond Feb 23 '16 at 13:16
  • 2
    If you are tied to a specific architecture, e.g. x86, then this would be a good fit for a SIMD implementation (e.g. SSE - see [`maskmovdqu`/`_mm_maskmoveu_si128`](https://software.intel.com/sites/landingpage/IntrinsicsGuide/#expand=3268,3266,3266,3266&text=maskmovdqu)). – Paul R Feb 23 '16 at 13:16
  • Paralelization can be an option but I'm already inside a system with a lot of threads, I would like to avoid making more. I'm actually tied to a x64 architeture. – J. Edmond Feb 23 '16 at 13:21
  • Are you concerned at all that `dest[i]` remains unmodified if `src[i] == ignore`? – David Hoelzer Feb 23 '16 at 13:26
  • @DavidHoelzer No, dest is already set to some value before this copy is done. – J. Edmond Feb 23 '16 at 13:27
  • Ok. Just checking. :) – David Hoelzer Feb 23 '16 at 13:28
  • @PaulR Do you have some good examples/tutorial about SIMD and maskmov? – J. Edmond Feb 23 '16 at 13:49
  • 1
    Please, use the restrict keyword. There's nothing compiler vectorize if you don't assure that pointers doesn't alias. – user3528438 Feb 23 '16 at 13:49
  • @user3528438: My answer begs to differ... – EOF Feb 23 '16 at 13:52
  • What does "too slow" mean? And how big arrays are you moving around? If they are big enough you'll be limited by memory bandwidth and no optimizations will help. – Art Feb 23 '16 at 13:52
  • @J.Edmond: You can see SIMD-ified compiler output generated from C in my answer. – EOF Feb 23 '16 at 13:57
  • Well, since both gcc and clang vectorize my code, and the gcc code beats the hand-unrolled intrinsics-code by more than a factor of two, I'd say get a real compiler. – EOF Feb 23 '16 at 15:04
  • @EOF Haha I guess, but that's my devenv right now. And my target machine is a Windows, but do you think gcc portage on windows are good enough (cygwin and mingw)? – J. Edmond Feb 23 '16 at 15:13
  • @J.Edmond: Sorry, I don't have any experience with windows development. But wasn't there something about clang integration into visual studio? Not sure, but maybe something to look into. – EOF Feb 23 '16 at 15:15
  • @EOF Apparently you can use it in visual studio, I'm gonna try that. Meanwhile I managed to vectorize your code and it actually offers better perf. Thanks ;) – J. Edmond Feb 23 '16 at 15:46

4 Answers4

6

gcc vectorizes the following code:

#include <stddef.h>
void copy_if(char* src, char* dest, size_t size, char ignore)
{
  for (size_t i = 0; i < size; ++i)
  {
    char temps = src[i];
    char tempd = dest[i];
    dest[i] = temps == ignore ? tempd : temps;
  }
}

Note that both the load from- and the assignment to dest[i] are unconditional, so the compiler is not restricted by the prohibition against inventing stores in a multi-threaded program.

Edit for a less ancient compiler and processor, and godbolt links:

x86-64 gcc 11.1 compiles the code to the following with -O3 -mavx512f -mavx512bw, producing an aligned loop processing 64 bytes at a time:

.L5:
        vmovdqu8        (%rdi,%rax), %zmm2
        vpcmpb  $4, %zmm0, %zmm2, %k1
        vmovdqu8        %zmm2, (%rsi,%rax){%k1}
        addq    $64, %rax
        cmpq    %rax, %r8
        jne     .L5

This compiler also does well for gcc -std=gnu11 -O3 -mavx2, processing 32 bytes at a time:

.L5:
        vpcmpeqb        (%rdi,%rax), %ymm1, %ymm0
        vmovdqu (%rdi,%rax), %ymm2
        vpblendvb       %ymm0, (%rsi,%rax), %ymm2, %ymm0
        vmovdqu %ymm0, (%rsi,%rax)
        addq    $32, %rax
        cmpq    %rax, %r8
        jne     .L5

In general, modern compilers do well for any processor architecture with a vector unit.

Old compiler (gcc 4.8.4), old processor (no AVX512), old answer:

For -march=core-avx2, the generated assembly contains this vectorized loop, working on 32 bytes at a time:

.L9:
    vmovdqu (%rdi,%rcx), %ymm1
    addq    $1, %r10
    vmovdqu (%rsi,%rcx), %ymm2
    vpcmpeqb    %ymm0, %ymm1, %ymm3
    vpblendvb   %ymm3, %ymm2, %ymm1, %ymm1
    vmovdqu %ymm1, (%rsi,%rcx)
    addq    $32, %rcx
    cmpq    %r10, %r8
    ja  .L9

For generic x86-64, the generated assembly contains this vectorized loop, working on 16 bytes at a time:

.L9:
    movdqu  (%rdi,%r8), %xmm3
    addq    $1, %r10
    movdqa  %xmm3, %xmm1
    movdqu  (%rsi,%r8), %xmm2
    pcmpeqb %xmm0, %xmm1
    pand    %xmm1, %xmm2
    pandn   %xmm3, %xmm1
    por %xmm2, %xmm1
    movdqu  %xmm1, (%rsi,%r8)
    addq    $16, %r8
    cmpq    %r9, %r10
    jb  .L9

For armv7l-neon, clang-3.7 generates the following loop, working on 16 bytes at a time:

.LBB0_9:                                @ %vector.body
                                        @ =>This Inner Loop Header: Depth=1
        vld1.8  {d18, d19}, [r5]!
        subs.w  lr, lr, #16
        vceq.i8 q10, q9, q8
        vld1.8  {d22, d23}, [r4]
        vbsl    q10, q11, q9
        vst1.8  {d20, d21}, [r4]!
        bne     .LBB0_9

So, the code is not only more readable than assembly or intrinsics, it's also portable to multiple architectures and compilers. New architectures and instruction-set extensions can easily be utilized by recompilation.

EOF
  • 6,273
  • 2
  • 26
  • 50
  • With the Intel compiler 13.1.3.198, I had to mark the pointers `restrict` in order to get vectorization. Otherwise it complains about "vector dependency", rightfully so, I think (consider `dst == src+1`). So my recommendation would be to add the `restrict` modifier to the pointers to increase the chance for vectorization across platforms and compilers. – njuffa Feb 23 '16 at 19:16
  • Some compilers may vectorize this without use of `restrict` if an "assume no aliasing" flag is specified (either explicitly on the command line, or implicitly as part of particular optimization levels). – njuffa Feb 23 '16 at 21:21
  • 1
    @njuffa: Well, gcc simply does a check for overlap at runtime. It takes ~8 simple instructions (`lea`, `or`, `setcc`). You can get rid of them by making the pointers `restrict`, but why bother? – EOF Feb 24 '16 at 01:09
  • That's a very good point, which I had overlooked despite being familiar with the technique. – njuffa Feb 24 '16 at 03:00
5

Here is an example using SSE2 instrinsics to exploit the maskmovdqu instruction. The SIMD version seems to run at around 2x the speed of the original version on a Haswell CPU (code compiled with clang):

    #include <stdio.h>
    #include <string.h>
    #include <emmintrin.h>  // SSE2
    #include <sys/time.h>   // gettimeofday

    void copy_if_ref(const uint8_t* src, uint8_t* dest, size_t size, uint8_t ignore)
    {
        for (size_t i = 0; i < size; ++i)
        {
            if (src[i] != ignore)
                dest[i] = src[i];
        }
    }

    void copy_if_SSE(const uint8_t* src, uint8_t* dest, size_t size, uint8_t ignore)
    {
        const __m128i vignore = _mm_set1_epi8(ignore);

        size_t i;

        for (i = 0; i + 16 <= size; i += 16)
        {
            __m128i v = _mm_loadu_si128((__m128i *)&src[i]);
            __m128i vmask = _mm_cmpeq_epi8(v, vignore);
            vmask = _mm_xor_si128(vmask, _mm_set1_epi8(-1));
            _mm_maskmoveu_si128 (v, vmask, (char *)&dest[i]);
        }
        for ( ; i < size; ++i)
        {
            if (src[i] != ignore)
                dest[i] = src[i];
        }
    }

    #define TIME_IT(init, copy_if, src, dest, size, ignore) \
    do { \
        const int kLoops = 1000; \
        struct timeval t0, t1; \
        double t_ms = 0.0; \
     \
        for (int i = 0; i < kLoops; ++i) \
        { \
            init; \
            gettimeofday(&t0, NULL); \
            copy_if(src, dest, size, ignore); \
            gettimeofday(&t1, NULL); \
            t_ms += ((double)(t1.tv_sec - t0.tv_sec) + (double)(t1.tv_usec - t0.tv_usec) * 1.0e-6) * 1.0e3; \
        } \
        printf("%s: %.3g ns / element\n", #copy_if, t_ms * 1.0e6 / (double)(kLoops * size)); \
    } while (0)

    int main()
    {
        const size_t N = 10000000;

        uint8_t *src = malloc(N);
        uint8_t *dest_ref = malloc(N);
        uint8_t *dest_init = malloc(N);
        uint8_t *dest_test = malloc(N);

        for (size_t i = 0; i < N; ++i)
        {
            src[i] = (uint8_t)rand();
            dest_init[i] = (uint8_t)rand();
        }

        memcpy(dest_ref, dest_init, N);
        copy_if_ref(src, dest_ref, N, 0x42);

        memcpy(dest_test, dest_init, N);
        copy_if_SSE(src, dest_test, N, 0x42);
        printf("copy_if_SSE: %s\n", memcmp(dest_ref, dest_test, N) == 0 ? "PASS" : "FAIL");

        TIME_IT(memcpy(dest_test, dest_init, N), copy_if_ref, src, dest_ref, N, 0x42);
        TIME_IT(memcpy(dest_test, dest_init, N), copy_if_SSE, src, dest_test, N, 0x42);

        return 0;
    }

Compile and test:

$ gcc -Wall -msse2 -O3 copy_if.c && ./a.out 
copy_if_SSE: PASS
copy_if_ref: 0.416 ns / element
copy_if_SSE: 0.239 ns / element

(Note: earlier version of this answer had a stray factor of 16 in the timing code, so earlier numbers were 16x higher than they should have been.)


UPDATE

Inspired by @EOF's solution and compiler-generated code I tried a different approach with SSE4, and got much better results:

#include <stdio.h>
#include <string.h>
#include <smmintrin.h>  // SSE4
#include <sys/time.h>   // gettimeofday

void copy_if_ref(const uint8_t* src, uint8_t* dest, size_t size, uint8_t ignore)
{
    for (size_t i = 0; i < size; ++i)
    {
        if (src[i] != ignore)
            dest[i] = src[i];
    }
}

void copy_if_EOF(const uint8_t* src, uint8_t* dest, size_t size, uint8_t ignore)
{
    for (size_t i = 0; i < size; ++i)
    {
        char temps = src[i];
        char tempd = dest[i];
        dest[i] = temps == ignore ? tempd : temps;
    }
}

void copy_if_SSE(const uint8_t* src, uint8_t* dest, size_t size, uint8_t ignore)
{
    const __m128i vignore = _mm_set1_epi8(ignore);

    size_t i;

    for (i = 0; i + 16 <= size; i += 16)
    {
        __m128i vsrc = _mm_loadu_si128((__m128i *)&src[i]);
        __m128i vdest = _mm_loadu_si128((__m128i *)&dest[i]);
        __m128i vmask = _mm_cmpeq_epi8(vsrc, vignore);
        vdest = _mm_blendv_epi8(vsrc, vdest, vmask);
        _mm_storeu_si128 ((__m128i *)&dest[i], vdest);
    }
    for ( ; i < size; ++i)
    {
        if (src[i] != ignore)
            dest[i] = src[i];
    }
}

#define TIME_IT(init, copy_if, src, dest, size, ignore) \
do { \
    const int kLoops = 1000; \
    struct timeval t0, t1; \
    double t_ms = 0.0; \
 \
    for (int i = 0; i < kLoops; ++i) \
    { \
        init; \
        gettimeofday(&t0, NULL); \
        copy_if(src, dest, size, ignore); \
        gettimeofday(&t1, NULL); \
        t_ms += ((double)(t1.tv_sec - t0.tv_sec) + (double)(t1.tv_usec - t0.tv_usec) * 1.0e-6) * 1.0e3; \
    } \
    printf("%s: %.3g ns / element\n", #copy_if, t_ms * 1.0e6 / (double)(kLoops * size)); \
} while (0)

int main()
{
    const size_t N = 10000000;

    uint8_t *src = malloc(N);
    uint8_t *dest_ref = malloc(N);
    uint8_t *dest_init = malloc(N);
    uint8_t *dest_test = malloc(N);

    for (size_t i = 0; i < N; ++i)
    {
        src[i] = (uint8_t)rand();
        dest_init[i] = (uint8_t)rand();
    }

    memcpy(dest_ref, dest_init, N);
    copy_if_ref(src, dest_ref, N, 0x42);

    memcpy(dest_test, dest_init, N);
    copy_if_EOF(src, dest_test, N, 0x42);
    printf("copy_if_EOF: %s\n", memcmp(dest_ref, dest_test, N) == 0 ? "PASS" : "FAIL");

    memcpy(dest_test, dest_init, N);
    copy_if_SSE(src, dest_test, N, 0x42);
    printf("copy_if_SSE: %s\n", memcmp(dest_ref, dest_test, N) == 0 ? "PASS" : "FAIL");

    TIME_IT(memcpy(dest_test, dest_init, N), copy_if_ref, src, dest_ref, N, 0x42);
    TIME_IT(memcpy(dest_test, dest_init, N), copy_if_EOF, src, dest_test, N, 0x42);
    TIME_IT(memcpy(dest_test, dest_init, N), copy_if_SSE, src, dest_test, N, 0x42);

    return 0;
}

Compile and test:

$ gcc -Wall -msse4 -O3 copy_if_2.c && ./a.out 
copy_if_EOF: PASS
copy_if_SSE: PASS
copy_if_ref: 0.419 ns / element
copy_if_EOF: 0.114 ns / element
copy_if_SSE: 0.114 ns / element

Conclusion: while _mm_maskmoveu_si128 seems like a good solution for this problem from a functionality perspective, it doesn't seem to be as efficient as using explicit loads, masking and stores. Furthermore, compiler-generated code (see @EOF's answer) seems to be just as fast as explicitly coded SIMD in this instance.

Paul R
  • 208,748
  • 37
  • 389
  • 560
  • If I replace the `SSE` version with my C version on my `core-avx2`, I get `copy_if_vec: PASS copy_if_SSE_unrolled: PASS copy_if_ref: 11.7 ns / element copy_if_vec: 1.7 ns / element copy_if_SSE_unrolled: 4.16 ns / element`, more than 2x faster than your unrolled version. On x86-64 (`SSE2`), I get `copy_if_vec: PASS copy_if_SSE_unrolled: PASS copy_if_ref: 11.7 ns / element copy_if_vec: 1.89 ns / element copy_if_SSE_unrolled: 4.54 ns / element`, also 2x faster. – EOF Feb 23 '16 at 15:01
  • 1
    @EOF: I just realised that my timing code was off by a factor of 16 (I was using some old code and hadn't spotted it earlier). I've updated the original SSE2 implementation above now and also added an SSE4 version inspired by the compiler-generated code for your solution. Timing pretty much matches now. I think the auto-vectorized code will be hard to beat for this particular case. – Paul R Feb 23 '16 at 17:38
  • 2
    Don't forget to mention that `maskmovdqu` has a non-temporal hint, so it evicts the destination from cache when it's done. It's a 10-uop instruction with a throughput of one per 6 cycles on Haswell. `vmaskmovps`/`pd`, and `vpmaskmovd/q` don't have an NT hint, and are faster (esp. if you're going to read the dest again while it's still in cache). But they only mask at 32b or 64b granularity. There is no AVX2 `vmaskmovdqu ymm`, only the 128b version that still has an NT hint. – Peter Cordes Feb 24 '16 at 00:12
  • @PeterCordes: yes, I'd never used `maskmovdqu` before but it seemed like a good fit for this problem - I realise now that it's a costly instruction ! – Paul R Feb 24 '16 at 07:11
  • 1
    SSE: always providing *almost* the instruction you want, but not quite! I've found this is esp. true if you look at asm, rather than just intrinsics. Avoiding extra `mov` instructions is hard, because often the data-movement you want happens in-place in the destination. And bit/byte shifts happen in-place for no apparent reason. Each shift has its own opcode, they just don't use the `/r` field to select a different destination. They could all be like `pshufd`, and have a separate dest. :/ – Peter Cordes Feb 24 '16 at 10:20
  • 2
    @PeterCordes: Well, luckily AVX(2) finally gives sane encodings to the x86 vector instructions. I'd appreciate if intel and amd could stop making new machines that don't support it though. – EOF Feb 24 '16 at 14:28
  • 1
    (Correction to my previous comment; SIMD immediate shifts like [`psllw xmm0, 2`](https://www.felixcloutier.com/x86/psllw:pslld:psllq) *do* use the /r field for extra opcode bits, `66 0F 71 /6 ib` vs. `/4` for `psraw`. Maybe I was looking at `psllw xmm, xmm/m128` with a shift count from another reg, which of course needs both ModRM fields) – Peter Cordes May 22 '21 at 08:21
0

The following would be an improvement, though compilers could devise this themselves.

void copy_if(char* src, char* dest, size_t size, char ignore)
{
  while (size--)
  {
    if (*src != ignore)
      *dest = *src;
    src++; dest++;
  }
}
Paul Ogilvie
  • 25,048
  • 4
  • 23
  • 41
  • Your implementation differs from the original - You're not leaving "holes" in the destination array when you're ignoring bytes in the copy. (And the size_t in the while loop is obviously a typo).- And I'm with you that most modern compilers should work that out themselves. – tofro Feb 23 '16 at 13:28
  • @tofro Sorry but I'm curios. Can you tell me where this differs from the original question? For me they look like they're both doing the same thing. – muXXmit2X Feb 23 '16 at 14:05
  • The original incremented the dest pointer even if the character wasn't copied, but instead ignored - Thus creating "holes" in the destination array. Paul's version does not increment it in this case, so shifting array contents with any skipped copy. And I now see it apparently has been edited, so that no longer is the case. So please ignore my comment. – tofro Feb 23 '16 at 14:38
  • @tofro and others, I saw that and adapted the answer. Tofro, you may have been referring to the old answer. sorry for the confusion. – Paul Ogilvie Feb 23 '16 at 18:17
0

If the freq of ignore isnt to high a memcpy code like below might help.

size_t copy_if(char* src, char* dest, size_t size, char ignore)
{
    size_t i=0, count =0 , res= 0;
    while (count < size)
    {
    while (*src != ignore){
        count++;
        if (count > size)
             break;
        src++;
        i++;
        res++;
    }
    count++;
    if (i> 0){
        memcpy(dest,src-i, i);
        dest += i;
    }
    i = 0;
    src++;
  }
return res;
}
  • With most (all?) implementations of memcpy, you will be passing the array twice now. Once for searching for occurrences of ignore, the second time (hidden) within memcpy(). Most probably not better than before ;) – tofro Feb 23 '16 at 18:46