Without knowing more about the implementation, here are my suggestions and comments:
Limit the number of threads that can run at any given time, perhaps to 8 or 10 (that may give the scheduler more leeway, although it would be best to use one per core/hardware thread). There isn't really a point in running more threads for "throughput" on a CPU-bound problem if the available cores/affinity don't support it.
Don't thread near the leaves! Only thread on the larger branches. There is no point spawning a thread to sort a relatively small number of items, and at this level there are many, many little branches, so threading would add far more relative overhead here. (This is similar to switching to a "simple sort" for the leaves.)
Make sure each thread can work in isolation: it should not stomp on another thread's data during work, so no locks are needed, just wait for the join. Divide and conquer.
Possibly look at performing a "breadth-first" approach to spawn threads.
Consider a mergesort over a quicksort (I am biased towards mergesort :-). Remember there are numerous different kinds of mergesort, including bottom-up.
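To make the bottom-up variant concrete, here is a minimal single-threaded sketch. The class and method names (`BottomUpMergeSort`, `sort`, `merge`) are my own placeholders, and it assumes arrays well below 2^30 elements so the index arithmetic cannot overflow. It is an illustration, not a tuned implementation.

```java
import java.util.Arrays;

public class BottomUpMergeSort {
    // Sorts arr ascending by merging runs of width 1, 2, 4, ... (no recursion).
    // Assumption: arr.length is well below 2^30, so lo + 2 * width cannot overflow.
    static void sort(int[] arr) {
        int n = arr.length;
        int[] src = arr;
        int[] dst = new int[n];
        for (int width = 1; width < n; width *= 2) {
            // Every merge in this pass touches a disjoint range of src and dst.
            for (int lo = 0; lo < n; lo += 2 * width) {
                int mid = Math.min(lo + width, n);
                int hi = Math.min(lo + 2 * width, n);
                merge(src, dst, lo, mid, hi);
            }
            // Swap the roles of the two buffers for the next pass.
            int[] t = src;
            src = dst;
            dst = t;
        }
        // The sorted data may have ended up in the scratch buffer.
        if (src != arr) {
            System.arraycopy(src, 0, arr, 0, n);
        }
    }

    // Merges the sorted runs src[lo, mid) and src[mid, hi) into dst[lo, hi).
    static void merge(int[] src, int[] dst, int lo, int mid, int hi) {
        int i = lo;
        int j = mid;
        for (int k = lo; k < hi; k++) {
            if (i < mid && (j >= hi || src[i] <= src[j])) {
                dst[k] = src[i++];
            } else {
                dst[k] = src[j++];
            }
        }
    }

    public static void main(String[] args) {
        int[] data = {5, 2, 9, 1, 5, 6, -3};
        sort(data);
        System.out.println(Arrays.toString(data)); // prints [-3, 1, 2, 5, 5, 6, 9]
    }
}
```

Because the merges within one pass work on disjoint ranges, they could in principle be handed to separate threads, which is part of why I find mergesort attractive here.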
Edit
Make sure it actually works. Remember to use memory barriers correctly between threads: they are required, even if no two threads modify the same data at once, to ensure correct visibility.
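As a small illustration of the visibility point: in Java, `Thread.start()` and `Thread.join()` already give the needed happens-before edges, so writes made by a worker are visible to whoever joins it. The sketch below uses made-up names (`JoinVisibility`, `fillInTwoThreadsAndSum`) and two threads that write disjoint halves of one array.

```java
public class JoinVisibility {
    // Writes 1 into arr[from, to); each worker owns its own disjoint range.
    static void fill(int[] arr, int from, int to) {
        for (int i = from; i < to; i++) {
            arr[i] = 1;
        }
    }

    // Two threads fill disjoint halves; the caller reads only after joining both.
    static int fillInTwoThreadsAndSum(int n) throws InterruptedException {
        int[] data = new int[n];
        int mid = n / 2;
        Thread left = new Thread(() -> fill(data, 0, mid));
        Thread right = new Thread(() -> fill(data, mid, n));
        left.start();
        right.start();
        // join() establishes happens-before: everything the workers wrote
        // is visible to this thread once join() returns.
        left.join();
        right.join();
        int sum = 0;
        for (int v : data) {
            sum += v;
        }
        return sum;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(fillInTwoThreadsAndSum(1000)); // prints 1000
    }
}
```

The same kind of guarantee is documented for `ExecutorService.submit` and `Future.get()`, so an explicit barrier is only needed when results are handed over by some other route.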
Edit (proof-of-concept):
I threw together this simple demonstration. On my Intel Core 2 Duo @ 2 GHz I could get it to run in about 2/3 to 3/4 of the time, which is definitely some improvement :) (Settings: DATA_SIZE = 3000000, MAX_THREADS = 4, MIN_PARALLEL = 1000). This is with the basic in-place quicksort code ripped from Wikipedia, which does not take advantage of any other basic optimizations.
The method by which it determines whether a new thread can/should start is also very primitive: if no new thread is available it just chugs right along (because, you know, why wait?).
This code should also (hopefully) fan out breadth-wise with the threads. This may be less efficient for data locality than keeping it depth-wise, but the model seemed simple enough in my head.
An executor service is also used to simplify the design and to be able to reuse the same threads (vs. spawning new threads). MIN_PARALLEL can become quite small (say, about 20) before the executor overhead starts to show -- the maximum number of threads and only-using-a-new-thread-if-possible likely keeps this in check as well.
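For reference, here is a minimal sketch of the general executor pattern in isolation: a fixed pool sized to the number of hardware threads, reused for many small tasks, and shut down at the end so its worker threads do not keep the JVM alive. The class name `PoolSizing` and the toy workload (summing squares) are placeholders of mine, not part of the sort.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PoolSizing {
    // Computes 1^2 + 2^2 + ... + tasks^2, one pooled task per term (toy workload).
    static long sumOfSquares(int tasks) throws Exception {
        // One worker per core/hardware thread, as reported by the JVM.
        int threads = Runtime.getRuntime().availableProcessors();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Long>> futures = new ArrayList<>();
            for (int i = 1; i <= tasks; i++) {
                final long n = i;
                Callable<Long> task = () -> n * n;
                // The same worker threads are reused for every submitted task.
                futures.add(pool.submit(task));
            }
            long total = 0;
            for (Future<Long> f : futures) {
                total += f.get(); // blocks until that task has finished
            }
            return total;
        } finally {
            // Without this, the pool's non-daemon workers keep the JVM running.
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sumOfSquares(10)); // prints 385
    }
}
```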
qsort average seconds: 0.6290541056
pqsort average seconds: 0.4513915392
I make absolutely no guarantees about the usefulness or correctness of this code but it "seems to work" here. Pay heed to the warning next to the ThreadPoolExecutor as it clearly shows I'm not entirely certain what is happening :-) I am fairly certain the design is somewhat flawed in under-utilizing the threads.
package psq;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.*;
public class Main {
int[] genData (int len) {
Random r = new Random();
int[] newData = new int[len];
for (int i = 0; i < newData.length; i++) {
newData[i] = r.nextInt();
}
return newData;
}
boolean check (int[] arr) {
if (arr.length == 0) {
return true;
}
int lastValue = arr[0];
for (int i = 1; i < arr.length; i++) {
//System.out.println(arr[i]);
if (arr[i] < lastValue) {
return false;
}
lastValue = arr[i];
}
return true;
}
int partition (int[] arr, int left, int right, int pivotIndex) {
// pivotValue := array[pivotIndex]
int pivotValue = arr[pivotIndex];
{
// swap array[pivotIndex] and array[right] // Move pivot to end
int t = arr[pivotIndex];
arr[pivotIndex] = arr[right];
arr[right] = t;
}
// storeIndex := left
int storeIndex = left;
// for i from left to right - 1 // left ≤ i < right
for (int i = left; i < right; i++) {
//if array[i] ≤ pivotValue
if (arr[i] <= pivotValue) {
//swap array[i] and array[storeIndex]
//storeIndex := storeIndex + 1
int t = arr[i];
arr[i] = arr[storeIndex];
arr[storeIndex] = t;
storeIndex++;
}
}
{
// swap array[storeIndex] and array[right] // Move pivot to its final place
int t = arr[storeIndex];
arr[storeIndex] = arr[right];
arr[right] = t;
}
// return storeIndex
return storeIndex;
}
void quicksort (int[] arr, int left, int right) {
// if right > left
if (right > left) {
// select a pivot index //(e.g. pivotIndex := left + (right - left)/2)
int pivotIndex = left + (right - left) / 2;
// pivotNewIndex := partition(array, left, right, pivotIndex)
int pivotNewIndex = partition(arr, left, right, pivotIndex);
// quicksort(array, left, pivotNewIndex - 1)
// quicksort(array, pivotNewIndex + 1, right)
quicksort(arr, left, pivotNewIndex - 1);
quicksort(arr, pivotNewIndex + 1, right);
}
}
static int DATA_SIZE = 3000000;
static int MAX_THREADS = 4;
static int MIN_PARALLEL = 1000;
// NOTE THAT THE THREAD POOL EXECUTOR USES A LINKEDBLOCKINGQUEUE
// That is because it's possible to OVER SUBMIT with this code,
// even with the semaphores!
ThreadPoolExecutor tp = new ThreadPoolExecutor(
MAX_THREADS,
MAX_THREADS,
Long.MAX_VALUE,
TimeUnit.NANOSECONDS,
new LinkedBlockingQueue<Runnable>());
// if there are no semaphore permits available then we just continue
// processing from the same thread and "deal with it"
Semaphore sem = new Semaphore(MAX_THREADS, false);
class QuickSortAction implements Runnable {
int[] arr;
int left;
int right;
public QuickSortAction (int[] arr, int left, int right) {
this.arr = arr;
this.left = left;
this.right = right;
}
public void run () {
try {
//System.out.println(">>[" + left + "|" + right + "]");
pquicksort(arr, left, right);
//System.out.println("<<[" + left + "|" + right + "]");
} catch (Exception ex) {
// I got nothing for this
throw new RuntimeException(ex);
}
}
}
// pquicksort
// threads will [hopefully] fan-out "breadth-wise"
// this is because it's likely that the 2nd executor task (if needed)
// will be submitted prior to the 1st running and starting its own tasks
// of course this behavior is not terribly well-defined
void pquicksort (int[] arr, int left, int right) throws ExecutionException, InterruptedException {
if (right > left) {
// memory barrier -- pquicksort is called from different threads
synchronized (arr) {}
int pivotIndex = left + (right - left) / 2;
int pivotNewIndex = partition(arr, left, right, pivotIndex);
Future<?> f1 = null;
Future<?> f2 = null;
if ((pivotNewIndex - 1) - left > MIN_PARALLEL) {
if (sem.tryAcquire()) {
f1 = tp.submit(new QuickSortAction(arr, left, pivotNewIndex - 1));
} else {
pquicksort(arr, left, pivotNewIndex - 1);
}
} else {
quicksort(arr, left, pivotNewIndex - 1);
}
if (right - (pivotNewIndex + 1) > MIN_PARALLEL) {
if (sem.tryAcquire()) {
f2 = tp.submit(new QuickSortAction(arr, pivotNewIndex + 1, right));
} else {
pquicksort(arr, pivotNewIndex + 1, right);
}
} else {
quicksort(arr, pivotNewIndex + 1, right);
}
// join back up
if (f1 != null) {
f1.get();
sem.release();
}
if (f2 != null) {
f2.get();
sem.release();
}
}
}
long qsort_call (int[] origData) throws Exception {
int[] data = Arrays.copyOf(origData, origData.length);
long start = System.nanoTime();
quicksort(data, 0, data.length - 1);
long duration = System.nanoTime() - start;
if (!check(data)) {
throw new Exception("qsort not sorted!");
}
return duration;
}
long pqsort_call (int[] origData) throws Exception {
int[] data = Arrays.copyOf(origData, origData.length);
long start = System.nanoTime();
pquicksort(data, 0, data.length - 1);
long duration = System.nanoTime() - start;
if (!check(data)) {
throw new Exception("pqsort not sorted!");
}
return duration;
}
public Main () throws Exception {
long qsort_duration = 0;
long pqsort_duration = 0;
int ITERATIONS = 10;
for (int i = 0; i < ITERATIONS; i++) {
System.out.println("Iteration# " + i);
int[] data = genData(DATA_SIZE);
if ((i & 1) == 0) {
qsort_duration += qsort_call(data);
pqsort_duration += pqsort_call(data);
} else {
pqsort_duration += pqsort_call(data);
qsort_duration += qsort_call(data);
}
}
System.out.println("====");
System.out.println("qsort average seconds: " + (float)qsort_duration / (ITERATIONS * 1E9));
System.out.println("pqsort average seconds: " + (float)pqsort_duration / (ITERATIONS * 1E9));
// shut the pool down so its non-daemon worker threads do not keep the JVM alive
tp.shutdown();
}
public static void main(String[] args) throws Exception {
new Main();
}
}
YMMV. Happy coding.
(Also, I'd like to know how this -- or similar -- code fares on your 8-core box. Wikipedia claims a linear speedup by number-of-cpus is possible :)
Edit (better numbers)
Removed use of futures which caused a minor "jam" and switched to a single final-wait-semaphore: less useless waiting. Now runs in just 55% of non-threaded time :-)
qsort average seconds: 0.5999702528
pqsort average seconds: 0.3346969088
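The final-wait semaphore can be sketched in isolation like this. The semaphore starts with one permit, every submitted task first takes a permit away (so the count goes to zero or below), and every finished task gives one back, so the last `acquire()` only succeeds once all tasks are done. The names `FinalWaitSemaphore`, `CountingSemaphore` and `runTasks`, and the pool size of 4, are placeholders for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class FinalWaitSemaphore {
    // Semaphore.reducePermits is protected, so a small subclass exposes it.
    static class CountingSemaphore extends Semaphore {
        CountingSemaphore(int permits) {
            super(permits);
        }

        void removePermit() {
            reducePermits(1);
        }
    }

    // Runs taskCount trivial tasks and returns how many had completed
    // by the time the final acquire() succeeded.
    static int runTasks(int taskCount) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CountingSemaphore done = new CountingSemaphore(1);
        AtomicInteger completed = new AtomicInteger();
        try {
            for (int i = 0; i < taskCount; i++) {
                done.removePermit(); // one more task outstanding
                pool.submit(() -> {
                    try {
                        completed.incrementAndGet();
                    } finally {
                        done.release(); // give the permit back, even on failure
                    }
                });
            }
            // The count is back at 1 only after every task has released.
            done.acquire();
            return completed.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runTasks(100)); // prints 100
    }
}
```

The important detail is that the permit is removed before the task is submitted; otherwise a fast task could release first and let the waiter through too early.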
package psq;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.*;
public class Main {
int[] genData (int len) {
Random r = new Random();
int[] newData = new int[len];
for (int i = 0; i < newData.length; i++) {
newData[i] = r.nextInt();
}
return newData;
}
boolean check (int[] arr) {
if (arr.length == 0) {
return true;
}
int lastValue = arr[0];
for (int i = 1; i < arr.length; i++) {
//System.out.println(arr[i]);
if (arr[i] < lastValue) {
return false;
}
lastValue = arr[i];
}
return true;
}
int partition (int[] arr, int left, int right, int pivotIndex) {
// pivotValue := array[pivotIndex]
int pivotValue = arr[pivotIndex];
{
// swap array[pivotIndex] and array[right] // Move pivot to end
int t = arr[pivotIndex];
arr[pivotIndex] = arr[right];
arr[right] = t;
}
// storeIndex := left
int storeIndex = left;
// for i from left to right - 1 // left ≤ i < right
for (int i = left; i < right; i++) {
//if array[i] ≤ pivotValue
if (arr[i] <= pivotValue) {
//swap array[i] and array[storeIndex]
//storeIndex := storeIndex + 1
int t = arr[i];
arr[i] = arr[storeIndex];
arr[storeIndex] = t;
storeIndex++;
}
}
{
// swap array[storeIndex] and array[right] // Move pivot to its final place
int t = arr[storeIndex];
arr[storeIndex] = arr[right];
arr[right] = t;
}
// return storeIndex
return storeIndex;
}
void quicksort (int[] arr, int left, int right) {
// if right > left
if (right > left) {
// select a pivot index //(e.g. pivotIndex := left + (right - left)/2)
int pivotIndex = left + (right - left) / 2;
// pivotNewIndex := partition(array, left, right, pivotIndex)
int pivotNewIndex = partition(arr, left, right, pivotIndex);
// quicksort(array, left, pivotNewIndex - 1)
// quicksort(array, pivotNewIndex + 1, right)
quicksort(arr, left, pivotNewIndex - 1);
quicksort(arr, pivotNewIndex + 1, right);
}
}
static int DATA_SIZE = 3000000;
static int MAX_EXTRA_THREADS = 7;
static int MIN_PARALLEL = 500;
// To get to reducePermits
@SuppressWarnings("serial")
class Semaphore2 extends Semaphore {
public Semaphore2(int permits, boolean fair) {
super(permits, fair);
}
public void removePermit() {
super.reducePermits(1);
}
}
class QuickSortAction implements Runnable {
final int[] arr;
final int left;
final int right;
final SortState ss;
public QuickSortAction (int[] arr, int left, int right, SortState ss) {
this.arr = arr;
this.left = left;
this.right = right;
this.ss = ss;
}
public void run () {
try {
//System.out.println(">>[" + left + "|" + right + "]");
pquicksort(arr, left, right, ss);
//System.out.println("<<[" + left + "|" + right + "]");
} catch (Exception ex) {
// I got nothing for this
throw new RuntimeException(ex);
} finally {
// always release, otherwise a failed task would leave pquicksort(arr) waiting forever
ss.limit.release();
ss.countdown.release();
}
}
}
class SortState {
final public ThreadPoolExecutor pool = new ThreadPoolExecutor(
MAX_EXTRA_THREADS,
MAX_EXTRA_THREADS,
Long.MAX_VALUE,
TimeUnit.NANOSECONDS,
new LinkedBlockingQueue<Runnable>());
// actual limit: executor may actually still have "active" things to process
final public Semaphore limit = new Semaphore(MAX_EXTRA_THREADS, false);
final public Semaphore2 countdown = new Semaphore2(1, false);
}
void pquicksort (int[] arr) throws Exception {
SortState ss = new SortState();
pquicksort(arr, 0, arr.length - 1, ss);
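ss.countdown.acquire();
// each call creates its own pool; shut it down so its worker threads
// do not pile up across iterations or keep the JVM alive
ss.pool.shutdown();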
}
// pquicksort
// threads "fork" if available.
void pquicksort (int[] arr, int left, int right, SortState ss) throws ExecutionException, InterruptedException {
if (right > left) {
// memory barrier -- pquicksort is called from different threads
// and those threads may be created because they are in an executor
synchronized (arr) {}
int pivotIndex = left + (right - left) / 2;
int pivotNewIndex = partition(arr, left, right, pivotIndex);
{
int newRight = pivotNewIndex - 1;
if (newRight - left > MIN_PARALLEL) {
if (ss.limit.tryAcquire()) {
ss.countdown.removePermit();
ss.pool.submit(new QuickSortAction(arr, left, newRight, ss));
} else {
pquicksort(arr, left, newRight, ss);
}
} else {
quicksort(arr, left, newRight);
}
}
{
int newLeft = pivotNewIndex + 1;
if (right - newLeft > MIN_PARALLEL) {
if (ss.limit.tryAcquire()) {
ss.countdown.removePermit();
ss.pool.submit(new QuickSortAction(arr, newLeft, right, ss));
} else {
pquicksort(arr, newLeft, right, ss);
}
} else {
quicksort(arr, newLeft, right);
}
}
}
}
long qsort_call (int[] origData) throws Exception {
int[] data = Arrays.copyOf(origData, origData.length);
long start = System.nanoTime();
quicksort(data, 0, data.length - 1);
long duration = System.nanoTime() - start;
if (!check(data)) {
throw new Exception("qsort not sorted!");
}
return duration;
}
long pqsort_call (int[] origData) throws Exception {
int[] data = Arrays.copyOf(origData, origData.length);
long start = System.nanoTime();
pquicksort(data);
long duration = System.nanoTime() - start;
if (!check(data)) {
throw new Exception("pqsort not sorted!");
}
return duration;
}
public Main () throws Exception {
long qsort_duration = 0;
long pqsort_duration = 0;
int ITERATIONS = 10;
for (int i = 0; i < ITERATIONS; i++) {
System.out.println("Iteration# " + i);
int[] data = genData(DATA_SIZE);
if ((i & 1) == 0) {
qsort_duration += qsort_call(data);
pqsort_duration += pqsort_call(data);
} else {
pqsort_duration += pqsort_call(data);
qsort_duration += qsort_call(data);
}
}
System.out.println("====");
System.out.println("qsort average seconds: " + (float)qsort_duration / (ITERATIONS * 1E9));
System.out.println("pqsort average seconds: " + (float)pqsort_duration / (ITERATIONS * 1E9));
}
public static void main(String[] args) throws Exception {
new Main();
}
}