
This is a follow-up to my previous question.

Suppose I process my files in parallel. Now I would like to write the processing results to a file. Since the results do not fit in memory, I cannot simply wait until the processing of all files has finished and then write the results. I have to do the processing and writing in parallel somehow.

For example: suppose I have files with numbers. Each file is about 500 MB and there are about 200 files. Each file fits in memory, but all of them together do not. Now I would like to write all even numbers found in these files to another file.

How can I do that in Scala (with Futures and Scala parallel collections)?

– Michael
Lines() in scalax.io is lazily evaluated. Also have a look at future-exec: http://jesseeichar.github.com/scala-io-doc/0.4.0/index.html#!/core/future_exec – oluies Jul 21 '12 at 07:01

2 Answers


At some point you have to synchronize the writing. If you don't want to block the other threads, one possibility is to use an actor that writes the results to a file. That could look like this:

import java.io.FileWriter
import akka.actor.Actor

class FileWriterActor(path: String) extends Actor {

  // the writer is owned by this single actor, so no further synchronization is needed
  val file = new FileWriter(path)

  // this is how you implement an akka actor
  // plain scala actors look a bit different
  def receive = {
    case x: MyResult => file.write(x.toString) // MyResult is whatever your processing produces
  }

  override def postStop() = file.close()
}

// usage
val result = ... // calculation stuff
fileWriter ! result
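
To tie this back to the question (writing the even numbers found in many large files), a minimal end-to-end sketch could look like the following. The names (`LineWriterActor`, `Flush`, `EvenNumbersApp`), the input and output paths, the one-number-per-line assumption and the flush/ack step are illustration only, not part of the answer above; the writer takes plain strings so the sketch is self-contained.

import java.io.FileWriter
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.io.Source
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout

// hypothetical ack message: "reply once everything sent before this has been written out"
case object Flush

// same idea as the FileWriterActor above, but taking plain strings so the sketch is self-contained
class LineWriterActor(path: String) extends Actor {
  val out = new FileWriter(path)

  def receive = {
    case line: String => out.write(line)
    case Flush        => out.flush(); sender ! Flush // everything enqueued before Flush has been written
  }

  override def postStop() = out.close()
}

object EvenNumbersApp extends App {
  val system = ActorSystem("EvenNumbers")
  val writer = system.actorOf(Props(new LineWriterActor("evens.txt")), name = "writer")

  import system.dispatcher // ExecutionContext for the Futures

  // hypothetical input files; each one fits in memory on its own, the whole set does not
  val files = Seq("numbers1.txt", "numbers2.txt")

  // one Future per file: read it lazily, keep the even numbers, send each one to the single writer
  val work = files.map { path =>
    Future {
      val src = Source.fromFile(path)
      try src.getLines().map(_.trim.toLong).filter(_ % 2 == 0).foreach(n => writer ! s"$n\n")
      finally src.close()
    }
  }

  Await.ready(Future.sequence(work), Duration.Inf) // every file processed, every message enqueued

  implicit val timeout = Timeout(1.hour)
  Await.result(writer ? Flush, Duration.Inf)       // wait until the writer has drained its mailbox
  system.shutdown()                                // same Akka 2.x-era shutdown as in the answer below
  system.awaitTermination()
}

The `Flush` round trip via the ask pattern only makes sure the writer has processed everything in its mailbox before the actor system is shut down; otherwise messages that are still queued could be lost.
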
– drexin
    This is a good strategy. What might not be entirely clear here is where the threads are. Firstly, the actor represents one thread of activity: it continually receives messages on its input queue and writes them in sequence to the file. Secondly, all the client threads use the "fileWriter ! value" sequence to send messages to the (single) actor. The '!' operator is borrowed from Hoare's CSP algebra and so is the concept. As an alternative, it would be possible to use CSP directly, e.g. via JCSP, and in this manner the way this actor works would be more explicit. – Rick-777 Jul 22 '12 at 09:13
  • In an application of mine using parallel collections to perform NLP on each line of a large input file, I switched from a synchronized println to using an Actor to write the data. CPU usage boosted from 180% to 700% and time-per-100k lines went from 12 s to 2.5 s. – schmmd May 10 '13 at 18:05
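
The approach described in the comment above, with a parallel collection instead of Futures, might look roughly like this; `processFile` and the `toUpperCase` stand-in for the per-line work are made up for illustration, and `writer` is a writer actor like the ones shown on this page:

import akka.actor.ActorRef
import scala.io.Source

// process one large file line by line with a parallel collection and let the
// single writer actor serialize the output (instead of a synchronized println)
def processFile(path: String, writer: ActorRef): Unit = {
  val lines = Source.fromFile(path).getLines().toVector // one file fits in memory
  lines.par.foreach { line =>
    val processed = line.toUpperCase // stand-in for the real per-line work (e.g. NLP)
    writer ! processed + "\n"
  }
}

All writes funnel through the single actor's mailbox, so no lock around the output stream is needed.
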

For those not familiar with Akka:

import java.io.{File, PrintWriter}
import akka.actor.{Actor, ActorSystem, Props}

object AkkaWriterExample extends App {

  val outputPath: String = ???
  val system = ActorSystem("WriterSystem")
  val writer = system.actorOf(Props(new WriterActor(new File(outputPath))), name = "writer")

  writer ! "this is a test"

  // shutdown() is asynchronous; awaitTermination() blocks until the actor system has stopped
  system.shutdown()
  system.awaitTermination()
}

class WriterActor(outFile: File) extends Actor {

  val writer = new PrintWriter(outFile)

  // this is how you implement an akka actor
  // plain scala actors look a bit different
  def receive = {
    case str: String => println(str); writer.write(str)
  }

  override def postStop() = {
    writer.flush()
    writer.close()
  }
}
– Thomas Luechtefeld