35

I know about the parallel collections in Scala. They are handy! However, I would like to iterate over the lines of a file that is too large for memory in parallel. I could create threads and set up a lock over a Scanner, for example, but it would be great if I could run code such as:

Source.fromFile(path).getLines.par foreach { line =>

Unfortunately, however, this gives:

error: value par is not a member of Iterator[String]

What is the easiest way to accomplish some parallelism here? For now, I will read in some lines and handle them in parallel.
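
For reference, the thread-plus-lock workaround I mention above (same idea as locking a Scanner, but over the line iterator) might look something like this. It is an untested sketch: path is the file path from above, and process stands for whatever per-line work needs to happen.

import scala.io.Source

val source = Source.fromFile(path)
val lines = source.getLines
val lock = new Object

// hand out lines one at a time under the lock
def nextLine(): Option[String] = lock.synchronized {
  if (lines.hasNext) Some(lines.next()) else None
}

val threads = (1 to 4).map { _ =>
  new Thread(new Runnable {
    def run(): Unit = {
      var line = nextLine()
      while (line.isDefined) {
        process(line.get)
        line = nextLine()
      }
    }
  })
}
threads.foreach(_.start())
threads.foreach(_.join())
source.close()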

schmmd
  • 18,650
  • 16
  • 58
  • 102
  • You could create an array of actors and dispatch each line as a message to one of the actors. That way at least you don't have to mess with locking. – Kim Stebel Jul 19 '11 at 17:54
  • @kim But does this block, so the file doesn't spam the mailboxes? – ziggystar Jul 19 '11 at 19:43
  • 1
    It does not block, so the mailboxes would get spammed. However, the actors could explicitly request work from the actor that reads the file. – Kim Stebel Jul 19 '11 at 20:50
  • We ended up writing [a custom solution](https://github.com/allenai/common/blob/1b3d7d6e3788578c2d991ab97b32d38549ba38c2/core/src/main/scala/org/allenai/common/ParIterator.scala) at our company so we would understand the parallelism exactly. – schmmd Mar 13 '15 at 20:28

5 Answers

33

You could use grouping to easily slice the iterator into chunks you can load into memory and then process in parallel.

import scala.io.Source

val chunkSize = 128 * 1024
val iterator = Source.fromFile(path).getLines.grouped(chunkSize)
iterator.foreach { lines =>
  // each chunk fits in memory, and its lines are processed in parallel
  lines.par.foreach { line => process(line) }
}

In my opinion, something like this is the simplest way to do it.

Joshua Hartman
  • 1,216
  • 1
  • 11
  • 18
  • Won't calling `grouped` on `getLines` still have to load the whole file into memory? Or am I way off base? – Ian McLaird Jul 20 '11 at 04:33
  • 4
    It shouldn't because operations on iterators are usually as lazy as possible. For instance, map will produce a new iterator from the original without looping through the underlying collection. To verify, I actually looked at the latest scala source. Grouping the iterator produces a GroupedIterator class that does some internal buffering of the stream to group it into chunks. It does not force traversal of the entire iterator. – Joshua Hartman Jul 20 '11 at 15:29
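A quick way to verify the laziness claim in the comment above (a sketch of my own; Iterator.tabulate just manufactures a throwaway side-effecting iterator): wrap a side-effecting iterator and watch when elements actually get produced.

val it = Iterator.tabulate(1000000) { i => println("producing " + i); i }
val chunks = it.grouped(3)  // nothing has been produced yet
chunks.next()               // prints only "producing 0" through "producing 2"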
10

I'll put this as a separate answer since it's fundamentally different from my last one (and it actually works).

Here's an outline for a solution using actors, which is basically what Kim Stebel's comment describes. There are two actor classes, a single FileReader actor that reads individual lines from the file on demand, and several Worker actors. The workers all send requests for lines to the reader, and process lines in parallel as they are read from the file.

I'm using Akka actors here, but the idea is basically the same with any other actor implementation.

case object LineRequest
case object BeginProcessing

class FileReader extends Actor {

  //reads a single line from the file or returns None if EOF
  def getLine: Option[String] = ...

  def receive = {
    case LineRequest => self.sender.foreach{_ ! getLine} //sender is an Option[ActorRef]
  }
}

class Worker(reader: ActorRef) extends Actor {

  def process(line: String): Unit = ...

  def receive = {
    case BeginProcessing => reader ! LineRequest
    case Some(line) => {
      process(line)
      reader ! LineRequest
    }
    case None => self.stop
  }
}

val reader = actorOf[FileReader].start    
val workers = Vector.fill(4)(actorOf(new Worker(reader)).start)
workers.foreach{_ ! BeginProcessing}
//wait for the workers to stop...

This way, no more than 4 (or however many workers you have) unprocessed lines are in memory at a time.
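
The getLine method is left as ... in the outline above; one way to fill it in (just an assumption about the file reading, not something the answer specifies) is to keep a java.io.BufferedReader inside the FileReader actor:

import java.io.{BufferedReader, FileReader => JFileReader}

// readLine() returns null at end of file, which Option(...) turns into None
val in = new BufferedReader(new JFileReader(path))
def getLine: Option[String] = Option(in.readLine())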

Dan Simon
  • 12,891
  • 3
  • 49
  • 55
2

The following worked for me:

source.getLines.toStream.par.foreach( line => println(line))
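
In case it helps, the same idea spelled out with its setup (path comes from the question, and process stands for the per-line work):

import scala.io.Source

val source = Source.fromFile(path)
// toStream memoizes lines as they are read; .par then runs the work in parallel
source.getLines.toStream.par.foreach(line => process(line))
source.close()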
Mahesh Pujari
  • 493
  • 1
  • 9
  • 15
0

I realize this is an old question, but you may find the ParIterator implementation in the iterata library to be a useful no-assembly-required implementation of this:

scala> import com.timgroup.iterata.ParIterator.Implicits._
scala> val it = (1 to 100000).toIterator.par().map(n => (n + 1, Thread.currentThread.getId))
scala> it.map(_._2).toSet.size
res2: Int = 8 // addition was distributed over 8 threads
ms-tg
  • 2,688
  • 23
  • 18
0

The comments on Dan Simon's answer got me thinking. Why don't we try wrapping the Source in a Stream:

def src(source: Source): Stream[String] =
  if (source.hasNext) Stream.cons(source.takeWhile( _ != '\n' ).mkString, src(source))
  else Stream.empty

Then you could consume it in parallel like this:

src(Source.fromFile(path)).par foreach process

I tried this out, and it compiles and runs at any rate. I'm not honestly sure if it's loading the whole file into memory or not, but I don't think it is.

Ian McLaird
  • 5,507
  • 2
  • 22
  • 31
  • 2
    According to [this](http://daily-scala.blogspot.com/2010/01/introducing-streams.html) blog post, the elements of a Stream are saved in memory after being evaluated, so even though lines are processed as they're read in, it will ultimately fill up memory as before, but I think you're on the right track here. I think combining this with Joshua Hartman's chunking idea and using `Stream.drop` will work. – Dan Simon Jul 20 '11 at 05:15
  • 1
    In addition to that, calling `par` will evaluate all the elements and create a strict parallel collection. There is no such collection as a parallel stream (at least not yet). – axel22 Jul 20 '11 at 11:25