15

I've got a case where I need to compress a lot of often small values. Thus I compress them with a variable-length byte encoding (ULEB128, to be specific):

size_t
compress_unsigned_int(unsigned int n, char* data)
{
  size_t size = 0;
  while (n  > 127)
  {
    ++size;
    *data++ = (n & 127)|128;
    n >>= 7;
  }
  *data++ = n;
  return ++size;
}

Is there a more efficient way to do this (maybe using SSE)?

Edit: After this compression, the result is stored into data, taking size bytes. Then, the compression function is called on the next unsigned int.

Peter Cordes
  • 328,167
  • 45
  • 605
  • 847
Alexandre Hamez
  • 7,725
  • 2
  • 28
  • 39
  • 3
    Technically this is called the ULEB128 encoding. – kennytm May 02 '11 at 14:55
  • 2
    I think you should add something about the type of the `n` variable, since its exact range limitations probably affect what is and isn't possible using SSE and similar extensions. – unwind May 02 '11 at 15:43
  • You might want to look other encoding methods. In the development of protobuf Google performed various benchmarks and they discovered that the leading bit indicator was slowish because you had a test (basically unpredictable) for each byte of data. It was found that separating the structure in two: first indicating the length of each "pack" and then shoving the packs, was faster. – Matthieu M. May 02 '11 at 15:57
  • You could try transforming the loop into a switch on bitscan(n)/7. Also, instead of doing 1-byte stores, use a temporary variable and then memcpy() it in place (optimizing compilers will inline and specialize small memcpy()s) – CAFxX May 02 '11 at 16:04
  • @Matthieu M.: I can't find a link for these benchmarks. Have you got one to share? – Alexandre Hamez May 02 '11 at 16:12
  • 2
    @Alexandre: I was reading on Jeffrey Dean, one of Google's developers (http://research.google.com/people/jeff/index.html). I can't find the specifications of the binary format for protobuf any longer. I recall they used 4 different compression technics, depending on the section of the message. (I think there was ULEB128 for some, though they call it varint). – Matthieu M. May 02 '11 at 16:51
  • 1
    As a side note, it's not really true that SSE can't be used to speed this up, but you'd probably need to be able to process 4 (or more) integers in parallel to gain something in terms of speed. – CAFxX May 04 '11 at 12:09
  • Related for a wider version of this (64-bit integers), [QWORD shuffle sequential 7-bits to byte-alignment with SIMD SSE...AVX](https://stackoverflow.com/q/10850054) shows an interesting trick of using SIMD multiplies on odd/even 7-bit fields of the input, then recombining. Or if you have AVX-512VBMI, `vpmultishiftqb` is great for encoding, although you still need to find the length for each element and compact. Perhaps with a compare and `vpcompressb`. – Peter Cordes May 11 '23 at 05:29

6 Answers6

8

The first thing you want to do is test any possible solution against your current code.

I think you may want to try and get rid of data dependencies, to allow the processor to do more work at the same time.

What are data dependencies? As data flows through your function, the current value of n depends on the previous value of n, which depends on the value before that... which is a long chain of data dependencies. In the code below, n is never modified so the processor can "skip ahead" and do a couple different things at the same time without having to wait for the new n to be computed.

// NOTE: This code is actually incorrect, as caf noted.
// The byte order is reversed.
size_t
compress_unsigned_int(unsigned int n, char *data)
{
    if (n < (1U << 14)) {
        if (n < (1U << 7)) {
            data[0] = n;
            return 1;
        } else {
            data[0] = (n >> 7) | 0x80;
            data[1] = n & 0x7f;
            return 2;
        }
    } else if (n < (1U << 28)) {
        if (n < (1U << 21)) {
            data[0] = (n >> 14) | 0x80;
            data[1] = ((n >> 7) & 0x7f) | 0x80;
            data[2] = n & 0x7f;
            return 3;
        } else {
            data[0] = (n >> 21) | 0x80;
            data[1] = ((n >> 14) & 0x7f) | 0x80;
            data[2] = ((n >> 7) & 0x7f) | 0x80;
            data[3] = n & 0x7f;
            return 4;
        }
    } else {
        data[0] = (n >> 28) | 0x80;
        data[1] = ((n >> 21) & 0x7f) | 0x80;
        data[2] = ((n >> 14) & 0x7f) | 0x80;
        data[3] = ((n >> 7) & 0x7f) | 0x80;
        data[4] = n & 0x7f;
        return 5;
    }
}

I tested the performance by executing it in a tight loop from 0..UINT_MAX. On my system, the execution times are:

(Lower is better)
Original: 100%
caf's unrolled version: 79%
My version: 57%

Some minor tweaking may produce better results, but I doubt you'll get much more improvement unless you go to assembly. If your integers tend to be in specific ranges, then you can use profiling to get the compiler to add the right branch predictions to each branch. This might get you a few extra percentage points of speed. (EDIT: I got 8% from reordering the branches, but it's a perverse optimization because it relies on the fact that each number 0...UINT_MAX appears with equal frequency. I don't recommend this.)

SSE won't help. SSE is designed to operate on multiple pieces of data with the same width at the same time, it is notoriously difficult to get SIMD to accelerate anything with a variable length encoding. (It's not necessarily impossible, but it might be impossible, and you'd have to be pretty smart to figure it out.)

Dietrich Epp
  • 205,541
  • 37
  • 345
  • 415
  • Yes, this essentially just takes the unrolling process even further. You could reorder the branches depending on the frequency distribution of the inputs - I tested `< 0x80` first because the OP says that the values are _"often small"_. – caf May 03 '11 at 05:20
  • I'm pretty sure You could do the same thing (compile-time fixed size loop unrolling) using some template voodoo. – valdo May 03 '11 at 05:32
  • Incidentally this isn't correct - it's supposed to be outputting the least significant bits first. – caf May 03 '11 at 06:45
  • Ok thanks, I've got the idea even if it's not correct :-). Though, after the correction, the version of caf is faster. (with clang on a Core 2 Duo). – Alexandre Hamez May 03 '11 at 09:03
  • 1
    @Alexandre: Interesting. The system I used for testing has an AMD Phenom II, and my version was significantly faster both with GCC and with clang... and the GCC versions were all faster than the clang versions. (GCC produced code with 86% the execution time of clang.) – Dietrich Epp May 04 '11 at 06:25
  • 1
    @valdo: I am not just doing loop unrolling, I'm doing data dependency elimination. If you're so sure, show us the code. – Dietrich Epp May 04 '11 at 06:28
5

You might find fast implementation in google protocol buffers:

http://code.google.com/p/protobuf/

Look at CodedOutputStream::WriteVarintXXX methods.

First method might be rewritten as:

char *start = data;
while (n>=0x80)
{
    *data++=(n|0x80);
    n>>=7;
}
*data++=n;
return data-start;

According to my test google buffers implementation is the best, then come other implementations. However my test is rather artificial, it is better to test each approach in your application and choose the best. Presented optimizations work better on specific number values.

Here is code of my test application. (Note I've removed code from compress_unsigned_int_google_buf. You might find implementation in the following file from google buffer protocol: coded_stream.cc method CodedOutputStream::WriteVarint32FallbackToArrayInline)

size_t compress_unsigned_int(unsigned int n, char* data)
{
    size_t size = 0;
    while (n  > 127)
    {
        ++size;
        *data++ = (n & 127)|128;
        n >>= 7;
    }
    *data++ = n;
    return ++size;
}

size_t compress_unsigned_int_improved(unsigned int n, char* data)
{
    size_t size;

    if (n < 0x00000080U) {
        size = 1;
        goto b1;
    }
    if (n < 0x00004000U) {
        size = 2;
        goto b2;
    }
    if (n < 0x00200000U) {
        size = 3;
        goto b3;
    }
    if (n < 0x10000000U) {
        size = 4;
        goto b4;
    }
    size = 5;

    *data++ = (n & 0x7f) | 0x80;
    n >>= 7;
b4:
    *data++ = (n & 0x7f) | 0x80;
    n >>= 7;
b3:
    *data++ = (n & 0x7f) | 0x80;
    n >>= 7;
b2:
    *data++ = (n & 0x7f) | 0x80;
    n >>= 7;
b1:
    *data = n;
    return size;
}

size_t compress_unsigned_int_more_improved(unsigned int n, char *data)
{
    if (n < (1U << 14)) {
        if (n < (1U << 7)) {
            data[0] = n;
            return 1;
        } else {
            data[0] = (n >> 7) | 0x80;
            data[1] = n & 0x7f;
            return 2;
        }
    } else if (n < (1U << 28)) {
        if (n < (1U << 21)) {
            data[0] = (n >> 14) | 0x80;
            data[1] = ((n >> 7) & 0x7f) | 0x80;
            data[2] = n & 0x7f;
            return 3;
        } else {
            data[0] = (n >> 21) | 0x80;
            data[1] = ((n >> 14) & 0x7f) | 0x80;
            data[2] = ((n >> 7) & 0x7f) | 0x80;
            data[3] = n & 0x7f;
            return 4;
        }
    } else {
        data[0] = (n >> 28) | 0x80;
        data[1] = ((n >> 21) & 0x7f) | 0x80;
        data[2] = ((n >> 14) & 0x7f) | 0x80;
        data[3] = ((n >> 7) & 0x7f) | 0x80;
        data[4] = n & 0x7f;
        return 5;
    }
}

size_t compress_unsigned_int_simple(unsigned int n, char *data)
{
    char *start = data;
    while (n>=0x80)
    {
        *data++=(n|0x80);
        n>>=7;
    }
    *data++=n;
    return data-start;
}

inline size_t compress_unsigned_int_google_buf(unsigned int value, unsigned char* target) {

          // This implementation might be found in google protocol buffers

}



#include <iostream>
#include <Windows.h>
using namespace std;

int _tmain(int argc, _TCHAR* argv[])
{
    char data[20];
    unsigned char udata[20];
    size_t size = 0;
    __int64 timer;

    cout << "Plain copy: ";

    timer = GetTickCount64();

    size = 0;

    for (int i=0; i<536870900; i++)
    {
        memcpy(data,&i,sizeof(i));
        size += sizeof(i);
    }

    cout << GetTickCount64() - timer << " Size: " << size <<  endl;

    cout << "Original: ";

    timer = GetTickCount64();

    size = 0;

    for (int i=0; i<536870900; i++)
    {
        size += compress_unsigned_int(i,data);
    }

    cout << GetTickCount64() - timer << " Size: " << size << endl;

    cout << "Improved: ";

    timer = GetTickCount64();

    size = 0;

    for (int i=0; i<536870900; i++)
    {
        size += compress_unsigned_int_improved(i,data);
    }

    cout << GetTickCount64() - timer << " Size: " << size <<  endl;

    cout << "More Improved: ";

    timer = GetTickCount64();

    size = 0;

    for (int i=0; i<536870900; i++)
    {
        size += compress_unsigned_int_more_improved(i,data);
    }

    cout << GetTickCount64() - timer << " Size: " << size <<  endl;

    cout << "Simple: ";

    timer = GetTickCount64();

    size = 0;

    for (int i=0; i<536870900; i++)
    {
        size += compress_unsigned_int_simple(i,data);
    }

    cout << GetTickCount64() - timer << " Size: " << size <<  endl;

    cout << "Google Buffers: ";

    timer = GetTickCount64();

    size = 0;

    for (int i=0; i<536870900; i++)
    {
        size += compress_unsigned_int_google_buf(i,udata);
    }

    cout << GetTickCount64() - timer << " Size: " << size <<  endl;

    return 0;
}

On my machine with Visual C++ compiler I've got following results:

Plain copy: 358 ms

Original: 2497 ms

Improved: 2215 ms

More Improved: 2231 ms

Simple: 2059 ms

Google Buffers: 968 ms

Petro
  • 51
  • 1
  • 1
  • Dead link. `inline void WriteVarint(uint64_t val, std::string* s)` is defined in https://github.com/protocolbuffers/protobuf/blob/56d1b0f3f5e0170ea336a754af0bb3ae7b6d4bfa/src/google/protobuf/parse_context.cc#L338 but that's just a simplistic loop. `WriteVarint32FallbackToArrayInline` doesn't appear in the codebase. They have an AArch64-optimized *decode* function `VarintParseSlowArm64` https://github.com/protocolbuffers/protobuf/blob/56d1b0f3f5e0170ea336a754af0bb3ae7b6d4bfa/src/google/protobuf/parse_context.h#L825 ("slow" meaning fallback when it's not a single byte). – Peter Cordes May 12 '23 at 04:18
  • Note that your benchmark encodes mostly large integers, since it does all number from 0 up to some limit. Varint is optimized for small integers. Also, since you test in order, branches predict near-perfectly as numbers grow in size. e.g. the last half of the numbers all have the leading bit in the same position. – Peter Cordes May 12 '23 at 04:29
  • Your `compress_unsigned_int_more_improved` gets the "endianness" backwards. In a normal varint, the low 7 bits are in the *first* byte (lowest address), not the last. – Peter Cordes May 12 '23 at 04:30
3

If your unsigned int values are limited to a specific range - say, 32 bits - you can unroll the loop:

size_t
compress_unsigned_int(unsigned int n, char* data)
{
  size_t size;

  if (n < 0x00000080U) {
    size = 1;
    goto b1;
  }
  if (n < 0x00004000U) {
    size = 2;
    goto b2;
  }
  if (n < 0x00200000U) {
    size = 3;
    goto b3;
  }
  if (n < 0x10000000U) {
    size = 4;
    goto b4;
  }
  size = 5;

  *data++ = (n & 0x7f) | 0x80;
  n >>= 7;
b4:
  *data++ = (n & 0x7f) | 0x80;
  n >>= 7;
b3:
  *data++ = (n & 0x7f) | 0x80;
  n >>= 7;
b2:
  *data++ = (n & 0x7f) | 0x80;
  n >>= 7;
b1:
  *data = n;
  return size;
}
caf
  • 233,326
  • 40
  • 323
  • 462
  • 1
    A good optimizing compiler (gcc, msvc, intel's) should do this for you too. – Blindy May 03 '11 at 02:37
  • @Blindy: In my testing, gcc 4.4.5 did not unroll this particular example. – caf May 03 '11 at 03:00
  • Two notes: 1) you should probably try coalescing those single-byte stores in multi-byte ones (e.g. 5->1+4, 4->4, 3->1+2, 2->2, 1->1), this will also force you to get rid of the data dependencies on n after the jump - 2) replace the ifs and jumps with a fall-through switch on bitscan(n)/7 – CAFxX May 03 '11 at 20:18
  • @CAFxX: Do you have an explanation or measurements to support that? Compilers implement case statements with the same branch operations as if statements and reorder code blocks as they see fit, so you are unlikely to see a consistent difference between "if" and "switch". Also, misaligned multibyte stores are very slow on various architectures, and will crash your program on SPARC or (some?) ARM targets. – Dietrich Epp May 04 '11 at 06:32
  • @Dietrich: Compilers do not necessarily implement `switch` statements as individual `if` statements. You will likely see that for small switches, but for switches with a larger number of branches, I've seen gcc do a binary tree search followed by a small number of explicit tests. – Christopher Creutzig May 04 '11 at 07:22
  • @Dietrich small switches with continuous ranges (like in this case) should be transformed in jump tables. I also assumed we were talking about x86 (not ARM or SPARC) because the OP asked about SSE. Your best bet is invoking memcpy and hope (if the compiler is smart enough) that it gets inlined and specialized (e.g. LLVM should do that IIRC). – CAFxX May 04 '11 at 10:38
  • @CAFxX: What is exactly this 'bitscan(n)/7'? Is it related to finding the most significant bit? Because, if it's the case, I tried a such thing, with a fall-through switch, but it was slower. – Alexandre Hamez May 04 '11 at 13:08
  • @Alexandre yes, it's the function (an intrinsic in most compilers) that returns the index of the MSSB. – CAFxX May 04 '11 at 18:50
  • @Christopher: "Same branch instructions" -- that includes binary searching. @CAFxX: Yes, I forgot about branch tables. GCC won't use them with fewer than 6 cases, but clang will -- and clang tested as slower for me. You say your best bet is `memcpy`... do you care to back that up with data? – Dietrich Epp May 05 '11 at 05:39
  • @CAFxX, @Alexandre: `bitscan` is equivalent to GCC's `__builtin_ffs` intrinsic, I think. – Dietrich Epp May 05 '11 at 05:43
  • @Dietrich, I don't have any hard data, but LLVM's TargetLowering.h says, for its memcpy intrnsic: "For example, storing 7 bytes on a 32-bit machine with 32-bit alignment would result in one 4-byte store, a one 2-byte store and one 1-byte store. This only applies to copying a constant array of constant size." It's true that in this case we can't really guarantee known alignment and constant size (since the cases are just a few we could even manually specialize them, but I guess it would be overkill) but this should hint at why I said that it was its best bet. – CAFxX May 05 '11 at 06:47
  • @Dietrich: Same instructions as non-obvious nested if statements, the best construction of which depends on the target machine (and the input data distribution), right. – Christopher Creutzig May 05 '11 at 11:28
2

After more browsing, I found another commonly used implementation in Sqlite3 (code version 3070900):

inline int sqlite3PutVarint(unsigned char *p, unsigned __int64 v){
  int i, j, n;
  unsigned char buf[10];
  if( v & (((unsigned __int64)0xff000000)<<32) ){
    p[8] = (unsigned char)v;
    v >>= 8;
    for(i=7; i>=0; i--){
      p[i] = (unsigned char)((v & 0x7f) | 0x80);
      v >>= 7;
    }
    return 9;
  }    
  n = 0;
  do{
    buf[n++] = (unsigned char)((v & 0x7f) | 0x80);
    v >>= 7;
  }while( v!=0 );
  buf[0] &= 0x7f;
  for(i=0, j=n-1; j>=0; j--, i++){
    p[i] = buf[j];
  }
  return n;
}

There is also slightly optimized version for 32 bit int:

int sqlite3PutVarint32(unsigned char *p, unsigned int v){

  if( (v & ~0x7f)==0 ){
    p[0] = v;
    return 1;
  }

  if( (v & ~0x3fff)==0 ){
    p[0] = (unsigned char)((v>>7) | 0x80);
    p[1] = (unsigned char)(v & 0x7f);
    return 2;
  }
  return sqlite3PutVarint(p, v);
}

It is dissappointing that Sqlite implementation performs the worst of all in my test. So if you are going to use Sqlite consider replacing default implementation with an optimized one.

Meanwhile I am thinking about further possible optimizations.

Peter
  • 346
  • 5
  • 7
1

Here's my optimization in x86 assembly language (32 bit). You can compile with NASM and link. I don't know if it's fast or slow, I just had fun with coding :)

global compress_unsigned_int

;   bit fields:
;   31                              0
;    eeeedddddddcccccccbbbbbbbaaaaaaa


compress_unsigned_int:
    mov     eax, [esp+4]    ; n
    mov     ecx, [esp+8]    ; data

    cmp     eax, 00001111111111111111111111111111b
    jbe     out4b

    shld    edx, eax, 11
    shl     eax, 10
    shld    edx, eax, 8
    shl     eax, 7
    shld    edx, eax, 8
    shl     eax, 7
    shld    edx, eax, 8
    or      edx, 10000000100000001000000010000000b

    mov     [ecx], edx
    mov     eax, [esp+4]
    shr     eax, 28
    mov     [ecx+4], al

    mov     eax, 5
    jmp     exit

out4b:
    cmp     eax, 00000000000111111111111111111111b
    jbe     out3b

    shld    edx, eax, 11
    shl     eax, 10
    shld    edx, eax, 8
    shl     eax, 7
    shld    edx, eax, 8
    shl     eax, 7
    shld    edx, eax, 8
    or      edx, 00000000100000001000000010000000b

    mov     [ecx], edx

    mov     eax, 4
    jmp     exit

out3b:
    cmp     eax, 00000000000000000011111111111111b
    jbe     out2b

    shld    edx, eax, 25
    shl     eax, 24
    shld    edx, eax, 8

    mov     eax, edx

    or      edx, 00000000000000001000000010000000b

    mov     [ecx], dx
    shr     eax, 15
    mov     [ecx+2], al

    mov     eax, 3
    jmp     exit

out2b:
    cmp     eax, 00000000000000000000000001111111b
    jbe     out1b

    shld    edx, eax, 25
    shl     eax, 24
    shld    edx, eax, 8
    or      edx, 00000000000000000000000010000000b

    mov     [ecx], dx

    mov     eax, 2
    jmp     exit

out1b:
    mov     [ecx], al

    mov     eax, 1

exit:
    ret
0

You might save a few operations by replacing
size_t size=0;...++size;...;return size++; with
char* base=data;...;return data-base;

AShelly
  • 34,686
  • 15
  • 91
  • 152