How can I implement a parallel sorting algorithm on ParArray that may prove more
efficient than converting to a non-parallel collection for sequential
sorting?
My first observation would be that there doesn't seem to be much of a performance penalty for "converting" parallel arrays to sequential and back:
import scala.util.Random
// On Scala 2.13+, .par also needs the scala-parallel-collections module and
// import scala.collection.parallel.CollectionConverters._
def time[R](block: => R): R = {
  val t0 = System.nanoTime()
  val result = block // call-by-name
  val t1 = System.nanoTime()
  val diff: Long = t1 - t0
  println(s"Elapsed time: ${diff * 1.0 / 1E9}s")
  result
}
def main(args: Array[String]): Unit = {
  val size: Int = args.headOption.map(_.toInt).getOrElse(1000000)
  val input = Array.fill(size)(Random.nextInt())
  val arrayCopy: Array[Int] = Array.ofDim(size)
  input.copyToArray(arrayCopy)
  // sequential baseline
  time { input.sorted }
  val parArray = arrayCopy.par
  // round trip: parallel -> sequential -> sort -> parallel
  val result = time { parArray.seq.sorted.toArray.par }
}
gives
> run 1000000
[info] Running Runner 1000000
Elapsed time: 0.344659236s
Elapsed time: 0.321363896s
For all Array sizes I tested, the results are very similar and usually somewhat in favor of the second expression. So in case you were worried that converting to sequential collections and back would kill the performance gains you achieved on other operations - I don't think you should be.
When it comes to utilizing Scala's parallel collections to achieve parallel sorting that in some cases would perform better than the default - I don't think there's an obvious good way of doing that, but it wouldn't hurt to try:
What I thought should work would be splitting the input array into as many subarrays as you have cores in your computer (preferably without any unnecessary copying) and sorting the parts concurrently. Afterwards one might merge the parts together (as in merge sort). Here's what the code might look like:
val maxThreads = 8 // for simplicity we're not configuring the thread pool explicitly
val groupSize: Int = size / maxThreads + 1
// clamp both bounds to size so no range runs past the end of the array
val ranges: IndexedSeq[(Int, Int)] =
  (0 until maxThreads).map(i => (math.min(i * groupSize, size), math.min((i + 1) * groupSize, size)))
time {
  // parallelizing sorting for each range
  ranges.par.foreach { case (from, to) =>
    input.view(from, to).sortWith(_ < _)
  }
  // TODO merge the parts together
}
Unfortunately there's this old bug that prevents us from doing anything fun with views. There doesn't seem to be any Scala built-in mechanism (other than views) for sorting just a part of a collection. This is why I tried coding my own merge sort algorithm with the signature def mergeSort(a: Array[Int], r: Range): Unit to use it as described above. Unfortunately it turned out to be more than 4 times slower than Scala's Array.sorted method, so I don't think it could be used to gain efficiency over the standard sequential approach.
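For reference, here is a minimal sketch of the split / sort / merge idea that avoids views altogether. It swaps in java.util.Arrays.sort(a, from, to) from the JDK to sort each slice in place and plain scala.concurrent.Future tasks instead of parallel collections. It is not the mergeSort I benchmarked above, the names ChunkedSort and sortInChunks are made up for illustration, and I haven't measured whether it beats Array.sorted:

```scala
import java.util.Arrays
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Hypothetical helper object, not part of any library.
object ChunkedSort {
  // Merge the sorted ranges src[lo, mid) and src[mid, hi) into dst[lo, hi).
  private def merge(src: Array[Int], dst: Array[Int], lo: Int, mid: Int, hi: Int): Unit = {
    var i = lo
    var j = mid
    var k = lo
    while (k < hi) {
      if (i < mid && (j >= hi || src(i) <= src(j))) { dst(k) = src(i); i += 1 }
      else { dst(k) = src(j); j += 1 }
      k += 1
    }
  }

  // Sorts `a` in place: slices are sorted concurrently, then merged sequentially.
  def sortInChunks(a: Array[Int], chunks: Int): Unit = {
    require(chunks > 0, "chunks must be positive")
    val n = a.length
    if (n < 2) return
    val chunkSize = math.max(1, (n + chunks - 1) / chunks)
    // Slice boundaries: 0, chunkSize, 2 * chunkSize, ..., n
    val bounds = (0 until n by chunkSize).toVector :+ n

    // Phase 1: sort every slice in place, one task per slice.
    val tasks = bounds.zip(bounds.tail).map { case (from, to) =>
      Future(Arrays.sort(a, from, to))
    }
    Await.result(Future.sequence(tasks), Duration.Inf)

    // Phase 2: bottom-up merge of adjacent sorted runs, alternating two buffers.
    var src = a
    var dst = new Array[Int](n)
    var width = chunkSize
    while (width < n) {
      var lo = 0
      while (lo < n) {
        val mid = lo + math.min(width, n - lo)
        val hi = mid + math.min(width, n - mid)
        merge(src, dst, lo, mid, hi)
        lo = hi
      }
      val tmp = src; src = dst; dst = tmp
      width = if (width > n / 2) n else width * 2
    }
    // The sorted data may have ended up in the scratch buffer.
    if (src ne a) Array.copy(src, 0, a, 0, n)
  }
}
```

Usage would be something like ChunkedSort.sortInChunks(input, Runtime.getRuntime.availableProcessors()). Note that the merge phase here is still sequential and needs a scratch array of the same size. If you are on Java 8 or later, java.util.Arrays.parallelSort(input) is also worth benchmarking before rolling your own.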
If I understand your situation correctly, your dataset fits in memory, so using something like Hadoop and MapReduce would be premature. What you might try, though, is Apache Spark: other than adding a dependency, you wouldn't need to set up a cluster or install anything for Spark to use all cores of your machine in a basic configuration. Its RDDs are conceptually similar to Scala's parallel collections, but with additional functionality, and they (in a way) support parallel sorting.
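A rough sketch of what that could look like in local mode, assuming the spark-core artifact is on the classpath (the object name, app name and input size are placeholders I picked, and I haven't benchmarked this against the versions above):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import scala.util.Random

// Hypothetical example object; requires the spark-core dependency.
object SparkSortSketch {
  def main(args: Array[String]): Unit = {
    // "local[*]" runs Spark inside this JVM using all available cores, no cluster needed.
    val conf = new SparkConf().setAppName("parallel-sort-sketch").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      val input = Array.fill(1000000)(Random.nextInt())
      // Distribute the array as an RDD, sort it across partitions, and collect the result.
      val sorted: Array[Int] = sc.parallelize(input).sortBy(x => x).collect()
      println(sorted.take(5).mkString(", "))
    } finally {
      sc.stop()
    }
  }
}
```

Keep in mind that Spark start-up and the shuffle behind sortBy add fixed overhead, so for a single in-memory array of this size it may well be slower than plain Array.sorted.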