
I am new to Scala and would like to build a real-time application to match some people. For a given Person, I would like to get the TOP 50 people with the highest matching score.

The idiom is as follows:

val persons = new mutable.HashSet[Person]() // Collection of people
/* Feed omitted */
val personsPar = persons.par // Make it parallel
val person = ... // The given person

val res = personsPar
  .filter(...) // Some filters
  .map(p => (p, computeMatchingScoreAsFloat(person, p)))
  .toList
  .sortBy(-_._2)
  .take(50)
  .map(t => t._1 + "=" + t._2).mkString("\n")

In the sample code above, a HashSet is used, but it could be any type of collection; I am pretty sure it is not optimal.
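For experimentation, here is a self-contained sketch of the pipeline with placeholder data and a dummy scoring function (the Person fields, the filter, and the dot-product "correlation" are all illustrative, not the real implementation):

```scala
import scala.collection.mutable
import scala.util.Random

// Hypothetical stand-ins for the real Person and scoring function
case class Person(id: Int, vector: Array[Float])

def computeMatchingScoreAsFloat(a: Person, b: Person): Float =
  a.vector.zip(b.vector).map { case (x, y) => x * y }.sum // dummy dot product

val persons = mutable.HashSet[Person]()
for (i <- 1 to 1000) persons += Person(i, Array.fill(200)(Random.nextFloat()))
val person = Person(0, Array.fill(200)(Random.nextFloat()))

val res = persons.par
  .filter(_.id % 2 == 0) // placeholder filter
  .map(p => (p, computeMatchingScoreAsFloat(person, p)))
  .toList
  .sortBy(-_._2)
  .take(50)
  .map(t => t._1.id + "=" + t._2).mkString("\n")

println(res.split("\n").length) // 50 lines, best matches first
```

Note that `.par` is built in on Scala 2.12 and earlier; on 2.13+ it requires the scala-parallel-collections module.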

The problem is that persons contains over 5M elements, and the computeMatchingScoreAsFloat method computes a kind of correlation value from 2 vectors of 200 floats. This computation takes ~2 s on my computer with 6 cores.

My question is, what is the fastest way of doing this TOP N pattern in Scala?

Subquestions:
  • What implementation of collection (or something else?) should I use?
  • Should I use Futures?

NOTE: It has to be computed in parallel; the pure computation of computeMatchingScoreAsFloat alone (with no ranking/TOP N) takes more than a second, and < 200 ms if multi-threaded on my computer.

EDIT: Thanks to Guillaume, compute time has been decreased from 2s to 700ms

def top[B](n: Int, t: Traversable[B])(implicit ord: Ordering[B]): collection.mutable.PriorityQueue[B] = {

  // Reverse the ordering so the queue's head is the smallest of the current
  // top n, i.e. the first candidate to evict
  val starter = collection.mutable.PriorityQueue[B]()(ord.reverse)

  t.foldLeft(starter)(
    (myQueue, a) => {
      if (myQueue.length < n) { myQueue.enqueue(a); myQueue } // < n keeps exactly n elements
      else if (ord.compare(a, myQueue.head) < 0) myQueue
      else {
        myQueue.dequeue()
        myQueue.enqueue(a)
        myQueue
      }
    }
  )
}

Thanks

ogen
  • Have you benchmarked this? Is computeMatchingScoreAsFloat the most expensive part? Is it heavy enough to be worth parallelizing? – Rüdiger Klaehn Nov 04 '16 at 16:21
  • Yes, it has to be computed in parallel; the pure computation of computeMatchingScoreAsFloat (with no ranking/TOP N) takes more than a second, and 40 ms if multi-threaded on my computer – ogen Nov 07 '16 at 11:22

1 Answer


I would propose a few changes:

1- I believe that the filter and map steps require traversing the collection twice. A lazy collection would reduce this to a single pass. Either use a lazy collection (like Stream) or convert yours to one, for instance for a list:

myList.view
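To illustrate the effect, here is a small sketch showing that a view runs filter and map as one fused pass only when it is forced (the `touched` counter is just for demonstration):

```scala
var touched = 0
val xs = (1 to 10).toList

// Strict: filter builds an intermediate list, then map traverses it again
val strict = xs.filter(_ % 2 == 0).map { x => touched += 1; x * x }

touched = 0
// Lazy: nothing runs until the view is forced; each surviving element
// then flows through filter and map in a single traversal
val lazyPipeline = xs.view.filter(_ % 2 == 0).map { x => touched += 1; x * x }
println(touched)             // 0 -- the view has not been forced yet
val forced = lazyPipeline.toList
println(touched)             // 5 -- one pass over the 5 even elements
println(forced)              // List(4, 16, 36, 64, 100)
```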

2- the sort step requires sorting all elements. Instead, you can do a foldLeft with an accumulator storing the top N records. See here for an example implementation: Simplest way to get the top n elements of a Scala Iterable. I would test a PriorityQueue instead of a list if you want maximum performance (this problem really falls into its wheelhouse). For instance, something like this:

  def IntStream(n: Int): Stream[(Int, Int)] =
    if (n == 0) Stream.empty
    else (util.Random.nextInt, util.Random.nextInt) #:: IntStream(n - 1)

  def top[B](n: Int, t: Traversable[B])(implicit ord: Ordering[B]): collection.mutable.PriorityQueue[B] = {

    // Reverse the ordering so the queue's head is the smallest of the current
    // top n, i.e. the first candidate to evict
    val starter = collection.mutable.PriorityQueue[B]()(ord.reverse)

    t.foldLeft(starter)(
      (myQueue, a) => {
        if (myQueue.length < n) { myQueue.enqueue(a); myQueue } // < n keeps exactly n elements
        else if (ord.compare(a, myQueue.head) < 0) myQueue
        else {
          myQueue.dequeue()
          myQueue.enqueue(a)
          myQueue
        }
      }
    )
  }

  def diff(t2: (Int, Int)) = t2._2

  top(10, IntStream(10000))(Ordering.by(diff)) // select top 10

I really think that your problem requires a SINGLE traversal of the collection, so you should be able to get your run time down to below 1 s.

Good luck!

Guillaume
  • Thanks for your help, I will test this and report back the new compute time – ogen Nov 04 '16 at 16:00
  • It seems that I can't have a view on a parallel collection (a collection that is both parallel and lazy), I can handle a ParSeq xor a SeqView. Do you know how I can achieve this ? Is there any ParSeqView in Scala ? – ogen Nov 07 '16 at 10:45
  • Have you tried running it on a non-parallel collection? I think parallel collections have significant overheads that may not be justified (plus, you have a "toList" which, I suspect, merges back into a non-parallel collection). Otherwise, you can use flatMap to combine filter and map (see here http://stackoverflow.com/questions/32234132/how-to-combine-filter-and-map-in-scala); that would effectively give the same result as a lazy collection – Guillaume Nov 07 '16 at 14:44
  • Thanks for your help. I have updated my original post. And yes, I have already tested the computeMatchingScoreAsFloat computation alone; it takes > 1 s on a non-par collection and < 200 ms on a par collection – ogen Nov 07 '16 at 17:15
  • 2 s -> 700 ms with your code. Any other advice? I will try to manually split my data into buckets and run multi-threaded computations without any collection methods. – ogen Nov 08 '16 at 09:33
  • You need to take out the "toList". This is a slow step on a large collection. Try to run the "top" method on each parallel partition and merge the resulting PriorityQueues into a single one. – Guillaume Nov 08 '16 at 13:13
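The per-partition strategy described in the last comment can be sketched as follows. This is one possible implementation, not necessarily the fastest: the `addCapped`/`topPar` names and the manual chunking are illustrative, and each chunk builds its own queue so no mutable state is shared between threads (on Scala 2.13+, `.par` requires the scala-parallel-collections module):

```scala
import scala.collection.mutable

// Keep the n largest elements seen so far in a min-heap (ord.reverse):
// the head is the smallest of the current top n, i.e. the eviction candidate
def addCapped[B](q: mutable.PriorityQueue[B], a: B, n: Int)(implicit ord: Ordering[B]): mutable.PriorityQueue[B] = {
  if (q.length < n) q.enqueue(a)
  else if (ord.compare(a, q.head) > 0) { q.dequeue(); q.enqueue(a) }
  q
}

// Split the data into chunks, compute a top-n queue per chunk in parallel,
// then merge the per-chunk queues into a single one
def topPar[B](n: Int, xs: Seq[B], chunks: Int = 8)(implicit ord: Ordering[B]): List[B] = {
  val chunkSize = math.max(1, xs.size / chunks)
  val perChunk = xs.grouped(chunkSize).toSeq.par.map { chunk =>
    chunk.foldLeft(mutable.PriorityQueue.empty[B](ord.reverse))((q, a) => addCapped(q, a, n))
  }.seq
  val merged = perChunk.reduce((q1, q2) => q2.foldLeft(q1)((q, a) => addCapped(q, a, n)))
  merged.dequeueAll.toList.reverse // largest first
}

val xs = scala.util.Random.shuffle((1 to 100000).toList)
println(topPar(3, xs)) // List(100000, 99999, 99998)
```

This avoids both the global sort and the `toList` merge: each thread only ever touches its own n-element heap, and the final merge handles at most `chunks * n` elements.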