
I have a bunch of XML files that I'm trying to process in parallel. My Scala code (2.9.2) using futures starts out fine but ends up eating nearly 100% of the 32 GB I have on my machine. This doesn't happen when I process the files sequentially, so my guess is that something is going wrong with garbage collection when using Scala futures.

Here's a stripped down version of my code. Can anyone tell me what's wrong?

val filenameGroups = someStringListOfFilepaths.grouped(1000).toStream
val tasks = filenameGroups.map {
  fg =>
    scala.actors.Futures.future {
      val parser = new nu.xom.Builder() // I'm using nu.xom. Not sure it matters.
      fg.map {
        path => {
          val doc = parser.build(new java.io.File(path))
          val result = doc.query(someXPathQuery) // placeholder for the actual XPath query
          result
        }
      }.toList
    }
}

val pairs = tasks.par.flatMap(_.apply)

ETA: Ok, I solved this but I still have no idea why this makes a difference.

I abstracted most of the code in the inner loop into a separate method, pulled the parser instantiation out of the future, and reran it. Memory usage now stays flat at a decent 17%. Does anybody have any idea why this would make a difference?

Here's a simplified version of what I did:

def process(arglist...) = yada

val tasks = filenameGroups.map {
  fg =>
    val parser = new nu.xom.Builder()
    scala.actors.Futures.future {
      process(fg, parser)
    }
}

val pairs = tasks.par.flatMap(_.apply)
JasonMond
  • How many files do you want to process concurrently? It seems like you have at least thousands of them. Loading thousands of XML files into memory will quickly eat up your whole RAM. Doing this sequentially basically reads a file, processes it, and then it's eligible for garbage collection. – Tomasz Nurkiewicz Oct 16 '12 at 19:02
  • @TomaszNurkiewicz I want to process as many as possible and I have 200K files. I just assumed Scala futures would be smart and only create 8 (or however many processors you have) future instances, so I'd only have 8 XML docs in memory at a time. – JasonMond Oct 16 '12 at 19:11

1 Answer


Futures can't really predict how many threads you want or how much memory your computations will take, so it's generally your responsibility to put appropriately serialized computations inside a modest number of futures. In particular, if you are on an 8 core machine, you probably don't want to group much smaller than someStringListOfFilepaths.length/8 (less if your files are so large that you can't have 8 in memory at once). You can use the standard Java trick of inspecting the number of cores, covered on SO and many other places, if you want to scale it per machine without having to think about it. (Might want to inspect Runtime.getRuntime.maxMemory also in that case, just in case you're on a machine with lots of cores and not much RAM (or not much allocated for the VM).)
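For illustration, here is a rough sketch of that sizing idea (reusing someStringListOfFilepaths from the question and assuming it's a plain List of paths; the exact numbers are up to you):

val cores = Runtime.getRuntime.availableProcessors  // number of available cores
val heapBytes = Runtime.getRuntime.maxMemory        // sanity-check how much heap the VM will use

// Aim for roughly one group per core instead of thousands of small groups,
// so only about `cores` files' worth of work is in flight at once.
val groupSize = math.max(1, someStringListOfFilepaths.length / cores)
val filenameGroups = someStringListOfFilepaths.grouped(groupSize).toList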

(Incidentally, in your minimal example there's both laziness and futures, but the laziness doesn't do anything for you. Futures are already not running when created, so delaying the instantiation of the futures probably doesn't help you any.)

Also, note that if you have 200k files, you will end up with 200k results, and depending on how large a result is, that could eat up a lot of memory. Probably not 32G, but who knows what's in the files?
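(To put a rough number on it: at a hypothetical 50 KB per result, 200,000 × 50 KB ≈ 10 GB retained just for the collected results.)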

Rex Kerr
  • I've run this code sequentially and it never hits more than 5% memory use. Also, at your recommendation, I tried upping the group sizes to 30K. It still ends up hitting almost 100% memory use. – JasonMond Oct 16 '12 at 20:24
  • @JasonMond - What if you run all the files in one group? That is, what if it's sequential after all but just wrapped in a future? I wonder whether the parallel thing and futures are a red herring, and it's actually something about your fg.map over a grouped stream? – Rex Kerr Oct 16 '12 at 22:43
  • 1
    Isn't it that Stream is memoized and he keeps a reference to the head (tasks) – Viktor Klang Oct 17 '12 at 00:22
  • @ViktorKlang Yup, turns out it was because I was using stream. The input was actually an iterable so I converted it to stream to be able to use the parallel collection. Wrong idea. Changing the input iterable to a list fixed everything. – JasonMond Oct 17 '12 at 13:47
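(A minimal sketch, not from the original thread, of the Stream memoization effect Viktor Klang points out: as long as a reference to the head is held, every element the Stream has forced stays memoized and reachable, whereas an Iterator lets each element become garbage once it has been processed.)

// Each element here stands in for a parsed XML document / query result.
val heavy: Stream[Array[Byte]] =
  Stream.from(0).map(_ => new Array[Byte](10 * 1024 * 1024)) // ~10 MB per element

heavy.take(100).foreach(_ => ())   // ~1 GB now memoized and reachable through `heavy`

// An Iterator does not memoize, so processed elements can be collected:
val light = Iterator.from(0).map(_ => new Array[Byte](10 * 1024 * 1024))
light.take(100).foreach(_ => ())   // each chunk can be garbage collected right after use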