215

I'm solving a problem that involves sorting 10 numbers (int32) very quickly. My application needs to sort 10 numbers millions of times, as fast as possible. I'm sampling a data set of billions of elements; every time, I need to pick 10 numbers out of it (simplified) and sort them (and draw conclusions from the sorted 10-element list).

Currently I'm using insertion sort, but I imagine I could implement a very fast custom sorting algorithm for my specific problem of 10 numbers which would beat insertion sort.

How can I approach this problem?
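
For reference, a typical insertion-sort baseline for 10 ints looks something like the sketch below (the question doesn't show the actual code, so this is only an assumed starting point):

// Hypothetical baseline: a plain insertion sort over a fixed-size array of 10 ints.
void insertionSort10(int a[10]) {
    for (int i = 1; i < 10; ++i) {
        int v = a[i];
        int j = i - 1;
        while (j >= 0 && a[j] > v) {   // shift larger elements one slot to the right
            a[j + 1] = a[j];
            --j;
        }
        a[j + 1] = v;
    }
}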

bodacydo
  • 14
    As crude as it sounds, a series of nested `if` statements should work the best. Avoid loops. – John Alexiou Aug 23 '15 at 22:25
  • For small sets, insertion sort and selection sort work surprisingly well. Another option will be radix-sort, since there cannot be more than 10 distinct values. Avoid function calls. – wildplasser Aug 23 '15 at 22:36
  • 8
    Do you expect that the numbers will be given to you with any bias in the set of permutations, or will they be uniformly distributed? Will there be any relationship between the ordering of one list and the next? – Douglas Zare Aug 23 '15 at 23:14
  • 4
    The whole data set (with billions of numbers) is distributed according to Benford's law but when I pick elements randomly out of this set, they no longer are (I think). – bodacydo Aug 23 '15 at 23:15
  • Are the integers signed or unsigned? (or even if the ints are signed, can you guarantee the stored values are going to be positive?) – IQAndreas Aug 24 '15 at 02:19
  • 13
    You may want to read this http://stackoverflow.com/q/2786899/995714 – phuclv Aug 24 '15 at 03:59
  • 11
    If you're selecting randomly from billions of elements then it's quite possible that the latency for pulling that data in may have more of an impact than the time required to sort the selected elements even if the whole data set is in RAM. You could test the impact by benchmarking performance selecting the data sequentially versus randomly. – Steve S. Aug 24 '15 at 16:48
  • @SteveS.: right, sounds like OP should do one partial-shuffle pass first. Sampling-without-replacement will be fine and more efficient. – smci Aug 24 '15 at 22:53
  • @LưuVĩnhPhúc has provided a very good link, and it seems you should use rank order. – jxh Aug 25 '15 at 00:30
  • @IQAndreas It's sometimes signed, sometimes unsigned. Best to assume unsigned. – bodacydo Aug 25 '15 at 00:36
  • @jxh Oh, I didn't notice that new answer there... – m69's been on strike for years Sep 01 '15 at 01:09
  • @m69: It is confusing. The last modification to question by author says rank order is fastest, but the raw timing data shows the modified ordering of swaps for the sorting network is faster. – jxh Sep 01 '15 at 01:13
  • 3
    @bodacydo Could you let us know which option you ended up using, and how and why you chose it, and maybe add benchmark results if you have any? – m69's been on strike for years Sep 08 '15 at 11:39

12 Answers

217

(Following up on the suggestion of @HelloWorld to look into sorting networks.)

It seems that a 29-comparison/swap network is the fastest way to do a 10-input sort. I used the network discovered by Waksman in 1969 for this example in JavaScript, which should translate directly into C, as it's just a list of if statements, comparisons and swaps.

function sortNet10(data) {    // ten-input sorting network by Waksman, 1969
    var swap;
    if (data[0] > data[5]) { swap = data[0]; data[0] = data[5]; data[5] = swap; }
    if (data[1] > data[6]) { swap = data[1]; data[1] = data[6]; data[6] = swap; }
    if (data[2] > data[7]) { swap = data[2]; data[2] = data[7]; data[7] = swap; }
    if (data[3] > data[8]) { swap = data[3]; data[3] = data[8]; data[8] = swap; }
    if (data[4] > data[9]) { swap = data[4]; data[4] = data[9]; data[9] = swap; }
    if (data[0] > data[3]) { swap = data[0]; data[0] = data[3]; data[3] = swap; }
    if (data[5] > data[8]) { swap = data[5]; data[5] = data[8]; data[8] = swap; }
    if (data[1] > data[4]) { swap = data[1]; data[1] = data[4]; data[4] = swap; }
    if (data[6] > data[9]) { swap = data[6]; data[6] = data[9]; data[9] = swap; }
    if (data[0] > data[2]) { swap = data[0]; data[0] = data[2]; data[2] = swap; }
    if (data[3] > data[6]) { swap = data[3]; data[3] = data[6]; data[6] = swap; }
    if (data[7] > data[9]) { swap = data[7]; data[7] = data[9]; data[9] = swap; }
    if (data[0] > data[1]) { swap = data[0]; data[0] = data[1]; data[1] = swap; }
    if (data[2] > data[4]) { swap = data[2]; data[2] = data[4]; data[4] = swap; }
    if (data[5] > data[7]) { swap = data[5]; data[5] = data[7]; data[7] = swap; }
    if (data[8] > data[9]) { swap = data[8]; data[8] = data[9]; data[9] = swap; }
    if (data[1] > data[2]) { swap = data[1]; data[1] = data[2]; data[2] = swap; }
    if (data[3] > data[5]) { swap = data[3]; data[3] = data[5]; data[5] = swap; }
    if (data[4] > data[6]) { swap = data[4]; data[4] = data[6]; data[6] = swap; }
    if (data[7] > data[8]) { swap = data[7]; data[7] = data[8]; data[8] = swap; }
    if (data[1] > data[3]) { swap = data[1]; data[1] = data[3]; data[3] = swap; }
    if (data[4] > data[7]) { swap = data[4]; data[4] = data[7]; data[7] = swap; }
    if (data[2] > data[5]) { swap = data[2]; data[2] = data[5]; data[5] = swap; }
    if (data[6] > data[8]) { swap = data[6]; data[6] = data[8]; data[8] = swap; }
    if (data[2] > data[3]) { swap = data[2]; data[2] = data[3]; data[3] = swap; }
    if (data[4] > data[5]) { swap = data[4]; data[4] = data[5]; data[5] = swap; }
    if (data[6] > data[7]) { swap = data[6]; data[6] = data[7]; data[7] = swap; }
    if (data[3] > data[4]) { swap = data[3]; data[3] = data[4]; data[4] = swap; }
    if (data[5] > data[6]) { swap = data[5]; data[5] = data[6]; data[6] = swap; }
    return(data);
}
document.write(sortNet10([5,7,1,8,4,3,6,9,2,0]));

Here's a graphical representation of the network, divided into independent phases.

10-input sorting network (Waksman, 1969)

To take advantage of parallel processing, the 5-4-3-4-4-4-3-2 grouping can be changed into a 4-4-4-4-4-4-3-2 grouping.

10-input sorting network (Waksman, 1969) re-grouped
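
For those who want it in C right away, the same 29 comparators can be written compactly with a compare-exchange macro (as also suggested in the comments below). This is only a sketch of the direct translation, with the names chosen for illustration and one line per phase of the diagram:

#define CSWAP(d, i, j) do { if ((d)[i] > (d)[j]) { int t = (d)[i]; (d)[i] = (d)[j]; (d)[j] = t; } } while (0)

// 10-input sorting network (Waksman, 1969), phases grouped 5-4-3-4-4-4-3-2
static void sortNet10(int d[10])
{
    CSWAP(d,0,5); CSWAP(d,1,6); CSWAP(d,2,7); CSWAP(d,3,8); CSWAP(d,4,9);
    CSWAP(d,0,3); CSWAP(d,5,8); CSWAP(d,1,4); CSWAP(d,6,9);
    CSWAP(d,0,2); CSWAP(d,3,6); CSWAP(d,7,9);
    CSWAP(d,0,1); CSWAP(d,2,4); CSWAP(d,5,7); CSWAP(d,8,9);
    CSWAP(d,1,2); CSWAP(d,3,5); CSWAP(d,4,6); CSWAP(d,7,8);
    CSWAP(d,1,3); CSWAP(d,4,7); CSWAP(d,2,5); CSWAP(d,6,8);
    CSWAP(d,2,3); CSWAP(d,4,5); CSWAP(d,6,7);
    CSWAP(d,3,4); CSWAP(d,5,6);
}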

  • 71
    suggestion; use a swap macro. like `#define SORTPAIR(data, i1, i2) if (data[i1] > data[i2]) { int swap = data[i1]... }` – Peter Cordes Aug 24 '15 at 01:29
  • 10
    Can it be logically shown that this is the minimum? – corsiKa Aug 24 '15 at 04:21
  • 8
    @corsiKa Yes, sorting networks have been an area of research since the early days of computer science. In many cases, optimal solutions have been known for decades. See https://en.wikipedia.org/wiki/Sorting_network – m69's been on strike for years Aug 24 '15 at 04:49
  • 8
    I made a Jsperf to test and I can confirm that Network Sort is more than 20 times faster than the browsers' native sort. http://jsperf.com/fastest-10-number-sort – Daniel Aug 24 '15 at 09:10
  • 2
    I'm not an expert, but maybe a bitwise XOR swap would be even faster? I tend to use that technique in javascript, I don't know if it's also true in c. Something like: `var1^=var2, var2^=var1, var1^=var2` – Katai Aug 24 '15 at 11:58
  • 9
    @Katai This would destroy any optimization your compiler might produce. Bad idea. Read this for more informations https://en.wikipedia.org/wiki/XOR_swap_algorithm#Reasons_for_avoidance_in_practice – Antzi Aug 24 '15 at 13:02
  • 2
    A simple way to verify that the C compiler produced a properly optimized swap, is to generate the assembly file .S (gcc -S option) and search for the bswap* mnemonics. – dargaud Aug 24 '15 at 14:48
  • 1
    In C/C++, further speed improvement can probably be achieved by aligning the data to be all in the same 64 bytes, i.e. the same cache line (http://stackoverflow.com/questions/14707803/line-size-of-l1-and-l2-caches) – Peter Aug 24 '15 at 19:12
  • 6
    The OP asks for C/C++ and we prove it with a JS benchmark. Ouch. If the compiler translates this into CAS/CMPXCHG this _might_ be the fastest solution. If it ends up being CMP+Jcc - failed branch predictions will kill the performance. If you need _speed_ consider doing the vectorization yourself. More (in cache/registers) swaps might be faster than more (mispredicted) branches. As always -> profile/benchmark. – Andreas Aug 28 '15 at 13:10
  • 4
    You could try using a branchless version of "swap-if-greater" if your numbers are `int32`: `c = ((b-a) & (b-a)>>31); a += c; b -= c;` instead of `if (a > b) { swap ... }` – e.tadeu Aug 31 '15 at 20:59
  • 1
    With SSE you can do the swaps with MIN/MAX – Ben Jackson Sep 01 '15 at 03:29
  • 1
    @dargaud: `BSWAP` swaps byte order, and `XCHG` isn't necessarily faster than correctly ordered `MOV`'s – peterchen Sep 01 '15 at 09:08
89

Since you're dealing with a fixed size, take a look at sorting networks. These algorithms have a fixed runtime that is independent of their input, so for your use case you avoid the bookkeeping overhead that general-purpose sorting algorithms carry.

Bitonic sort is one implementation of such a network. It works best for n <= 32 on a CPU; on bigger inputs you could think of moving to a GPU.
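
As a rough illustration (not part of the original answer), a bitonic network for 10 values can be built by padding to the next power of two with sentinels and running the classic bitonic loops:

#include <climits>
#include <utility>

// Classic in-place bitonic sort on a power-of-two sized array.
void bitonicSort16(int (&v)[16])
{
    for (int k = 2; k <= 16; k <<= 1)            // size of the bitonic sequences
        for (int j = k >> 1; j > 0; j >>= 1)     // compare distance
            for (int i = 0; i < 16; ++i)
            {
                int partner = i ^ j;
                if (partner > i)
                {
                    bool ascending = (i & k) == 0;
                    if (ascending ? v[i] > v[partner] : v[i] < v[partner])
                        std::swap(v[i], v[partner]);
                }
            }
}

// Sort 10 ints by padding to 16 with INT_MAX and reading back the first 10.
void sort10ViaBitonic(int (&a)[10])
{
    int padded[16];
    for (int i = 0; i < 10; ++i) padded[i] = a[i];
    for (int i = 10; i < 16; ++i) padded[i] = INT_MAX;
    bitonicSort16(padded);
    for (int i = 0; i < 10; ++i) a[i] = padded[i];
}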

By the way, a good page to compare sorting algorithms is this one here (though it's missing the bitonic sort):

Sorting Algorithms Animations

HelloWorld
  • Lovely solution. I found the following pairs: {{1, 2}, {3, 4}, {5, 6}, {7, 8}, {9, 10}, {1, 3}, {2, 4}, {5, 7}, {6, 8}, {1, 9}, {2, 7}, {8, 10}, {1, 5}, {3, 8}, {4, 10}, {6, 9}, {2, 6}, {3, 5}, {4, 9}, {7, 8}, {2, 3}, {4, 7}, {5, 6}, {8, 9}, {3, 5}, {4, 6}, {7, 8}, {4, 5}, {6, 7}} at http://mathpuzzle.com/25Feb.htm. I have not verified their accuracy. – Erick G. Hagstrom Aug 24 '15 at 00:33
  • 3
    @ErickG.Hagstrom There are many solutions; as long as they use 29 comparisons, they're equally efficient. I used Waksman's solution from 1969; he was apparently the first to discover a 29-comparison version. – m69's been on strike for years Aug 24 '15 at 01:09
  • 1
    Yes, @m69. There are over a million. Waksman's solution has a length of 29, and a depth of 9. The solution I linked is an improvement over that in the depth dimension: length = 29, depth = 8. Of course, when implemented in C, depth doesn't matter. – Erick G. Hagstrom Aug 24 '15 at 01:22
  • 4
    @ErickG.Hagstrom Apparently there are 87 solutions with depth 7, the first of which was found by Knuth in 1973, but I haven't been able to find any of them with a quick Google. https://larc.unt.edu/ian/pubs/9-input.pdf (see Conclusion, p.14) – m69's been on strike for years Aug 24 '15 at 01:45
  • Thanks @m69. That's a far more authoritative source than the one I stumbled across. 7 it is, and proved (by exhaustive search) to be optimal. – Erick G. Hagstrom Aug 24 '15 at 01:56
  • 4
    @ErickG.Hagstrom: depth might make no difference "at the C level", but presumably once the compiler and the CPU have finished with it, there is some chance that it will be partly parallelized within the CPU and therefore smaller depth could help. Depending on the CPU, of course: some CPUs are relatively simple and do one thing after another, whereas some CPUs can have multiple operations in flight, in particular you might get very different performance for any loads and stores to the stack that are needed in order to manipulate 10 variables, depending how they're done. – Steve Jessop Aug 24 '15 at 08:12
  • 1
    @ErickG.Hagstrom It wasn't immediately clear from the paper by Ian Parberry, but the depth-7 networks have a length greater than 29. See Knuth, "The Art Of Computer Programming Vol.III", §5.3.4, fig. 49 and 51. – m69's been on strike for years Aug 24 '15 at 14:40
  • @m69 Oh! Then are we faced with choosing length=29 or depth=7 but not both? (I don't have access to Knuth.) – Erick G. Hagstrom Aug 24 '15 at 14:53
  • 1
    @ErickG.Hagstrom Yes, Knuth's is 31. And since Parberry only mentions the 87 depth-7 networks in passing, I assume none was shorter than Knuth's 31. So if the depth-8 one you posted checks out, it's probably the current length-29 champion . – m69's been on strike for years Aug 24 '15 at 15:01
  • Apologies for running away with your idea and a hundred upvotes, by the way. – m69's been on strike for years Sep 22 '15 at 13:42
  • No problem! You did really a good job! You deserve the upvotes :) – HelloWorld Sep 22 '15 at 13:47
  • For a static visualisation, there is [bitonic sort at sortvis](http://sortvis.org/algorithms/bitonicsort.html) – greybeard Sep 22 '15 at 16:46
34

Use a sorting network that has comparisons in groups of 4, so you can do it in SIMD registers. A pair of packed min/max instructions implements a packed comparator function. Sorry I don't have time right now to look for a page I remember seeing about this, but hopefully searching on SIMD or SSE sorting networks will turn something up.

x86 SSE does have packed 32-bit integer min and max instructions for vectors of four 32-bit ints. AVX2 (Haswell and later) has the same, but for 256-bit vectors of 8 ints. There are also efficient shuffle instructions.

If you have a lot of independent small sorts, it might be possible to do 4 or 8 sorts in parallel using vectors. Especially if you're choosing elements randomly (so the data to be sorted won't be contiguous in memory anyway), you can avoid shuffles and simply compare in the order you need. Ten registers holding all the data from 4 (AVX2: 8) lists of 10 ints still leave 6 registers for scratch space.
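
A sketch of that idea (my addition, assuming SSE4.1 for the packed min/max, a transposed layout where register i holds element i of four different lists, and names chosen only for illustration):

#include <smmintrin.h>  // SSE4.1: _mm_min_epi32 / _mm_max_epi32

// One comparator applied to four independent lists at once.
static inline void cmpExchange(__m128i &a, __m128i &b)
{
    __m128i lo = _mm_min_epi32(a, b);   // per-lane minimum
    __m128i hi = _mm_max_epi32(a, b);   // per-lane maximum
    a = lo;
    b = hi;
}

// Sort four 10-element lists in parallel; lanes[i] holds element i of each list.
void sort4x10(__m128i lanes[10])
{
    static const int net[29][2] = {   // the 29-comparator network from the sorting-network answer above
        {0,5},{1,6},{2,7},{3,8},{4,9}, {0,3},{5,8},{1,4},{6,9},
        {0,2},{3,6},{7,9}, {0,1},{2,4},{5,7},{8,9},
        {1,2},{3,5},{4,6},{7,8}, {1,3},{4,7},{2,5},{6,8},
        {2,3},{4,5},{6,7}, {3,4},{5,6}
    };
    for (const auto &p : net)
        cmpExchange(lanes[p[0]], lanes[p[1]]);
}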

Vector sorting networks are less efficient if you also need to sort associated data. In that case, the most efficient way seems to be to use a packed-compare to get a mask of which elements changed, and use that mask to blend vectors of (references to) associated data.

Peter Cordes
27

What about an unrolled, branch-less selection sort?

#include <iostream>
#include <algorithm>
#include <random>

//return the index of the minimum element in array a
int min(const int * const a) {
  int m = a[0];
  int indx = 0;
  #define TEST(i) (m > a[i]) && (m = a[i], indx = i ); 
  //see http://stackoverflow.com/a/7074042/2140449
  TEST(1);
  TEST(2);
  TEST(3);
  TEST(4);
  TEST(5);
  TEST(6);
  TEST(7);
  TEST(8);
  TEST(9);
  #undef TEST
  return indx;
}

void sort( int * const a ){
  int work[10];
  int indx;
  #define GET(i) indx = min(a); work[i] = a[indx]; a[indx] = 2147483647; 
  //get the minimum, copy it to work and set it at max_int in a
  GET(0);
  GET(1);
  GET(2);
  GET(3);
  GET(4);
  GET(5);
  GET(6);
  GET(7);
  GET(8);
  GET(9);
  #undef GET
  #define COPY(i) a[i] = work[i];
  //copy back to a
  COPY(0);
  COPY(1);
  COPY(2);
  COPY(3);
  COPY(4);
  COPY(5);
  COPY(6);
  COPY(7);
  COPY(8);
  COPY(9);
  #undef COPY
}

int main() {
  //generating and printing a random array
  int a[10] = { 1,2,3,4,5,6,7,8,9,10 };
  std::random_device rd;
  std::mt19937 g(rd());
  std::shuffle( a, a+10, g);
  for (int i = 0; i < 10; i++) {
    std::cout << a[i] << ' ';
  }
  std::cout << std::endl;

  //sorting and printing again
  sort(a);
  for (int i = 0; i < 10; i++) {
    std::cout << a[i] << ' ';
  } 

  return 0;
}

http://coliru.stacked-crooked.com/a/71e18bc4f7fa18c6

The only relevant lines are the first two `#define`s.

It uses two lists and entirely rechecks the first one ten times, which would be a badly implemented selection sort; however, it avoids branches and variable-length loops, which may compensate on modern processors with such a small data set.


Benchmark

I benchmarked it against the sorting network, and my code seemed slower. Then I tried removing the unrolling and the copy. Running this code:

#include <iostream>
#include <algorithm>
#include <random>
#include <chrono>

int min(const int * const a, int i) {
  int m = a[i];
  int indx = i++;
  for ( ; i<10; i++) 
    //see http://stackoverflow.com/a/7074042/2140449
    (m > a[i]) && (m = a[i], indx = i ); 
  return indx;
}

void sort( int * const a ){
  for (int i = 0; i<9; i++)
    std::swap(a[i], a[min(a,i)]); //search only forward
}


void sortNet10(int * const data) {  // ten-input sorting network by Waksman, 1969
    int swap;
    if (data[0] > data[5]) { swap = data[0]; data[0] = data[5]; data[5] = swap; }
    if (data[1] > data[6]) { swap = data[1]; data[1] = data[6]; data[6] = swap; }
    if (data[2] > data[7]) { swap = data[2]; data[2] = data[7]; data[7] = swap; }
    if (data[3] > data[8]) { swap = data[3]; data[3] = data[8]; data[8] = swap; }
    if (data[4] > data[9]) { swap = data[4]; data[4] = data[9]; data[9] = swap; }
    if (data[0] > data[3]) { swap = data[0]; data[0] = data[3]; data[3] = swap; }
    if (data[5] > data[8]) { swap = data[5]; data[5] = data[8]; data[8] = swap; }
    if (data[1] > data[4]) { swap = data[1]; data[1] = data[4]; data[4] = swap; }
    if (data[6] > data[9]) { swap = data[6]; data[6] = data[9]; data[9] = swap; }
    if (data[0] > data[2]) { swap = data[0]; data[0] = data[2]; data[2] = swap; }
    if (data[3] > data[6]) { swap = data[3]; data[3] = data[6]; data[6] = swap; }
    if (data[7] > data[9]) { swap = data[7]; data[7] = data[9]; data[9] = swap; }
    if (data[0] > data[1]) { swap = data[0]; data[0] = data[1]; data[1] = swap; }
    if (data[2] > data[4]) { swap = data[2]; data[2] = data[4]; data[4] = swap; }
    if (data[5] > data[7]) { swap = data[5]; data[5] = data[7]; data[7] = swap; }
    if (data[8] > data[9]) { swap = data[8]; data[8] = data[9]; data[9] = swap; }
    if (data[1] > data[2]) { swap = data[1]; data[1] = data[2]; data[2] = swap; }
    if (data[3] > data[5]) { swap = data[3]; data[3] = data[5]; data[5] = swap; }
    if (data[4] > data[6]) { swap = data[4]; data[4] = data[6]; data[6] = swap; }
    if (data[7] > data[8]) { swap = data[7]; data[7] = data[8]; data[8] = swap; }
    if (data[1] > data[3]) { swap = data[1]; data[1] = data[3]; data[3] = swap; }
    if (data[4] > data[7]) { swap = data[4]; data[4] = data[7]; data[7] = swap; }
    if (data[2] > data[5]) { swap = data[2]; data[2] = data[5]; data[5] = swap; }
    if (data[6] > data[8]) { swap = data[6]; data[6] = data[8]; data[8] = swap; }
    if (data[2] > data[3]) { swap = data[2]; data[2] = data[3]; data[3] = swap; }
    if (data[4] > data[5]) { swap = data[4]; data[4] = data[5]; data[5] = swap; }
    if (data[6] > data[7]) { swap = data[6]; data[6] = data[7]; data[7] = swap; }
    if (data[3] > data[4]) { swap = data[3]; data[3] = data[4]; data[4] = swap; }
    if (data[5] > data[6]) { swap = data[5]; data[5] = data[6]; data[6] = swap; }
}


std::chrono::duration<double> benchmark( void(*func)(int * const), const int seed ) {
  std::mt19937 g(seed);
  int a[10] = {10,11,12,13,14,15,16,17,18,19};
  std::chrono::high_resolution_clock::time_point t1, t2; 
  t1 = std::chrono::high_resolution_clock::now();
  for (long i = 0; i < 1e7; i++) {
    std::shuffle( a, a+10, g);
    func(a);
  }
  t2 = std::chrono::high_resolution_clock::now();
  return std::chrono::duration_cast<std::chrono::duration<double>>(t2 - t1);
}

int main() {
  std::random_device rd;
  for (int i = 0; i < 10; i++) {
    const int seed = rd();
    std::cout << "seed = " << seed << std::endl;
    std::cout << "sortNet10: " << benchmark(sortNet10, seed).count() << std::endl;
    std::cout << "sort:      " << benchmark(sort,      seed).count() << std::endl;
  }
  return 0;
}

I am consistently getting better results for the branch-less selection sort compared to the sorting network.

$ gcc -v
gcc version 5.2.0 (GCC) 
$ g++ -std=c++11 -Ofast sort.cpp && ./a.out
seed = -1727396418
sortNet10: 2.24137
sort:      2.21828
seed = 2003959850
sortNet10: 2.23914
sort:      2.21641
seed = 1994540383
sortNet10: 2.23782
sort:      2.21778
seed = 1258259982
sortNet10: 2.25199
sort:      2.21801
seed = 1821086932
sortNet10: 2.25535
sort:      2.2173
seed = 412262735
sortNet10: 2.24489
sort:      2.21776
seed = 1059795817
sortNet10: 2.29226
sort:      2.21777
seed = -188551272
sortNet10: 2.23803
sort:      2.22996
seed = 1043757247
sortNet10: 2.2503
sort:      2.23604
seed = -268332483
sortNet10: 2.24455
sort:      2.24304
DarioP
  • 5
    The results are not very impressive, but actually what I would have expected. The sorting network minimizes comparisons, not swaps. When all values are already in the cache comparisons are much cheaper than swaps, so a selection sort (that minimizes the number of swaps) has the upper hand. (and there are not that many more comparisons: network with 29 comparisons, up to 29 swaps?; vs. selection sort with 45 comparisons and at most 9 swaps) – example Aug 24 '15 at 20:28
  • 7
    Oh and it does have branches - unless the line `for ( ; i<10; i++) (m > a[i]) && (m = a[i], indx = i ); ` is exceptionally well optimized. (short-circuiting usually is a form of branching) – example Aug 24 '15 at 20:31
  • 1
    @EugeneRyabtsev that too, but it is fed with exactly the same random sequences every time, so it should cancel out. I tried replacing `std::shuffle` with `for (int n = 0; n<10; n++) a[n]=g();`. The execution time is halved and the network is faster now. – DarioP Aug 25 '15 at 06:51
  • How do this compare against libc++'s `std::sort`? – gnzlbg Aug 31 '15 at 22:18
  • 1
    @gnzlbg I tried `std::sort` as well but it was performing so badly that I didn't even include it in the benchmark. I guess that with tiny data sets there is quite some overhead. – DarioP Sep 01 '15 at 06:55
  • @example: I would have expected that if everything is in cache, *swaps* would be cheap, and comparisons expensive because in L1 cache data access is cheap whereas the branch mispredict penalty is unaffected. Are you sure it's the reverse, as you state in your comment? And if it's not a typo - why? – Eamon Nerbonne Sep 01 '15 at 11:13
  • @DarioP thanks, I think that even if it performs badly it is worth including it. It mainly shows that it is worth it to go this far. – gnzlbg Sep 01 '15 at 14:00
  • @EamonNerbonne my thinking was that comparisons can be done without ever communicating with the RAM again, while writes (e.g. swaps) have to be written sooner or later. Even if they are only written as the data leaves the cache (something I'm not sure about), the CPU will likely write all of the data if all of it was changed at some point - while the insertion sort might have found the single swap that was actually needed. Admittedly I am not so certain about this argument anymore - but I would still say that swaps should be more expensive than comparisons. – example Sep 01 '15 at 23:29
  • Oh and the same holds for the cache-to-register communication. If it is only read and compared it need not be updated in the cache. – example Sep 01 '15 at 23:30
  • @example: not every swap will reach main memory. Since the data is dense and small (just 40 bytes, so less than 1 cache line), most of those writes will occur to the cache, which won't be flushed every single write. It shouldn't make much difference whether you swap a few times or a dozens of times - only one main-memory write (or two if it's misaligned) should occur. – Eamon Nerbonne Sep 02 '15 at 14:08
  • Have you tried to benchmark the sorting network with the swapping trick from [this answer](http://stackoverflow.com/a/7181993/1364752)? It yields results three times faster on my computer, which is consistent with the claims made by a research paper on the subject. – Morwenn Oct 29 '15 at 21:47
  • Any chance you can add a comparison for my algo below? (unrolled insertion sort) – Glenn Teitelbaum May 24 '16 at 23:04
20

The question doesn't say that this is some kind of a web-based application. The one thing that caught my eye was:

I'm sampling a data set of billions of elements and every time I need to pick 10 numbers out of it (simplified) and sort them (and make conclusions from the sorted 10 element list).

As a software and hardware engineer, I can tell you this absolutely screams FPGA to me. I don't know what kind of conclusions you need to draw from the sorted set of numbers or where the data comes from, but I know it would be almost trivial to process somewhere between one hundred million and a billion of these "sort-and-analyze" operations per second. I've done FPGA-assisted DNA sequencing work in the past. It is nearly impossible to beat the massive processing power of FPGAs when the problem is well suited for that type of solution.

At some level, the only limiting factor becomes how quickly you can shovel data into an FPGA and how quickly you can get it out.

As a point of reference, I designed a high performance real-time image processor that received 32 bit RGB image data at a rate of about 300 million pixels per second. The data streamed through FIR filters, matrix multipliers, lookup tables, spatial edge detection blocks and a number of other operations before coming out the other end. All of this on a relatively small Xilinx Virtex2 FPGA with internal clocking spanning from about 33 MHz to, if I remember correctly, 400 MHz. Oh, yes, it also had a DDR2 controller implementation and ran two banks of DDR2 memory.

An FPGA can output a sorted set of ten 32-bit numbers on every clock transition while operating at hundreds of MHz. There would be a short delay at the start of the operation as the data fills the processing pipeline(s). After that you should be able to get one result per clock, or more if the processing can be parallelized by replicating the sort-and-analyze pipeline. The solution, in principle, is almost trivial.

The point is: if the application isn't PC-bound and the data stream and processing are "compatible" with an FPGA solution (either stand-alone or as a co-processor card in the machine), there is no way you are going to be able to beat the attainable level of performance with software written in any language, regardless of the algorithm.

I just ran a quick search and found a paper that might be of use to you. It looks like it dates back to 2012; you can do a lot better in performance today (and could even back then). Here it is:

Sorting Networks on FPGAs

martin's
12

I recently wrote a little class that uses the Bose-Nelson algorithm to generate a sorting network at compile time.

It can be used to create a very fast sort for 10 numbers.

/**
 * A Functor class to create a sort for fixed sized arrays/containers with a
 * compile time generated Bose-Nelson sorting network.
 * \tparam NumElements  The number of elements in the array or container to sort.
 * \tparam T            The element type.
 * \tparam Compare      A comparator functor class that returns true if lhs < rhs.
 */
template <unsigned NumElements, class Compare = void> class StaticSort
{
    template <class A, class C> struct Swap
    {
        template <class T> inline void s(T &v0, T &v1)
        {
            T t = Compare()(v0, v1) ? v0 : v1; // Min
            v1 = Compare()(v0, v1) ? v1 : v0; // Max
            v0 = t;
        }

        inline Swap(A &a, const int &i0, const int &i1) { s(a[i0], a[i1]); }
    };

    template <class A> struct Swap <A, void>
    {
        template <class T> inline void s(T &v0, T &v1)
        {
            // Explicitly code out the Min and Max to nudge the compiler
            // to generate branchless code.
            T t = v0 < v1 ? v0 : v1; // Min
            v1 = v0 < v1 ? v1 : v0; // Max
            v0 = t;
        }

        inline Swap(A &a, const int &i0, const int &i1) { s(a[i0], a[i1]); }
    };

    template <class A, class C, int I, int J, int X, int Y> struct PB
    {
        inline PB(A &a)
        {
            enum { L = X >> 1, M = (X & 1 ? Y : Y + 1) >> 1, IAddL = I + L, XSubL = X - L };
            PB<A, C, I, J, L, M> p0(a);
            PB<A, C, IAddL, J + M, XSubL, Y - M> p1(a);
            PB<A, C, IAddL, J, XSubL, M> p2(a);
        }
    };

    template <class A, class C, int I, int J> struct PB <A, C, I, J, 1, 1>
    {
        inline PB(A &a) { Swap<A, C> s(a, I - 1, J - 1); }
    };

    template <class A, class C, int I, int J> struct PB <A, C, I, J, 1, 2>
    {
        inline PB(A &a) { Swap<A, C> s0(a, I - 1, J); Swap<A, C> s1(a, I - 1, J - 1); }
    };

    template <class A, class C, int I, int J> struct PB <A, C, I, J, 2, 1>
    {
        inline PB(A &a) { Swap<A, C> s0(a, I - 1, J - 1); Swap<A, C> s1(a, I, J - 1); }
    };

    template <class A, class C, int I, int M, bool Stop = false> struct PS
    {
        inline PS(A &a)
        {
            enum { L = M >> 1, IAddL = I + L, MSubL = M - L};
            PS<A, C, I, L, (L <= 1)> ps0(a);
            PS<A, C, IAddL, MSubL, (MSubL <= 1)> ps1(a);
            PB<A, C, I, IAddL, L, MSubL> pb(a);
        }
    };

    template <class A, class C, int I, int M> struct PS <A, C, I, M, true>
    {
        inline PS(A &a) {}
    };

public:
    /**
     * Sorts the array/container arr.
     * \param  arr  The array/container to be sorted.
     */
    template <class Container> inline void operator() (Container &arr) const
    {
        PS<Container, Compare, 1, NumElements, (NumElements <= 1)> ps(arr);
    };

    /**
     * Sorts the array arr.
     * \param  arr  The array to be sorted.
     */
    template <class T> inline void operator() (T *arr) const
    {
        PS<T*, Compare, 1, NumElements, (NumElements <= 1)> ps(arr);
    };
};

#include <iostream>
#include <vector>

int main(int argc, const char * argv[])
{
    enum { NumValues = 10 };

    // Arrays
    {
        int rands[NumValues];
        for (int i = 0; i < NumValues; ++i) rands[i] = rand() % 100;
        std::cout << "Before Sort: \t";
        for (int i = 0; i < NumValues; ++i) std::cout << rands[i] << " ";
        std::cout << "\n";
        StaticSort<NumValues> staticSort;
        staticSort(rands);
        std::cout << "After Sort: \t";
        for (int i = 0; i < NumValues; ++i) std::cout << rands[i] << " ";
        std::cout << "\n";
    }

    std::cout << "\n";

    // STL Vector
    {
        std::vector<int> rands(NumValues);
        for (int i = 0; i < NumValues; ++i) rands[i] = rand() % 100;
        std::cout << "Before Sort: \t";
        for (int i = 0; i < NumValues; ++i) std::cout << rands[i] << " ";
        std::cout << "\n";
        StaticSort<NumValues> staticSort;
        staticSort(rands);
        std::cout << "After Sort: \t";
        for (int i = 0; i < NumValues; ++i) std::cout << rands[i] << " ";
        std::cout << "\n";
    }

    return 0;
}

Note that instead of an if (compare) swap statement, we explicitly code out ternary operators for min and max. This is to help nudge the compiler into using branchless code.
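
Since Compare plays the role of "less than", the class can also sort in a different order. A small usage sketch (my addition, not from the original code; the names are only for illustration):

// Hypothetical example: a comparator that reverses the order sorts descending.
struct Greater
{
    bool operator()(const int &lhs, const int &rhs) const { return lhs > rhs; }
};

void sortDescending(int (&values)[10])
{
    StaticSort<10, Greater> staticSort;
    staticSort(values);   // values ends up ordered from largest to smallest
}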

Benchmarks

The following benchmarks were compiled with clang -O3 and run on my mid-2012 MacBook Air.

Sorting random data

Comparing it with DarioP's code, here are the times in milliseconds taken to sort 1 million 32-bit int arrays of size 10:

Hardcoded Sort Net 10          : 88.774 ms
Templated Bose-Nelson sort 10  : 27.815 ms

Using this templated approach, we can also generate sorting networks at compile time for other numbers of elements.

Time (in milliseconds) to sort 1 million arrays of various sizes.

The times in milliseconds for arrays of size 2, 4, and 8 are 1.943, 8.655, and 20.246 respectively.

C++ Templated Bose-Nelson Static Sort timings

Credits to Glenn Teitelbaum for the unrolled insertion sort.

Here are the average clocks per sort for small arrays of 6 elements. The benchmark code and examples can be found at this question:

Fastest sort of fixed length 6 int array

Direct call to qsort library function       : 326.81
Naive implementation (insertion sort)       : 132.98
Insertion Sort (Daniel Stutzbach)           : 104.04
Insertion Sort Unrolled                     : 99.64
Insertion Sort Unrolled (Glenn Teitelbaum)  : 81.55
Rank Order                                  : 44.01
Rank Order with registers                   : 42.40
Sorting Networks (Daniel Stutzbach)         : 88.06
Sorting Networks (Paul R)                   : 31.64
Sorting Networks 12 with Fast Swap          : 29.68
Sorting Networks 12 reordered Swap          : 28.61
Reordered Sorting Network w/ fast swap      : 24.63
Templated Sorting Network (this class)      : 25.37

It performs as fast as the fastest example in the question for 6 elements.

Performance for sorting sorted data

Often, the input arrays may be already sorted or mostly sorted. In such cases, insertion sort can be a better choice.

(Chart: performance for sorting sorted data)

You may want to choose an appropriate sorting algorithm depending on the data.

The code used for the benchmarks can be found here.

Vectorized
  • Any chance you can add a comparison for my algo below? – Glenn Teitelbaum May 24 '16 at 22:57
  • @GlennTeitelbaum any chance you added this to _your_ benchmarks and disclosed means and results? – greybeard May 25 '16 at 01:06
  • Kudos for adding data on sorting sorted input. – greybeard May 25 '16 at 11:34
  • On some systems `v1 = v0 < v1 ? v1 : v0; // Max` still may branch, in that case it can be replaced with `v1 += v0 - t` because if `t` is `v0` then `v1 + v0 -t == v1 + v0 - v0 == v1` else `t` is `v1` and `v1 + v0 -t == v1 + v0 - v1 == v0` – Glenn Teitelbaum May 25 '16 at 15:18
  • The ternary usually compiles into a `maxss` or `minss` instruction on modern compilers. But in cases where it does not work, other ways of swapping can be used. :) – Vectorized May 25 '16 at 15:26
  • Do you have a reference for "the Bose-Nelson algorithm"? Bose, R. C. and Nelson, R. J. 1962. "A Sorting Problem". JACM, Vol. 9. Pp. 282-296.? The search engines prefer Bose-Einstein condensation and Bose headphones. – Peter Mortensen Jul 11 '20 at 13:41
6

Although a network sort has good odds of being fast on small arrays, sometimes you can't beat insertion sort if it is properly optimized. For example, batch-inserting 2 elements at a time:

static void batchInsertSort10(final int[] in)   // insertion sort, inserting two elements per pass
{
    {   // sort the first pair
        final int a=in[0]<in[1]?in[0]:in[1];
        final int b=in[0]<in[1]?in[1]:in[0];
        in[0]=a;
        in[1]=b;
    }
    for(int x=2;x<10;x+=2)
    {
        // pick up the next pair already ordered (a <= b)
        final int a=in[x]<in[x+1]?in[x]:in[x+1];
        final int b=in[x]<in[x+1]?in[x+1]:in[x];
        int y= x-1;

        // insert the larger element first, shifting by 2
        while(y>=0&&in[y]>b)
        {
            in[y+2]= in[y];
            --y;
        }
        in[y+2]=b;
        // then the smaller one, shifting by 1
        while(y>=0&&in[y]>a)
        {
            in[y+1]= in[y];
            --y;
        }
        in[y+1]=a;
    }
}
warren
4

You can fully unroll insertion sort.

To make that easier, recursive templates can be used with no function overhead. Since it already is a template, `int` can be a template parameter as well. This also makes it trivial to handle array sizes other than 10.

Note that to sort `int x[10]` the call is `insert_sort<int, 9>::sort(x);`, since the class uses the index of the last item. This could be wrapped, but that would be more code to read through; a possible wrapper is sketched below the code.

template <class T, int NUM>
class insert_sort;

template <class T>
class insert_sort<T,0>
// Stop template recursion
// Sorting one item is a no operation 
{
public:
    static void place(T *x) {}
    static void sort(T * x) {}
};

template <class T, int NUM>
class insert_sort
// Use template recursion to do insertion sort.
// NUM is the index of the last item, e.g. for x[10] call <9>
{
public:
    static void place(T *x)
    {
        T t1=x[NUM-1];
        T t2=x[NUM];
        if (t1 > t2)
        {
            x[NUM-1]=t2;
            x[NUM]=t1;
            insert_sort<T,NUM-1>::place(x);
        }
    }
    static void sort(T * x)
    {
        insert_sort<T,NUM-1>::sort(x); // Sort everything before
        place(x);                    // Put this item in
    }
};

In my testing this was faster than the sorting network examples.
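
As noted above, the call with the index of the last item could be wrapped. A possible sketch (not part of the original answer; the wrapper name is only for illustration) that deduces the size from the array type:

template <class T, int N>
void insertion_sort_array(T (&x)[N])
{
    insert_sort<T, N - 1>::sort(x);   // e.g. int x[10] resolves to insert_sort<int, 9>
}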

Glenn Teitelbaum
2

For reasons similar to those that I described here, the following sorting function, sort10_iterator(), should perform well:

template<class IterType> 
inline void sort10_iterator(IterType it) 
{
#define SORT2(x,y) {if(data##x>data##y)std::swap(data##x,data##y);}
#define DD1(a)   auto data##a=*(it+a);
#define DD2(a,b) auto data##a=*(it+a), data##b=*(it+b);
#define CB1(a)   *(it+a)=data##a;
#define CB2(a,b) *(it+a)=data##a;*(it+b)=data##b;
  DD2(1,4) SORT2(1,4) DD2(7,8) SORT2(7,8) DD2(2,3) SORT2(2,3) DD2(5,6) SORT2(5,6) DD2(0,9) SORT2(0,9) 
  SORT2(2,5) SORT2(0,7) SORT2(8,9) SORT2(3,6) 
  SORT2(4,9) SORT2(0,1) 
  SORT2(0,2) CB1(0) SORT2(6,9) CB1(9) SORT2(3,5) SORT2(4,7) SORT2(1,8) 
  SORT2(3,4) SORT2(5,8) SORT2(6,7) SORT2(1,2) 
  SORT2(7,8) CB1(8) SORT2(1,3) CB1(1) SORT2(2,5) SORT2(4,6) 
  SORT2(2,3) CB1(2) SORT2(6,7) CB1(7) SORT2(4,5) 
  SORT2(3,4) CB2(3,4) SORT2(5,6) CB2(5,6) 
#undef CB1
#undef CB2
#undef DD1
#undef DD2
#undef SORT2
}

Call it by passing it an std::vector iterator or some other random access iterator:

sort10_iterator(my_std_vector.begin());

The sorting network was taken from here.

Matthew K.
1

An insertion sort requires on average 29.6 comparisons to sort 10 inputs, with a best case of 9 and a worst case of 45 (given input that is in reverse order).

A {9,6,1} shellsort will require on average 25.5 comparisons to sort 10 inputs. The best case is 14 comparisons, the worst is 34, and sorting a reversed input requires 22.

So using shellsort instead of insertion sort reduces the average case by 14%. Although the best case increases by 56%, the worst case is reduced by 24%, which is significant in applications where keeping worst-case performance in check is important. The reverse case is reduced by 51%.

Since you seem to be familiar with insertion sort, you can implement the algorithm as a sorting network for the {9,6} passes and then tack on the insertion sort ({1}) after that:

i[0] with i[9]    // {9}

i[0] with i[6]    // {6}
i[1] with i[7]    // {6}
i[2] with i[8]    // {6}
i[3] with i[9]    // {6}

i[0 ... 9]        // insertion sort
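
A minimal C++ sketch of that scheme (my reading of the list above: compare-exchange at gaps 9 and 6, then a plain insertion sort; the function name is only for illustration):

#include <utility>

void shellSort10(int a[10])
{
    auto cmpSwap = [&](int i, int j) { if (a[i] > a[j]) std::swap(a[i], a[j]); };

    cmpSwap(0, 9);                       // the {9} pass
    for (int i = 0; i + 6 < 10; ++i)     // the {6} pass: (0,6) (1,7) (2,8) (3,9)
        cmpSwap(i, i + 6);

    for (int i = 1; i < 10; ++i)         // the {1} pass: ordinary insertion sort
    {
        int v = a[i], j = i - 1;
        while (j >= 0 && a[j] > v) { a[j + 1] = a[j]; --j; }
        a[j + 1] = v;
    }
}
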
Olof Forshell
1

Why swap when you can move? One x86 cache line has enough extra memory for you to do a merge sort.

I would probably insertion-sort indexes 0-1, 2-4, 5-6, and 7-9 separately, or even better, keep those little groups sorted as I do the inserts, so that each insert requires at most one or two shifts.

Then merge 5-6 and 7-9 into 10-14, merge 0-1 and 2-4 into 5-9, and finally merge 5-9 and 10-14 into 0-9.
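
A sketch of that scheme (my addition, with illustrative names): the buffer holds 15 ints, v[0..9] for the data and v[10..14] as scratch, so everything stays within a 64-byte cache line if suitably aligned.

static void insertionSortSmall(int *a, int n)
{
    for (int i = 1; i < n; ++i)
    {
        int x = a[i], j = i - 1;
        while (j >= 0 && a[j] > x) { a[j + 1] = a[j]; --j; }
        a[j + 1] = x;
    }
}

static void mergeRuns(const int *a, int na, const int *b, int nb, int *out)
{
    while (na && nb)
    {
        if (*a <= *b) { *out++ = *a++; --na; }
        else          { *out++ = *b++; --nb; }
    }
    while (na--) *out++ = *a++;
    while (nb--) *out++ = *b++;
}

void mergeSort10(int *v)   // v points at 15 ints: v[0..9] data, v[10..14] scratch
{
    insertionSortSmall(v + 0, 2);
    insertionSortSmall(v + 2, 3);
    insertionSortSmall(v + 5, 2);
    insertionSortSmall(v + 7, 3);

    mergeRuns(v + 5, 2, v + 7, 3, v + 10);   // 5-6 and 7-9   -> 10-14
    mergeRuns(v + 0, 2, v + 2, 3, v + 5);    // 0-1 and 2-4   -> 5-9
    // The final merge overlaps its first input, but each of v[5..9] is only
    // overwritten after (or exactly as) its original value has been consumed.
    mergeRuns(v + 5, 5, v + 10, 5, v + 0);   // 5-9 and 10-14 -> 0-9
}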

Matt Timmermans
1

The following CUDA kernel, running on 10 CUDA threads (rank-sort algorithm), sorts 1000 arrays 1000 times in 42 milliseconds, which works out to 42 nanoseconds per sort, or ~70 cycles (at 1.7 GHz) per sort:

inline
__device__ int findOrder(const int mask, const int data0)
{
    const int order1 = data0>__shfl_sync(mask,data0,0);
    const int order2 = data0>__shfl_sync(mask,data0,1);
    const int order3 = data0>__shfl_sync(mask,data0,2);
    const int order4 = data0>__shfl_sync(mask,data0,3);
    const int order5 = data0>__shfl_sync(mask,data0,4);
    const int order6 = data0>__shfl_sync(mask,data0,5);
    const int order7 = data0>__shfl_sync(mask,data0,6);
    const int order8 = data0>__shfl_sync(mask,data0,7);
    const int order9 = data0>__shfl_sync(mask,data0,8);
    const int order10 = data0>__shfl_sync(mask,data0,9);
    return order1 + order2 + order3 + order4 + order5 + order6 + order7 + order8 + order9 + order10;
}

// launch this with 10 CUDA threads in 1 block in 1 grid
// sorts 10 elements using only SIMT registers
__global__ void rankSort(int * __restrict__ buffer)
{    
    const int id  = threadIdx.x;

    // enable first 10 lanes of warp for shuffling 
    const int mask = __activemask();

    __shared__ int data[10000];

    for(int i=0;i<1000;i++)
    {
        data[id+i*10] = buffer[id+i*10];
    }
    __syncwarp();
    // benchmark 1000 times
    for(int k=0;k<1000;k++)
    {

        // sort 1000 arrays
        for(int j=0;j<1000;j+=5)
        {
            // sort 5 arrays at once to hide latency
            const int data1 = data[id+j*10];
            const int data2 = data[id+(j+1)*10];
            const int data3 = data[id+(j+2)*10];
            const int data4 = data[id+(j+3)*10];
            const int data5 = data[id+(j+4)*10];

            const int order1 = findOrder(mask,data1);        
            const int order2 = findOrder(mask,data2);
            const int order3 = findOrder(mask,data3);
            const int order4 = findOrder(mask,data4);
            const int order5 = findOrder(mask,data5);

            data[order1+j*10]=data1;         
            data[order2+(j+1)*10]=data2;           
            data[order3+(j+2)*10]=data3;
            data[order4+(j+3)*10]=data4;
            data[order5+(j+4)*10]=data5;
        }

    }
    __syncwarp();
    for(int i=0;i<1000;i++)
    {
        buffer[id+i*10] = data[id+i*10];
    }
}   

Since all 10 threads are processed on the same SIMT unit, this is similar to an AVX-512-optimized version running on vector registers, except that there is much more register space available to hide latency. Also, the SIMT unit is 32 lanes wide, so it can run with linear time complexity for up to 32 elements.

This algorithm assumes the elements are unique. For duplicated data, an extra reduction step is required to spread duplicated order values across the 10 elements. First it computes the rank of each element, then copies each element directly to its rank as an index into the array. Computing the order values takes a brute-force O(N x N) number of comparisons, and to decrease latency, warp shuffles are used to gather data from the different warp lanes (of a vector register).
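
For comparison, the same rank-order idea on the CPU is just a couple of loops (a sketch assuming unique values, as the kernel above also does; the function name is only for illustration):

// Each element's final position is the count of elements smaller than it.
void rankSort10(const int *in, int *out)
{
    for (int i = 0; i < 10; ++i)
    {
        int rank = 0;
        for (int j = 0; j < 10; ++j)
            rank += (in[j] < in[i]);   // O(N*N) comparisons, but branch-free
        out[rank] = in[i];
    }
}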

huseyin tugrul buyukisik