6

I have written a Scala (2.9.1-1) application that needs to process several million rows from a database query. I am converting the ResultSet to a Stream using the technique shown in the answer to one of my previous questions:

class Record(...)

val resultSet = statement.executeQuery(...)

new Iterator[Record] {
  def hasNext = resultSet.next()
  def next = new Record(resultSet.getString(1), resultSet.getInt(2), ...)
}.toStream.foreach { record => ... }

and this has worked very well.

Since the body of the foreach closure is very CPU intensive, and as a testament to the practicality of functional programming, if I add a .par before the foreach, the closures get run in parallel with no other effort, except to make sure that the body of the closure is thread safe (it is written in a functional style with no mutable data except printing to a thread-safe log).

However, I am worried about memory consumption. Is the .par causing the entire result set to load in RAM, or does the parallel operation load only as many rows as it has active threads? I've allocated 4G to the JVM (64-bit with -Xmx4g) but in the future I will be running it on even more rows and worry that I'll eventually get an out-of-memory.

Is there a better pattern for doing this kind of parallel processing in a functional manner? I've been showing this application to my co-workers as an example of the value of functional programming and multi-core machines.

Community
  • 1
  • 1
Ralph
  • 31,584
  • 38
  • 145
  • 282
  • Just courious. What DBMS are you using, and what Scala DB API to query it? – santiagobasulto Mar 22 '12 at 11:58
  • I am accessing a Microsoft SQL Server 2012 database running on Windows Server 2008 R2 using the JDBC driver from Microsoft (http://msdn.microsoft.com/en-us/sqlserver/aa937724). – Ralph Mar 22 '12 at 11:59

2 Answers2

4

If you look at the scaladoc of Stream, you will notice that the definition class of par is the Parallelizable trait... and, if you look at the source code of this trait, you will notice that it takes each element from the original collection and put them into a combiner, thus, you will load each row into a ParSeq:

  def par: ParRepr = {
    val cb = parCombiner
    for (x <- seq) cb += x
    cb.result
  }

  /** The default `par` implementation uses the combiner provided by this method
   *  to create a new parallel collection.
   *
   *  @return  a combiner for the parallel collection of type `ParRepr`
   */
  protected[this] def parCombiner: Combiner[A, ParRepr]

A possible solution is to explicitly parallelize your computation, thanks to actors for example. You can take a look at this example from the akka documentation for example, that might be helpful in your context.

Nicolas
  • 24,509
  • 5
  • 60
  • 66
  • I was afraid of that. I thought of firing up a set of threads and then have each pull rows from the (synchronized) result set, but that doesn't sound like a very functional solution. – Ralph Mar 22 '12 at 13:19
  • Have an actor wrap the query and spawn up a Router with a Resizer that you instruct to pull in chunks. – Viktor Klang Mar 23 '12 at 15:03
-1

The new akka stream library is the fix you're looking for:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Source, Sink}

def iterFromQuery() : Iterator[Record] = {
  val resultSet = statement.executeQuery(...)
  new Iterator[Record] {
    def hasNext = resultSet.next()
    def next = new Record(...)
  }
}

def cpuIntensiveFunction(record : Record) = {
...
}

implicit val actorSystem = ActorSystem()
implicit val materializer = ActorMaterializer()
implicit val execContext = actorSystem.dispatcher

val poolSize = 10 //number of Records in memory at once

val stream = 
  Source(iterFromQuery).runWith(Sink.foreachParallel(poolSize)(cpuIntensiveFunction))

stream onComplete {_ => actorSystem.shutdown()}
Ramón J Romero y Vigil
  • 17,373
  • 7
  • 77
  • 125