
I've been using Scalaz 7 iteratees to process a large (effectively unbounded) stream of data in constant heap space.

In code, it looks something like this:

type ErrorOrT[M[+_], A] = EitherT[M, Throwable, A]
type ErrorOr[A] = ErrorOrT[IO, A]

def processChunk(c: Chunk): Result

def process(data: EnumeratorT[Chunk, ErrorOr]): IterateeT[Chunk, ErrorOr, List[Result]] =
  Iteratee.fold[Chunk, ErrorOr, List[Result]](Nil) { (rs, c) =>
    processChunk(c) :: rs
  } &= data

Now I'd like to perform the processing in parallel, working on P chunks of data at a time. I still have to limit heap space, but it's reasonable to assume that there's enough heap to store P chunks of data and the accumulated results of the computation.

I'm aware of the Task class and thought of mapping over the enumerator to create a stream of tasks:

data map (c => Task.delay(processChunk(c)))

But I'm still not sure how to manage the non-determinism. While consuming the stream, how do I ensure that P tasks are running whenever possible?

First try:

My first stab at a solution was to fold over the stream, creating a Scala Future to process each chunk. However, the program blew up with a GC overhead error, presumably because it was pulling all the chunks into memory as it eagerly created all the Futures. Instead, the iteratee needs to stop consuming input when P tasks are already running and resume when any of those tasks finishes.
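That first attempt looked roughly like this (a hedged reconstruction, not the original code; `parProcess` is a hypothetical name, and the unbounded Future creation is precisely the problem):

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Reconstruction of the failed approach: fold over the stream, firing off
// a Future per chunk. Nothing throttles the fold, so the iteratee keeps
// pulling chunks as fast as the enumerator produces them, and the
// in-flight Futures (and the chunks they capture) pile up on the heap.
def parProcess(data: EnumeratorT[Chunk, ErrorOr]): IterateeT[Chunk, ErrorOr, List[Future[Result]]] =
  Iteratee.fold[Chunk, ErrorOr, List[Future[Result]]](Nil) { (fs, c) =>
    Future(processChunk(c)) :: fs  // unbounded: one Future per chunk, no back-pressure
  } &= data
```

Note that this fold only accumulates `Future[Result]`s; the results would still need to be gathered afterwards, by which point every chunk has already been pulled in.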

Second try:

My next attempt was to group the stream into P-sized parts, process each part in parallel, then join before moving on to the next part:

def process(data: EnumeratorT[Chunk, ErrorOr]): IterateeT[Chunk, ErrorOr, Vector[Result]] =
  Iteratee.foldM[Vector[Chunk], ErrorOr, Vector[Result]](Vector.empty) { (rs, cs) =>
    tryIO(IO(rs ++ Await.result(
      Future.traverse(cs) { 
        c => Future(processChunk(c)) 
      }, 
      Duration.Inf)))
  } &= (data mapE Iteratee.group(P))

While this wouldn't fully utilize the available processors (especially since the time required to process each Chunk may vary widely), it would be an improvement. However, the group enumeratee seems to leak memory -- heap usage suddenly goes through the roof.

Aaron Novstrup
  • FYI, play iteratees (which are based off scala futures) had memory leaks as well. This situation [was fixed recently](https://github.com/scala/scala/pull/2674) – om-nom-nom Oct 02 '13 at 11:20
  • @om-nom-nom The memory leak seems to be unrelated to the use of Scala Futures. See [this related question](http://stackoverflow.com/questions/19128856/avoiding-memory-leaks-with-scalaz-7-iteratees). – Aaron Novstrup Oct 02 '13 at 18:35
  • actually wanted to post comment to that question :-) Just saying that scalaz futures and scala futures could have the same flaw conceptually – om-nom-nom Oct 02 '13 at 19:16

0 Answers