I'm trying to write an enumerator for reading files line by line from a java.io.BufferedReader using Scalaz 7's iteratee library, which currently only provides an (extremely slow) enumerator for java.io.Reader.
The problems I'm running into are related to the fact that all of the other iteratee libraries I've used (e.g. Play 2.0's and John Millikin's enumerator for Haskell) have had an error state as one of their Step type's constructors, and Scalaz 7 doesn't.
My current implementation
Here's what I currently have. First, some imports and IO wrappers:
import java.io.{ BufferedReader, File, FileReader }
import scalaz._, Scalaz._, effect.IO, iteratee.{ Iteratee => I, _ }
def openFile(f: File) = IO(new BufferedReader(new FileReader(f)))
def readLine(r: BufferedReader) = IO(Option(r.readLine))
def closeReader(r: BufferedReader) = IO(r.close())
And a type alias to clean things up a bit:
type ErrorOr[A] = Either[Throwable, A]
And now a tryIO helper, modeled (loosely, and probably wrongly) on the one in enumerator:
def tryIO[A, B](action: IO[B]) = I.iterateeT[A, IO, ErrorOr[B]](
  action.catchLeft.map(
    r => I.sdone(r, r.fold(_ => I.eofInput, _ => I.emptyInput))
  )
)
An enumerator for the BufferedReader itself:
def enumBuffered(r: => BufferedReader) = new EnumeratorT[ErrorOr[String], IO] {
  lazy val reader = r
  def apply[A] = (s: StepT[ErrorOr[String], IO, A]) => s.mapCont(k =>
    tryIO(readLine(reader)) flatMap {
      case Right(None) => s.pointI
      case Right(Some(line)) => k(I.elInput(Right(line))) >>== apply[A]
      case Left(e) => k(I.elInput(Left(e)))
    }
  )
}
And finally an enumerator that's responsible for opening and closing the reader:
def enumFile(f: File) = new EnumeratorT[ErrorOr[String], IO] {
  def apply[A] = (s: StepT[ErrorOr[String], IO, A]) => s.mapCont(k =>
    tryIO(openFile(f)) flatMap {
      case Right(reader) => I.iterateeT(
        enumBuffered(reader).apply(s).value.ensuring(closeReader(reader))
      )
      case Left(e) => k(I.elInput(Left(e)))
    }
  )
}
Now suppose, for example, that I want to collect all the lines in a file that contain at least twenty-five '0' characters into a list. I can write:
val action: IO[ErrorOr[List[String]]] = (
  I.consume[ErrorOr[String], IO, List] %=
  I.filter(_.fold(_ => true, _.count(_ == '0') >= 25)) &=
  enumFile(new File("big.txt"))
).run.map(_.sequence)
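To spell out what the pure part of that pipeline computes, here is a small sketch in plain Scala with no Scalaz involved. The names FilterModel, keep and sequence are hypothetical stand-ins I'm introducing for illustration, not library API, and the for-comprehension assumes a right-biased Either (Scala 2.12 or later). The predicate lets every Left through so that errors are never filtered away, and the final sequence step turns the collected list into a single ErrorOr in which the first Left wins.

```scala
object FilterModel {
  type ErrorOr[A] = Either[Throwable, A]

  // Same shape as the predicate passed to I.filter: keep every Left, and
  // keep a Right only if the line has at least `min` '0' characters.
  def keep(min: Int)(x: ErrorOr[String]): Boolean =
    x.fold(_ => true, _.count(_ == '0') >= min)

  // Standard-library stand-in for Scalaz's `sequence` on List[ErrorOr[A]]:
  // the leftmost Left wins; otherwise the Right values are collected in order.
  def sequence[A](xs: List[ErrorOr[A]]): ErrorOr[List[A]] =
    xs.foldRight(Right(Nil): ErrorOr[List[A]]) { (x, acc) =>
      for { a <- x; as <- acc } yield a :: as
    }
}
```

With this model, filtering a list of successful lines and then sequencing it gives a Right of the matching lines, while a single Left anywhere in the input survives the filter and becomes the overall result.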
In many ways this seems to work beautifully: I can kick the action off with unsafePerformIO and it will chunk through tens of millions of lines and gigabytes of data in a couple of minutes, in constant memory and without blowing the stack, and then close the reader when it's done. If I give it the name of a file that doesn't exist, it will dutifully give me back the exception wrapped in a Left, and enumBuffered at least seems to behave appropriately if it hits an exception while reading.
Potential problems
I have some concerns about my implementation, though, particularly of tryIO. For example, suppose I try to compose a few iteratees:
val it = for {
  _ <- tryIO[Unit, Unit](IO(println("a")))
  _ <- tryIO[Unit, Unit](IO(throw new Exception("!")))
  r <- tryIO[Unit, Unit](IO(println("b")))
} yield r
If I run this, I get the following:
scala> it.run.unsafePerformIO()
a
b
res11: ErrorOr[Unit] = Right(())
If I try the same thing with enumerator in GHCi, the result is more like what I'd expect:
...> run $ tryIO (putStrLn "a") >> tryIO (error "!") >> tryIO (putStrLn "b")
a
Left !
I just don't see a way to get this behavior without an error state in the iteratee library itself.
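To make the difference concrete outside either library, here is a minimal model in plain Scala. Everything in it (TryIOModel, Action, attempt, andThen) is a hypothetical stand-in of my own, not Scalaz or enumerator API, so treat it as a sketch of my understanding rather than a description of how either library works. In ignoring, each ErrorOr is just the value produced by a step, and binding it to _ throws the Left away, which matches what my tryIO does. In shortCircuit, every step has to inspect the previous ErrorOr by hand before continuing, which is the job I'd expect an error state to do inside the library.

```scala
import scala.collection.mutable.ListBuffer

object TryIOModel {
  type ErrorOr[A] = Either[Throwable, A]

  // Hypothetical stand-in for an effectful computation: a thunk run on demand.
  final case class Action[A](run: () => A) {
    def map[B](f: A => B): Action[B] = Action(() => f(run()))
    def flatMap[B](f: A => Action[B]): Action[B] = Action(() => f(run()).run())
  }

  // Like tryIO in spirit: an exception becomes a Left value, not a failure.
  def attempt[A](body: => A): Action[ErrorOr[A]] =
    Action[ErrorOr[A]](() => try Right(body) catch { case e: Exception => Left(e) })

  // Records a message instead of printing, so the order of effects is visible.
  def say(log: ListBuffer[String], s: String): Unit = { log += s; () }

  // Mirrors the for-comprehension above: each ErrorOr is bound to _ and
  // dropped, so the Left from the second step never stops the third.
  def ignoring(log: ListBuffer[String]): Action[ErrorOr[Unit]] = for {
    _ <- attempt(say(log, "a"))
    _ <- attempt[Unit](throw new Exception("!"))
    r <- attempt(say(log, "b"))
  } yield r

  // Sequencing that looks at the previous result and skips `next` on a Left.
  def andThen[A, B](first: Action[ErrorOr[A]])(next: A => Action[ErrorOr[B]]): Action[ErrorOr[B]] =
    first.flatMap {
      case Right(a) => next(a)
      case Left(e)  => Action[ErrorOr[B]](() => Left(e))
    }

  def shortCircuit(log: ListBuffer[String]): Action[ErrorOr[Unit]] =
    andThen[Unit, Unit](attempt(say(log, "a"))) { _ =>
      andThen[Unit, Unit](attempt[Unit](throw new Exception("!"))) { _ =>
        attempt(say(log, "b"))
      }
    }
}
```

Running ignoring records both "a" and "b" and ends in a Right, like my Scalaz version, while shortCircuit records only "a" and ends in a Left, like the GHCi session.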
My questions
I don't claim to be any kind of expert on iteratees, but I have used the various Haskell implementations in a few projects, feel like I more or less understand the fundamental concepts, and had coffee with Oleg once. I'm at a loss here, though. Is this a reasonable way to handle exceptions in the absence of an error state? Is there a way to implement tryIO that would behave more like the enumerator version? Is there some kind of time bomb waiting for me in the fact that my implementation behaves differently?