6

I have an array of objects I want to sort, where the predicate for sorting is asynchronous. Does Scala have either a standard or 3rd party library function for sorting based on a predicate with type signature of (T, T) -> Future[Bool] rather than just (T, T) -> Bool?

Alternatively, is there some other way I could structure this code? I've considered finding all the 2-pair permutations of list elements, running the predicate over each pair and storing the result in a Map((T, T), Bool) or some structure to that effect, and then sorting on it - but I suspect that will have many more comparisons executed than even a naive sorting algorithm would.

jackweirdy
  • 5,632
  • 4
  • 24
  • 33

2 Answers2

1

If your predicate is async you may prefer to get an async result too and avoid blocking threads using Await

If you want to sort a List[(T,T)] according to a future boolean predicate, the easiest it to sort a List[(T,T,Boolean)]

So given a you have a List[(T,T)] and a predicate (T, T) -> Future[Bool], how can you get a List[(T,T,Boolean)]? Or rather a Future[List[(T,T,Boolean)]] as you want to keep the async behavior.

val list: List[(T,T)] = ...
val predicate = ...
val listOfFutures: List[Future[(T,T,Boolean]] = list.map { tuple2 => 
  predicate(tuple2).map( bool => (tuple2._1, tuple2._2, bool) 
}
val futureList: Future[List[(T,T,Boolean)]] = Future.sequence(listOfFutures)
val futureSortedResult: Future[List[(T,T)]] = futureList.map { list =>
    list.sort(_._3).map(tuple3 => (tuple3._1,tuple3._2))
}

This is pseudo-code, I didn't compile it and it may not, but you get the idea.

The key is Future.sequence, very useful, which somehow permits to transform Monad1[Monad2[X]] to Monad2[Monad1[X]] but notice that if any of your predicate future fail, the global sort operation will also be a failure.


If you want better performance it may be a better solution to "batch" the call to the service returning the Future[Boolean]. For example instead of (T, T) -> Future[Bool] maybe you can design a service (if you own it obviously) like List[(T, T)] -> Future[List[(T,T,Bool)] so that you can get everything you need in a async single call.

Sebastien Lorber
  • 89,644
  • 67
  • 288
  • 419
0

A not so satisfactory alternative would be to block each comparison until the future is evaluated. If evaluating your sorting predicate is expensive, sorting will take a long time. In fact, this just translates a possibly concurrent program into a sequential one; all benefits of using futures will be lost.

import scala.concurrent.duration._
implicit val executionContext = ExecutionContext.Implicits.global

val sortingPredicate: (Int, Int) => Future[Boolean] = (a, b) => Future{
  Thread.sleep(20) // Assume this is a costly comparison
  a < b
}

val unsorted = List(4, 2, 1, 5, 7, 3, 6, 8, 3, 12, 1, 3, 2, 1)
val sorted = unsorted.sortWith((a, b) =>
  Await.result(sortingPredicate(a, b), 5000.millis) // careful: May throw an exception
)
println(sorted) // List(1, 1, 1, 2, 2, 3, 3, 3, 4, 5, 6, 7, 8, 12)

I don't know if there is an out of the box solution that utilizes asynchronous comparison. However, you could try to implement your own sorting algorithm. If we consider Quicksort, which runs in O(n log(n)) on average, then we can actually utilize asynchronous comparison quite easy.

If you're not familiar with Quicksort, the algorithm basically does the following

  1. Choose an element from the collection (called the Pivot)
  2. Compare the pivot with all remaining elements. Create a collection with elements that are less than the pivot and one with elements that are greater than the pivot.
  3. Sort the two new collections and concatenate them, putting the pivot in the middle.

Since step 2 performs a lot of independent comparisons we can evaluate the comparisons concurrently.

Here's an unoptimized implementation:

object ParallelSort {
  val timeout = Duration.Inf

  implicit class QuickSort[U](elements: Seq[U]) {
    private def choosePivot: (U, Seq[U]) = elements.head -> elements.tail

    def sortParallelWith(predicate: (U, U) => Future[Boolean]): Seq[U] =
      if (elements.isEmpty || elements.size == 1) elements
      else if (elements.size == 2) {
        if (Await.result(predicate(elements.head, elements.tail.head), timeout)) elements else elements.reverse
      }
      else {
        val (pivot, other) = choosePivot
        val ordering: Seq[(Future[Boolean], U)] = other map { element => predicate(element, pivot) -> element }

        // This is where we utilize asynchronous evaluation of the sorting predicate
        val (left, right) = ordering.partition { case (lessThanPivot, _) => Await.result(lessThanPivot, timeout) }

        val leftSorted = left.map(_._2).sortParallelWith(predicate)
        val rightSorted = right.map(_._2).sortParallelWith(predicate)
        leftSorted ++ (pivot +: rightSorted)
      }
  }

}

which can be used (same example as above) as follows:

import ParallelSort.QuickSort
val sorted2 = unsorted.sortParallelWith(sortingPredicate)
println(sorted2) // List(1, 1, 1, 2, 2, 3, 3, 3, 4, 5, 6, 7, 8, 12)

Note that whether or not this implementation of Quicksort is faster or slower than the completely sequential built-in sorting algorithm highly depends on the cost of a comparison: The longer a comparison has to block, the worse is the alternative solution mentioned above. On my machine, given a costly comparison (20 milliseconds) and the above list, the built-in sorting algorithm runs in ~1200 ms while this custom Quicksort runs in ~200 ms. If you're worried about performance, you'd probably want to come up with something smarter. Edit: I just checked how many comparisons both, the built-in sorting algorithm and the custom Quicksort algorithm perform: Apparently, for the given list (and some other lists I randomly typed in) the built-in algorithm uses more comparisons, so the performance improvements thanks to parallel execution might not be that great. I don't know about bigger lists, but you'd have to profile them on your specific data anyways.

Kulu Limpa
  • 3,501
  • 1
  • 21
  • 31