
Let's take an example from some scalaz-stream docs, but with a theoretical twist.

import scalaz.stream._
import scalaz.concurrent.Task

val converter: Task[Unit] =
  io.linesR("testdata/fahrenheit.txt")
    .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
    .map(line => fahrenheitToCelsius(line.toDouble).toString)
    .intersperse("\n")
    .pipe(text.utf8Encode)
    .to(io.fileChunkW("testdata/celsius.txt"))
    .run

// at the end of the universe...
val u: Unit = converter.run

In this case the file may well contain a string that is not a valid double, and line.toDouble will then throw a NumberFormatException. Let's say that when this happens we want to log the error and skip that line, so the rest of the stream is still processed. What's the idiomatic way of doing that? I've seen some examples, but they usually collectFrom the stream.

kareblak
  • Probably not so idiomatic for Scalaz, but you can use `Try` for mapping and in case it failed, log the error the way you want (probably this way: https://github.com/scalaz/scalaz-stream/blob/master/src/test/scala/scalaz/stream/examples/WritingAndLogging.scala#L63). – Gábor Bakos Dec 18 '14 at 15:32

1 Answer


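You can do it with scalaz.\/ and a couple of additional processing steps: wrap the conversion in a disjunction, log the left (error) side, and keep only the right (success) side.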

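  import scalaz.{\/, -\/, \/-}

  // Parse and convert one line; any non-fatal exception (e.g. a
  // NumberFormatException from toDouble) ends up on the left side.
  def fahrenheitToCelsius(line: String): Throwable \/ String = 
     \/.fromTryCatchNonFatal {
        val fahrenheit = line.toDouble
        val celsius = (fahrenheit - 32.0) * 5.0 / 9.0
        celsius.toString
     }

  // Unwraps Some(v); not defined for None, so collect drops those elements.
  def collectSome[T]: PartialFunction[Option[T], T] = {
    case Some(v) => v
  }

  // Logs the error side and turns the disjunction into an Option.
  def logThrowable[T]: PartialFunction[Throwable \/ T, Option[T]] = {
    case -\/(err) => 
      err.printStackTrace()
      None
    case \/-(v) => Some(v)
  }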

  val converter: Task[Unit] =
    io.linesR("testdata/fahrenheit.txt")
      .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
      .map(fahrenheitToCelsius)
      .map(logThrowable)
      .collect(collectSome)
      .intersperse("\n")
      .pipe(text.utf8Encode)
      .to(io.fileChunkW("testdata/celsius.txt"))
      .run
Eugene Zhulenev
  • Ok, so there's no run/attemptRun executor or a generic handler that can be specified to act on certain Exceptions? Also, I think the `.map(collectSome)` should be a `.collect(collectSome)` in this case. – kareblak Dec 19 '14 at 08:31
  • By default the process will fail on the first exception, but as I understand it you want to log them all, so you need to catch exceptions on each step that can fail. Yeah, it's a typo, it should be collect(collectSome). – Eugene Zhulenev Dec 19 '14 at 13:52
  • Yes, very handy for IO reads, which is pretty much what scalaz-stream is made for, especially if you have no control over the data being read or its quality. It's a stream - in some cases you just want to throw away the corrupt stuff. But the answer does indeed cover it. – kareblak Dec 19 '14 at 17:31
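
The per-element catching discussed in these comments can be tried out without scalaz-stream at all. The following is only a minimal sketch of the same pattern using the Scala standard library: scala.util.Try stands in for scalaz.\/ and a plain Iterator stands in for the Process. The object name SkipBadLines, the helper names convert and convertAll, and the use of stderr for logging are assumptions made for illustration; none of them come from scalaz-stream.

```scala
import scala.util.{Failure, Success, Try}

object SkipBadLines {
  // Standard Fahrenheit-to-Celsius conversion.
  def fahrenheitToCelsius(f: Double): Double = (f - 32.0) * 5.0 / 9.0

  // Parse and convert one line. A malformed number becomes a Failure
  // value instead of an exception that would abort the whole pipeline.
  def convert(line: String): Try[String] =
    Try(fahrenheitToCelsius(line.trim.toDouble).toString)

  // Skip blank and comment lines, log every failed conversion to stderr
  // (hypothetical choice; swap in a real logger), and keep the successes.
  def convertAll(lines: Iterator[String]): Iterator[String] =
    lines
      .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
      .flatMap { line =>
        convert(line) match {
          case Success(celsius) => Some(celsius)
          case Failure(err) =>
            System.err.println(s"skipping '$line': $err")
            None
        }
      }

  def main(args: Array[String]): Unit = {
    val input = Iterator("// temperatures", "32", "not-a-number", "", "212")
    convertAll(input).foreach(println)
  }
}
```

The design point is the same as in the answer: the step that can throw returns a value describing success or failure, and a later step decides what to do with failures, so one bad line never terminates the stream.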