2

I need to sort array of integers between startIndex to startIndex + 12. This operation is critical for my performance.

What algorithm do you suggest me to use?

Right now I am using Bubble sort and it's not performing so well...

Update: Sorry for missing details. I am working with random arrays. I do this action very often, I am working in Java.

Update 2: I am not sure that insertion sort is good idea as I am using native arrays and not ArrayList. So I need to implement the insertion by my self or combine it with bubble search some how.

Ilya Gazman
  • 31,250
  • 24
  • 137
  • 216

5 Answers5

8

You can try this sorting network:

http://jgamble.ripco.net/cgi-bin/nw.cgi?inputs=12&algorithm=best&output=svg

It is an optimal configuration for 12 items. That means, minimizing the necessary comparators and depth (consecutive steps if you can work in parallel) of the operation.

To take the advantage of parallelization, use SIMD (SEE) instructions. I don't know how to do this with Java.

ypnos
  • 50,202
  • 14
  • 95
  • 141
  • 2
    @Ilya_Gazman Wherever you see a comparator with inputs i and j, you write a line of code like `if (a[i] > a[j]) { int t = a[i]; a[i] = a[j]; a[j] = t; }` – David Eisenstat Mar 27 '14 at 13:19
  • Basically you see which elements need to be compared (and depending on the values, swapped). You also see it works in several steps. So finish the first row, then go on with the second. Now if you can do several of them at once (all comparisons/swaps in one row can theoretically be done at the same time) you can have huge speed gains. For this you need to use vector computation, that is specific instructions on the CPU (SSE or AVX for modern CPUs). – ypnos Mar 27 '14 at 13:19
  • 1
    @DavidEisenstat so basically those are pairs of array indexes that I need to check and swap(if needed), and each row can run in parallel? – Ilya Gazman Mar 27 '14 at 13:23
2

For 12 items, probably insertion sort. It typically has the best empirical performance of the O(n^2) sort algorithms. The O(n log n) algorithms are probably overkill for such a small set, the complexity of the algorithms usually mean that they don't pay off until the set you are sorting is larger.

Of course, if you really want to squeeze out the last drop of performance, you probably need to write some assembly and do it in registers or something.

If you know that they are bounded, radix sort could also be a good approach.

Jeremy West
  • 11,495
  • 1
  • 18
  • 25
1

I would also try a hard-coded merge sort. First sort each consecutive group of 3 with hard code (no index variables). That's 3 comparisons times 4 = 12.

Then merge the first two groups of 3. That's 3 comparisons times 2 = 6.

Then merge the two groups of 6. That's 6 comparisons.

Total 24 comparisons (and data movement).

This might be faster than the 72 comparisons and possible swaps.

I'd step through it in assembler, to see if any instructions are not pulling their weight.

Mike Dunlavey
  • 40,059
  • 14
  • 91
  • 135
  • Actually there is a native implantation for that: Arrays.sort(). It appears to be slaw a bit. I just finish implementing @ypnos suggestion and it's faster. – Ilya Gazman Mar 27 '14 at 14:43
  • @Ilya: The sorting network could be the fastest, but I bet the hard-coding method would outperform Arrays.sort() by a good bit, just by avoiding method calls and such. – Mike Dunlavey Mar 27 '14 at 14:47
  • Hehe actually this is exactly what I am trying to do. I share my source in a moment. – Ilya Gazman Mar 27 '14 at 14:48
1

I implemented small test, once using Arrays.sort and once using an own sort implementation based on the sorting network referred to by @ypnos in https://stackoverflow.com/a/22688819 .

However, do NOT take this comparison toooo serious. It is not a very sophisticated microbenchmark, and there are certainly many influencing factors not considered yet. One that came to my mind is: Are the 12-element segments sorted linearly? That is, are you first sorting elements [0,12), then [12,24), and so on, or are the segments scattered in the array? This will probably have a performance impact due to caching. This impact could be equal for all approaches, but should be considered nevertheless.

In any case, it seems to be possible to squeeze out a tiny bit of performance with such a sorting network (or an "unrolled" sorting method in general).

But just for comparison, I added a parallel approach, where the tasks to sort sets of 12-elements-segments are distributed among all available cores, and it seems that it is possible to achieve a significant speedup this way. So you should probably consider some sort of parallelization for this task in general.

(Start with -Xmx2000m to have enough memory for the large arrays)

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SmallSortTest
{
    public static void main(String[] args)
    {
        Random random = new Random(0);
        for (int size=8000000; size<=8000000*10; size+=8000000)
        {
            int array[] = createRandomArray(size, 0, 1000, random);

            int array0[] = array.clone();
            testArrays(array0);

            int array1[] = array.clone();
            testOwn(array1);

            int array2[] = array.clone();
            testParallel(array2);

            if (!Arrays.equals(array0, array1)) System.out.println("Error");
            if (!Arrays.equals(array0, array2)) System.out.println("Error");
        }
    }

    private static void testArrays(int array[])
    {
        long before = System.nanoTime();
        for (int i=0; i<array.length/12; i++)
        {
            Arrays.sort(array, i*12, i*12+12);
        }
        long after = System.nanoTime();

        System.out.println(
            "Arrays   size "+array.length+
            " duration "+(after-before)*1e-6+
            ", some result "+array[array.length/2]);
    }

    private static void testOwn(int array[])
    {
        long before = System.nanoTime();
        for (int i=0; i<array.length/12; i++)
        {
            sort(array, i*12);
        }
        long after = System.nanoTime();
        System.out.println(
            "Own      size "+array.length+
            " duration "+(after-before)*1e-6+
            ", some result "+array[array.length/2]);
    }

    private static void testParallel(final int array[])
    {
        int n = Runtime.getRuntime().availableProcessors();
        ExecutorService executor = Executors.newFixedThreadPool(n);

        int batchSize = (int)Math.ceil((double)array.length / 12  / n);
        final List<Callable<Object>> tasks = new ArrayList<Callable<Object>>();
        for (int i=0; i<n; i++)
        {
            final int minIndex = (i+0)*batchSize;
            final int maxIndex = Math.min(array.length, (i+1)*batchSize);
            Runnable runnable = new Runnable()
            {
                @Override
                public void run()
                {
                    for (int i=minIndex; i<maxIndex; i++)
                    {
                        Arrays.sort(array, i*12, i*12+12);
                    }
                }
            };
            tasks.add(Executors.callable(runnable));
        }

        long before = System.nanoTime();
        try
        {
            executor.invokeAll(tasks);
        }
        catch (InterruptedException e1)
        {
            Thread.currentThread().interrupt();
        }        
        long after = System.nanoTime();

        executor.shutdown();
        try
        {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
        }

        System.out.println(
            "Parallel size "+array.length+
            " duration "+(after-before)*1e-6+
            ", some result "+array[array.length/2]);
    }

    private static void sort(int array[], int startIndex)
    {
        int i0 = startIndex+11;
        int i1 = startIndex+10;
        int i2 = startIndex+9;
        int i3 = startIndex+8;
        int i4 = startIndex+7;
        int i5 = startIndex+6;
        int i6 = startIndex+5;
        int i7 = startIndex+4;
        int i8 = startIndex+3;
        int i9 = startIndex+2;
        int i10 = startIndex+1;
        int i11 = startIndex+0;

        if (array[i0] < array[i1]) swap(array, i0, i1);
        if (array[i2] < array[i3]) swap(array, i2, i3);
        if (array[i4] < array[i5]) swap(array, i4, i5);
        if (array[i6] < array[i7]) swap(array, i6, i7);
        if (array[i8] < array[i9]) swap(array, i8, i9);
        if (array[i10] < array[i11]) swap(array, i10, i11);

        if (array[i1] < array[i3]) swap(array, i1, i3);
        if (array[i5] < array[i7]) swap(array, i5, i7);
        if (array[i9] < array[i11]) swap(array, i9, i11);
        if (array[i0] < array[i2]) swap(array, i0, i2);
        if (array[i4] < array[i6]) swap(array, i4, i6);
        if (array[i8] < array[i10]) swap(array, i8, i10);

        if (array[i1] < array[i2]) swap(array, i1, i2);
        if (array[i5] < array[i6]) swap(array, i5, i6);
        if (array[i9] < array[i10]) swap(array, i9, i10);
        if (array[i0] < array[i4]) swap(array, i0, i4);
        if (array[i7] < array[i11]) swap(array, i7, i11);

        if (array[i1] < array[i5]) swap(array, i1, i5);
        if (array[i6] < array[i10]) swap(array, i6, i10);
        if (array[i3] < array[i7]) swap(array, i3, i7);
        if (array[i4] < array[i8]) swap(array, i4, i8);

        if (array[i5] < array[i9]) swap(array, i5, i9);
        if (array[i2] < array[i6]) swap(array, i2, i6);
        if (array[i0] < array[i4]) swap(array, i0, i4);
        if (array[i7] < array[i11]) swap(array, i7, i11);
        if (array[i3] < array[i8]) swap(array, i3, i8);

        if (array[i1] < array[i5]) swap(array, i1, i5);
        if (array[i6] < array[i10]) swap(array, i6, i10);
        if (array[i2] < array[i3]) swap(array, i2, i3);
        if (array[i8] < array[i9]) swap(array, i8, i9);

        if (array[i1] < array[i4]) swap(array, i1, i4);
        if (array[i7] < array[i10]) swap(array, i7, i10);
        if (array[i3] < array[i5]) swap(array, i3, i5);
        if (array[i6] < array[i8]) swap(array, i6, i8);

        if (array[i2] < array[i4]) swap(array, i2, i4);
        if (array[i7] < array[i9]) swap(array, i7, i9);
        if (array[i5] < array[i6]) swap(array, i5, i6);

        if (array[i3] < array[i4]) swap(array, i3, i4);
        if (array[i7] < array[i8]) swap(array, i7, i8);
    }

    private static void swap(int array[], int i0, int i1)
    {
        int temp = array[i0];
        array[i0] = array[i1];
        array[i1] = temp;
    }


    private static int[] createRandomArray(int size, int min, int max, Random random)
    {
        int array[] = new int[size];
        for (int i=0; i<size; i++)
        {
            array[i] = min+random.nextInt(max-min);
        }
        return array;
    }
}
Community
  • 1
  • 1
Marco13
  • 53,703
  • 9
  • 80
  • 159
  • To revert the sorting order, you mirror all indices. `i0 = startIndex+11; i1 = startIndex10; ... i11 = startIndex + 0;`. The network itself, including comparisons, stays the same. I'm not 100% sure, but quite sure! Report back :-) – ypnos Mar 28 '14 at 11:11
  • @ypons I think I tried this, and changing the comparisons from `<` to `>`, as well as a combination of both, and *think* that none of these had the desired effect, but maybe there was something else wrong at this time. I would have to test it again. – Marco13 Mar 28 '14 at 11:16
  • Yeah don't change the comparisons! Think about it like this: The network sorts into buckets 1, 2, 3.. 12. Then you reverse the array, making bucket 1 the new bucket 12, bucket 2 the new bucket 11, etc. Instead let the network sort in buckets 12, 11, 10 .. 1. How to do this? Let the network think that 1 == 12, 2 == 11, 3 == 10, 12 == 1. This is what you do when you change the indices before running the network. – ypnos Mar 28 '14 at 12:38
  • @ypons Indeed, I tried it again, and now it worked (edited the answer accordingly). I'm absolutely sure that I already tried this. I even said in the answer that ~*"...it's not done with changing some indices..."*. But it seems that there was something else wrong when I tried it the first time. – Marco13 Mar 28 '14 at 13:52
1

QuickSort and MergeSort are not effective for such a small array size. I would use insertionSort which I measured as faster than bubbleSort.

private static void insertionSort(int[] intArray) {
    for(int i=1; i<intArray.length; i++){
        int temp = intArray[i];
        int j;
        for(j=i-1; j>=0 && temp<intArray[j]; j--){
            intArray[j+1]=intArray[j];
        }
        intArray[j+1] = temp;
    }
}
ProgrammerDan
  • 871
  • 7
  • 17
LiozMe
  • 11
  • 2