
Background

As noted in a previous question, I'm using Scalaz 7 iteratees to process a large (i.e., unbounded) stream of data in constant heap space.

My code looks like this:

type ErrorOrT[M[+_], A] = EitherT[M, Throwable, A]
type ErrorOr[A] = ErrorOrT[IO, A]

def processChunk(c: Chunk, idx: Long): Result

def process(data: EnumeratorT[Chunk, ErrorOr]): IterateeT[Vector[(Chunk, Long)], ErrorOr, Vector[Result]] =
  Iteratee.fold[Vector[(Chunk, Long)], ErrorOr, Vector[Result]](Vector.empty) { (rs, vs) =>
    rs ++ (vs map {
      case (c, i) => processChunk(c, i)
    })
  } &= (data.zipWithIndex mapE Iteratee.group(P))
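
For what it's worth, a hypothetical driver for this pipeline would look like the following (`chunks` is an assumed enumerator; the monad stack needs two `run`s to unwrap):

// hypothetical driver, assuming some `chunks: EnumeratorT[Chunk, ErrorOr]`:
// the first `run` extracts the iteratee's result (an ErrorOr[Vector[Result]]),
// the second unwraps the EitherT to an IO[Throwable \/ Vector[Result]]
val results = process(chunks).run.run.unsafePerformIO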

The Problem

I seem to have run into a memory leak, but I'm not familiar enough with Scalaz/FP to know whether the bug is in Scalaz or in my code. Intuitively, I expect this code to require only (on the order of) P times the Chunk size in heap space (for example, with P = 4 and 128 MiB chunks, roughly half a GiB at any given time).

Note: I found a similar question in which an OutOfMemoryError was encountered, but my code is not using consume (which retains the entire stream by design).

Testing

I ran some tests to try to isolate the problem. To summarize, the leak appears to arise only when both zipWithIndex and group are used.

// no zipping/grouping
scala> (i1 &= enumArrs(1 << 25, 128)).run.unsafePerformIO
res47: Long = 4294967296

// grouping only
scala> (i2 &= (enumArrs(1 << 25, 128) mapE Iteratee.group(4))).run.unsafePerformIO
res49: Long = 4294967296

// zipping and grouping
scala> (i3 &= (enumArrs(1 << 25, 128).zipWithIndex mapE Iteratee.group(4))).run.unsafePerformIO
java.lang.OutOfMemoryError: Java heap space

// zipping only
scala> (i4 &= (enumArrs(1 << 25, 128).zipWithIndex)).run.unsafePerformIO
res51: Long = 4294967296

// no zipping/grouping, larger arrays
scala> (i1 &= enumArrs(1 << 27, 128)).run.unsafePerformIO
res53: Long = 17179869184

// zipping only, larger arrays
scala> (i4 &= (enumArrs(1 << 27, 128).zipWithIndex)).run.unsafePerformIO
res54: Long = 17179869184

Code for the tests:

import scalaz.iteratee._, scalaz.effect.IO, scalaz.std.vector._

// define an enumerator that produces a stream of new, zero-filled arrays
def enumArrs(sz: Int, n: Int) = 
  Iteratee.enumIterator[Array[Int], IO](
    Iterator.continually(Array.fill(sz)(0)).take(n))

// define an iteratee that consumes a stream of arrays 
// and computes its length
val i1 = Iteratee.fold[Array[Int], IO, Long](0) { 
  (c, a) => c + a.length 
}

// define an iteratee that consumes a grouped stream of arrays 
// and computes its length
val i2 = Iteratee.fold[Vector[Array[Int]], IO, Long](0) { 
  (c, as) => c + as.map(_.length).sum 
}

// define an iteratee that consumes a grouped/zipped stream of arrays
// and computes its length
val i3 = Iteratee.fold[Vector[(Array[Int], Long)], IO, Long](0) {
  (c, vs) => c + vs.map(_._1.length).sum
}

// define an iteratee that consumes a zipped stream of arrays
// and computes its length
val i4 = Iteratee.fold[(Array[Int], Long), IO, Long](0) {
  (c, v) => c + v._1.length
}

Questions

  • Is the bug in my code?
  • How can I make this work in constant heap space?
  • I ended up reporting this as [an issue in Scalaz](https://github.com/scalaz/scalaz/issues/554). – Aaron Novstrup Oct 03 '13 at 21:35
  • It won't be any fun, but you could try `-XX:+HeapDumpOnOutOfMemoryError` and analyzing the dump with Eclipse MAT (http://www.eclipse.org/mat/) to see what line of code is holding on to the arrays. – huynhjl Oct 09 '13 at 06:56
  • @huynhjl FWIW, I tried analyzing the heap with both JProfiler and MAT but was completely unable to wade through all the references to anonymous function classes, etc. Scala really needs dedicated tools for this sort of thing. – Aaron Novstrup Oct 10 '13 at 19:29
  • What if there is no leak and it is just that what you are doing requires a wildly increasing amount of memory? You can easily replicate the zipWithIndex without that particular FP construct by just maintaining a `var` counter as you go. – Ezekiel Victor Oct 14 '14 at 09:40
  • @EzekielVictor I'm not sure I understand the comment. You're suggesting that adding a single `Long` index per chunk would change the algorithm from constant- to non-constant heap space? The non-zipping version clearly uses constant heap space, because it can "process" as many chunks as you're willing to wait for. – Aaron Novstrup Oct 14 '14 at 20:50
  • Sorry, yes, "wildly increasing" implies a change of complexity with respect to heap space. I should have just said "compounding." It definitely won't change heap space from constant to non-constant, but it will of course cause more consumption for all the extra structures needed. You're already using such a huge amount of data/iterations that maybe it would be better to opt out of the FP approach here and just maintain a running `Long` var. You've already proved to yourself that a group *without* the zip keeps you within memory limits. :) – Ezekiel Victor Oct 16 '14 at 06:25
  • I realize that's a workaround of sorts (though not the least bit hacky; I mean, that `var` keyword exists for a reason), but I guess what I'm saying is I don't see anything implying a memory leak. Profiling would have to be done to prove that the memory consumed exceeds the expected amount for the extra structures you require when using zipWithIndex. – Ezekiel Victor Oct 16 '14 at 06:28
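
As a minimal sketch of the counter-based workaround suggested in the comments above (the name `i3b` is hypothetical, and `enumArrs` is the test enumerator defined earlier): the running index can be threaded through the fold's accumulator instead of being zipped onto the stream, so that only group touches the enumerator.

import scalaz.iteratee._, scalaz.effect.IO, scalaz.std.vector._

// hypothetical variant of `i3`: carry the running element index in the
// fold's accumulator instead of zipping it onto the stream
val i3b = Iteratee.fold[Vector[Array[Int]], IO, (Long, Long)]((0L, 0L)) {
  case ((idx, len), as) =>
    // `idx` is the index of the first array in this group; an individual
    // array's index is `idx` plus its offset within `as`
    (idx + as.length, len + as.map(_.length.toLong).sum)
}

// run it like the other tests, grouping but not zipping:
// (i3b &= (enumArrs(1 << 25, 128) mapE Iteratee.group(4))).run.unsafePerformIO

The grouping-only tests above ran in constant heap, so this form should too; whether it's an acceptable substitute depends on how the real processChunk consumes the index.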

1 Answer


This will come as little consolation for anyone who's stuck with the older iteratee API, but I recently verified that an equivalent test passes against the scalaz-stream API. This is a newer stream processing API that is intended to replace iteratee.

For completeness, here's the test code:

import scalaz.concurrent.Task
import scalaz.stream._

// create a stream containing `n` arrays with `sz` Ints in each one
def streamArrs(sz: Int, n: Int): Process[Task, Array[Int]] =
  (Process emit Array.fill(sz)(0)).repeat take n

(streamArrs(1 << 25, 1 << 14).zipWithIndex 
      pipe process1.chunk(4) 
      pipe process1.fold(0L) {
    (c, vs) => c + vs.map(_._1.length.toLong).sum
  }).runLast.run

This should work with any value for the n parameter (provided you're willing to wait long enough) -- I tested with 2^14 arrays of 2^25 Ints each (i.e., 128 MiB per array, for a total of 2 TiB allocated over time).
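
As a sanity check on those numbers: the fold's final value is the total number of Ints streamed, so if the pipeline completes, runLast should yield Some((1L << 25) * (1 << 14)), i.e., Some(549755813888L).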

– Aaron Novstrup