
I have been trying to implement a parallel merge sort in Scala, but even with 8 cores, .sorted is still about twice as fast.

Edit:

I rewrote most of the code to minimize object creation. Now it runs about as fast as .sorted.

Input file with 1.2M integers:

  • 1.333580 seconds (my implementation)
  • 1.439293 seconds (.sorted)

How should I parallelize this?

New implementation

import scala.reflect.ClassTag

object Mergesort extends App
{

//=====================================================================================================================
// UTILITY
  implicit object comp extends Ordering[Any] {
    def compare(a: Any, b: Any) = {
      (a, b) match {
        case (a: Int, b: Int)       => a compare b
        case (a: String, b: String) => a compare b
        case _                      => 0
      }
    }
  }

//=====================================================================================================================
// MERGESORT

  val THRESHOLD = 30

  def inssort[A](a: Array[A], left: Int, right: Int): Array[A] = {
    for (i <- (left+1) until right) {
      var j = i
      val item = a(j)
      while (j > left && comp.lt(item,a(j-1))) {
        a(j) = a(j-1)
        j -= 1
      }
      a(j) = item
    }
    a
  }

  def mergesort_merge[A](a: Array[A], temp: Array[A], left: Int, right: Int, mid: Int) : Array[A] = {
    var i = left
    var j = right
    while (i < mid) { temp(i) = a(i);   i+=1;       }
    while (j > mid) { temp(i) = a(j-1); i+=1; j-=1; }

    i = left
    j = right-1
    var k = left
    while (k < right) {
      if (comp.lt(temp(i), temp(j))) { a(k) = temp(i); i+=1; k+=1; }
      else                           { a(k) = temp(j); j-=1; k+=1; }
    }
    a
  }

  def mergesort_split[A](a: Array[A], temp: Array[A], left: Int, right: Int): Array[A] = {
    // Ranges of up to THRESHOLD elements (including trivial ones of size 0 or 1)
    // are handled by insertion sort, so no separate base case is needed.
    if ((right-left) > THRESHOLD) {
      val mid = (left+right)/2
      mergesort_split(a, temp, left, mid)
      mergesort_split(a, temp, mid, right)
      mergesort_merge(a, temp, left, right, mid)
    }
    else
      inssort(a, left, right)
  }

  def mergesort[A: ClassTag](a: Array[A]): Array[A] = {
    val temp = new Array[A](a.size)   // scratch buffer shared by all merges
    mergesort_split(a, temp, 0, a.size)
  }
}

Previous implementation

Input file with 1.2M integers:

  • 4.269937 seconds (my implementation)
  • 1.831767 seconds (.sorted)

What tricks are there to make it faster and cleaner?

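import java.io.PrintWriter
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.ExecutionContext.Implicits.global  // assumed: the snippet does not show its ExecutionContext
import scala.io.Source

object Mergesort extends App
{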

//=====================================================================================================================
// UTILITY

  val StartNano = System.nanoTime
  def dbg(msg: String) = println("%05d DBG ".format(((System.nanoTime - StartNano)/1e6).toInt) + msg)
  def time[T](work: =>T) = {
    val start = System.nanoTime
    val res = work
    println("%f seconds".format((System.nanoTime - start)/1e9))
    res
  }

  implicit object comp extends Ordering[Any] {
    def compare(a: Any, b: Any) = {
      (a, b) match {
        case (a: Int, b: Int)       => a compare b
        case (a: String, b: String) => a compare b
        case _                      => 0
      }
    }
  }

//=====================================================================================================================
// MERGESORT

  def merge[A](left: List[A], right: List[A]): Stream[A] = (left, right) match {
    case (x :: xs, y :: ys) if comp.lteq(x, y) => x #:: merge(xs, right)
    case (x :: xs, y :: ys) => y #:: merge(left, ys)
    case _ => if (left.isEmpty) right.toStream else left.toStream
  }

  def sort[A](input: List[A], length: Int): List[A] = {
    if (length < 100) return input.sortWith(comp.lt)
    input match {
      case Nil | List(_) => input
      case _ =>
        val middle = length / 2
        val (left, right) = input splitAt middle
        merge(sort(left, middle), sort(right, middle + length%2)).toList
    }
  }

  def msort[A](input: List[A]): List[A] = sort(input, input.length)

//=====================================================================================================================
// PARALLELIZATION

  //val cores = Runtime.getRuntime.availableProcessors
  //dbg("Detected %d cores.".format(cores))
  //lazy implicit val ec = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(cores))

  def futuremerge[A](fa: Future[List[A]], fb: Future[List[A]])(implicit order: Ordering[A], ec: ExecutionContext) =
  {
    for {
      a <- fa
      b <- fb
    } yield merge(a, b).toList
  }

  def parallel_msort[A](input: List[A], length: Int)(implicit order: Ordering[A]): Future[List[A]] = {
    val middle = length / 2
    val (left, right) = input splitAt middle

    if(length > 500) {
      val fl = parallel_msort(left, middle)
      val fr = parallel_msort(right, middle + length%2)
      futuremerge(fl, fr)
    }
    else {
      Future(msort(input))
    }
  }

//=====================================================================================================================
// MAIN

  val results = time({
    val src = Source.fromFile("in.txt").getLines
    val header = src.next.split(" ").toVector
    val lines = if (header(0) == "i") src.map(_.toInt).toList else src.toList
    val f = parallel_msort(lines, lines.length)
    Await.result(f, concurrent.duration.Duration.Inf)
  })

  println("Sorted as comparison...")
  val sorted_src = Source.fromFile("in.txt").getLines
  sorted_src.next
  time(sorted_src.toList.sorted)

  val writer = new PrintWriter("out.txt", "UTF-8")
  try writer.print(results.mkString("\n"))
  finally writer.close
}
warbaque
  • How big is your data set? How many times did you execute? Since you are performance testing, are you doing multiple runs so that the JVM has time to optimize and warm up the code? Have you tried running with `scala.testing.Benchmark`? – noahlz Oct 23 '15 at 23:00
  • I've been testing mostly with input files containing from a few hundred thousand lines up to a few million (the example run above was with 1.2M). I've just been doing single runs for now, since there's still such a big discrepancy between the running times of my implementation and `.sorted` that multiple runs wouldn't change much. – warbaque Oct 23 '15 at 23:43
  • If you're running in a JVM, you need multiple runs just to overcome thread start-up time. – noahlz Oct 24 '15 at 00:03
  • See this question and responses: http://stackoverflow.com/q/504103/7507 – noahlz Oct 24 '15 at 00:05
  • Is .sorted running in parallel? Is it possible that .sorted bypasses the JVM? Seems that parallel sort would mostly help when the data fits in each core's cache. Once it goes beyond that, then all the cores are competing for the same memory bus. – rcgldr Oct 24 '15 at 00:43
  • As far as I know, `.sorted` is single-threaded (looking at @tkachuko's answer). – warbaque Oct 24 '15 at 19:06

1 Answer


My answer is probably going to be a bit long, but I hope it will be useful for both you and me.

So, the first question is: "How does Scala sort a List?" Let's have a look at the code from the Scala repo!

  def sorted[B >: A](implicit ord: Ordering[B]): Repr = {
    val len = this.length
    val b = newBuilder
    if (len == 1) b ++= this
    else if (len > 1) {
      b.sizeHint(len)
      val arr = new Array[AnyRef](len)  // Previously used ArraySeq for more compact but slower code
      var i = 0
      for (x <- this) {
        arr(i) = x.asInstanceOf[AnyRef]
        i += 1
      }
      java.util.Arrays.sort(arr, ord.asInstanceOf[Ordering[Object]])
      i = 0
      while (i < arr.length) {
        b += arr(i).asInstanceOf[A]
        i += 1
      }
    }
    b.result()
  }

So what the hell is going on here? Long story short: the sorting is delegated to Java. Everything else is just size bookkeeping and casting. Basically, this is the line that does the work:

java.util.Arrays.sort(arr, ord.asInstanceOf[Ordering[Object]])
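To make the delegation concrete, here is a minimal sketch that reproduces the same steps by hand: copy the list into an Array[AnyRef], hand it to java.util.Arrays.sort, and copy the result back. The names SortedSketch and sortedViaJava are hypothetical and exist only for illustration; this is not the library code itself.

```scala
import java.util.Arrays

object SortedSketch {
  // Hypothetical helper mirroring the steps of `sorted`:
  // box the elements into an Array[AnyRef], let the JDK sort it, copy back out.
  def sortedViaJava[A](xs: List[A])(implicit ord: Ordering[A]): List[A] = {
    val arr = new Array[AnyRef](xs.length)
    var i = 0
    for (x <- xs) {
      arr(i) = x.asInstanceOf[AnyRef]   // boxes primitives such as Int
      i += 1
    }
    // scala.math.Ordering extends java.util.Comparator, so it can be passed directly.
    Arrays.sort(arr, ord.asInstanceOf[Ordering[AnyRef]])
    arr.toList.map(_.asInstanceOf[A])
  }
}
```

Called as SortedSketch.sortedViaJava(List(3, 1, 2)), it should give the same ordering as List(3, 1, 2).sorted, since both end up in the same JDK routine.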

Let's go one level deeper into JDK sources:

public static <T> void sort(T[] a, Comparator<? super T> c) {
    if (c == null) {
        sort(a);
    } else {
        if (LegacyMergeSort.userRequested)
            legacyMergeSort(a, c);
        else
            TimSort.sort(a, 0, a.length, c, null, 0, 0);
    }
}

legacyMergeSort is nothing but a single-threaded implementation of the merge sort algorithm.

The next question is: "what is TimSort.sort and when do we use it?"

To the best of my knowledge, the default value of this property (LegacyMergeSort.userRequested) is false, which leads us to the TimSort.sort algorithm. A description can be found here. Why is it better? Fewer comparisons than in merge sort, according to the comments in the JDK sources.

Moreover, you should be aware that it is all single-threaded, so there is no parallelization here.
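As a point of comparison that is not part of the code paths shown above: since Java 8 the JDK also ships java.util.Arrays.parallelSort, which sorts on the common fork/join pool. A minimal sketch of calling it from Scala, assuming JDK 8 or later and a plain Array[Int] (ParallelSortSketch and parallelSorted are hypothetical names):

```scala
import java.util.Arrays

object ParallelSortSketch {
  // Sorts a copy of the input with the JDK's parallel sort (JDK 8+),
  // leaving the original array untouched.
  def parallelSorted(xs: Array[Int]): Array[Int] = {
    val copy = xs.clone()
    Arrays.parallelSort(copy)   // resolves to the int[] overload
    copy
  }
}
```

This can serve as a baseline to measure a hand-written parallel sort against, in the same way .sorted serves as the single-threaded baseline.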

Third question, "your code":

  1. You create too many objects. When it comes to performance, mutation (sadly) is your friend.
  2. Premature optimization is the root of all evil -- Donald Knuth. Before making any optimizations (like parallelism), try to implement a single-threaded version and compare the results.
  3. Use something like JMH to test the performance of your code.
  4. You probably should not use the Stream class if you want the best performance, as it does additional caching (it memoizes every element it computes).
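To illustrate how these tips might combine, here is a rough sketch of an array-based merge sort that uses Futures only for the top few levels of the recursion and falls back to a plain sequential sort below that. It is an assumption-laden example, not a tuned implementation: it handles only Array[Int], uses the global ExecutionContext, and the names ParallelMergeSortSketch, MinParallel, and the depth parameter are made up for illustration.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

object ParallelMergeSortSketch {
  // Below this size a range is sorted sequentially (assumed value, tune by measuring).
  val MinParallel = 1 << 13

  // Merge the sorted ranges [lo, mid) and [mid, hi) of `a`, using `tmp` as scratch space.
  private def merge(a: Array[Int], tmp: Array[Int], lo: Int, mid: Int, hi: Int): Unit = {
    System.arraycopy(a, lo, tmp, lo, hi - lo)
    var i = lo; var j = mid; var k = lo
    while (k < hi) {
      if (j >= hi || (i < mid && tmp(i) <= tmp(j))) { a(k) = tmp(i); i += 1 }
      else { a(k) = tmp(j); j += 1 }
      k += 1
    }
  }

  // Plain single-threaded top-down merge sort on [lo, hi).
  private def seqSort(a: Array[Int], tmp: Array[Int], lo: Int, hi: Int): Unit =
    if (hi - lo > 1) {
      val mid = lo + (hi - lo) / 2
      seqSort(a, tmp, lo, mid)
      seqSort(a, tmp, mid, hi)
      merge(a, tmp, lo, mid, hi)
    }

  // Split into two Futures while `depth` allows; the halves touch disjoint
  // ranges of `a` and `tmp`, and the merge runs only after both have completed.
  private def parSort(a: Array[Int], tmp: Array[Int], lo: Int, hi: Int, depth: Int): Future[Unit] =
    if (depth <= 0 || hi - lo <= MinParallel) Future { seqSort(a, tmp, lo, hi) }
    else {
      val mid = lo + (hi - lo) / 2
      val left = parSort(a, tmp, lo, mid, depth - 1)    // both futures are started
      val right = parSort(a, tmp, mid, hi, depth - 1)   // before either is awaited
      for (_ <- left; _ <- right) yield merge(a, tmp, lo, mid, hi)
    }

  // Sorts `a` in place and returns it; depth 3 gives up to 8 leaf tasks.
  def sort(a: Array[Int], depth: Int = 3): Array[Int] = {
    val tmp = new Array[Int](a.length)
    Await.result(parSort(a, tmp, 0, a.length, depth), Duration.Inf)
    a
  }
}
```

The only blocking call is the single Await.result at the top; inside the recursion the futures are composed rather than awaited, so pool threads are not tied up waiting on each other. Whether this actually beats .sorted on a given machine is something only a warmed-up benchmark (for example with JMH) can tell.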

I intentionally did not give you an answer like "super-fast merge sort in Scala can be found here", but just some tips to apply to your code and coding practices.

I hope this helps.

tkachuko
  • 1) How should I do this in Scala? 2) There's already a single-threaded version, and I don't know how to make it any faster. – warbaque Oct 24 '15 at 19:04
  • 1) Your code looks like Java now. Please have a look at https://courses.cs.washington.edu/courses/cse373/13wi/lectures/03-13/MergeSort.java. Threads should be replaced with Futures. – tkachuko Oct 25 '15 at 00:57
  • Thanks, I'll look into that. (I couldn't figure out the right Scala approach, so I just wrote mergesort in C++ and then translated that into Scala.) – warbaque Oct 25 '15 at 01:04
  • Also, please have a look at this conversation http://stackoverflow.com/questions/2201472/merge-sort-from-programming-scala-causes-stack-overflow and http://stackoverflow.com/questions/18944524/fast-functional-merge-sort. Here guys are discussing why it is slow and what can be done to improve it. Previous URL (washington.edu) should give you an idea how to parallelize it. – tkachuko Oct 25 '15 at 01:16
  • I implemented an alternative mergesort using that piece of Java as a base. Single-threaded performance is slower than mine, but it's easier to parallelize. https://bpaste.net/show/f93cf489ca08 I still don't know what the correct way to use Scala futures would be. Currently I just fork, wait and merge. – warbaque Oct 25 '15 at 02:45