
I'm just starting my adventure with fs2 streams. What I want to achieve is to read a file (a large one, which is why I use fs2), transform it, and write the result to two different files based on some predicate. Here is some code (from https://github.com/typelevel/fs2) with my comment:

  val converter: Stream[IO, Unit] = Stream.resource(Blocker[IO]).flatMap { blocker =>
    def fahrenheitToCelsius(f: Double): Double =
      (f - 32.0) * (5.0/9.0)

    io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blocker, 4096)
      .through(text.utf8Decode)
      .through(text.lines)
      .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
      .map(line => fahrenheitToCelsius(line.toDouble).toString)
      .intersperse("\n")
      .through(text.utf8Encode)
      .through(io.file.writeAll(Paths.get("testdata/celsius.txt"), blocker))
      /* instead of the last line I want something like this:
      .through(<write temperatures higher than 10 to one file, the rest to the other one>)
      */
  }

What is the most efficient way to do so? The obvious solution is to have two streams with different filters, but it's inefficient (there will be two passes).
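To see where the second pass comes from, here is a minimal in-memory sketch of that naive approach. It assumes fs2 3.x and cats-effect 3. The hypothetical `source` stream stands in for `io.file.readAll` plus parsing and counts how many times it is run. Because each filtered copy is an independent stream, compiling both runs the source twice, which for a real file means reading and parsing it twice.

```scala
import cats.effect.{IO, IOApp, Ref}
import fs2.Stream

object TwoPassDemo extends IOApp.Simple {

  // Hypothetical stand-in for reading and parsing the input file:
  // every time this stream is run, it increments `reads` before emitting the values.
  def source(reads: Ref[IO, Int]): Stream[IO, Double] =
    Stream.eval(reads.update(_ + 1)).drain ++ Stream.emits(List(5.0, 12.0, 30.0, -3.0))

  // The "obvious" solution: two independent streams, each with its own filter.
  val program: IO[(List[Double], List[Double], Int)] =
    for {
      reads <- Ref.of[IO, Int](0)
      over  <- source(reads).filter(_ > 10).compile.toList
      rest  <- source(reads).filter(_ <= 10).compile.toList
      n     <- reads.get // how many times the source was run
    } yield (over, rest, n)

  def run: IO[Unit] = program.flatMap(IO.println(_))
}
```

Running `program` yields the two filtered lists together with a read count of 2.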

  • Perhaps write a function that takes a line and de-multiplexes it to the appropriate file. Then call that function from a `.through()` at the end? – jq170727 Oct 03 '20 at 19:33
  • I'll try with this approach. Perhaps two `through`s or `observe` will be another solution. – Dawid Łakomy Oct 03 '20 at 20:48
  • There is a `broadcastThrough` function. It could be used to broadcast one stream to multiple `Pipe`s. – jilen Jan 15 '21 at 02:50

3 Answers

Unfortunately, as far as I know, there's no easy way to split an fs2 stream in two.

What you could do is split your stream by pushing each value to one of two queues (the first for values above 10, the second for the rest). If we use a `NoneTerminatedQueue`, a queue is not terminated until we put `None` into it. Then we can use `dequeue` to turn each queue into a separate stream, which ends once its queue is terminated.

An example solution is below. I split writing to a file and reading into separate methods:

import java.nio.file.Paths
import cats.effect.{Blocker, ExitCode, IO, IOApp}
import cats.implicits._ // cats syntax such as `*>`, used to sequence the two enqueue1 calls
import fs2.concurrent.{NoneTerminatedQueue, Queue}
import fs2.{Stream, io, text}

object FahrenheitToCelsius extends IOApp {

  def fahrenheitToCelsius(f: Double): Double =
    (f - 32.0) * (5.0 / 9.0)

  // Reads and converts the input, routing each value to one of the two queues
  def read(blocker: Blocker, over: NoneTerminatedQueue[IO, Double], under: NoneTerminatedQueue[IO, Double]): Stream[IO, Unit] =
    io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blocker, 4096)
    .through(text.utf8Decode)
    .through(text.lines)
    .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
    .map(line => fahrenheitToCelsius(line.toDouble))
    .evalMap { value =>
      if (value > 10) { // route each value to one of the queues
        over.enqueue1(Some(value)) // Some(...) is a regular element; the queue stays open
      } else {
        under.enqueue1(Some(value))
      }
    }
    .onFinalize(
      over.enqueue1(None) *> under.enqueue1(None) //by putting None we terminate queues
    )

  // write takes a stream (dequeued from one of the queues) and the target file name
  def write(s: Stream[IO, Double], blocker: Blocker, fileName: String): Stream[IO, Unit] = {
    s.map(_.toString)
      .intersperse("\n")
      .through(text.utf8Encode)
      .through(io.file.writeAll(Paths.get(fileName), blocker))
  }

  val converter: Stream[IO, Unit] = for {
    over <- Stream.eval(Queue.noneTerminated[IO, Double]) //here we create 2 queues
    under <- Stream.eval(Queue.noneTerminated[IO, Double])
    blocker <- Stream.resource(Blocker[IO])
    _ <- write(over.dequeue, blocker, "testdata/celsius-over.txt")         // write both files concurrently;
      .merge(write(under.dequeue, blocker, "testdata/celsius-under.txt"))  // merge ends only after BOTH writers finish
      .concurrently(read(blocker, over, under))                            // the reader fills the queues in the background
  } yield ()

  override def run(args: List[String]): IO[ExitCode] =
    converter
      .compile
      .drain
      .as(ExitCode.Success)

}
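For readers on fs2 3.x, where `fs2.concurrent.Queue` was replaced by `cats.effect.std.Queue`, the same queue-splitting idea can be sketched in memory as below. This is an adaptation under stated assumptions (fs2 3.x, cats-effect 3), not the answer's code: the input is a hypothetical `List[Double]` instead of a file, and two `Ref`s stand in for the output files. The two consumers are combined with `merge`, which only completes once both have received `None`.

```scala
import cats.effect.{IO, IOApp, Ref}
import cats.effect.std.Queue
import fs2.Stream

object QueueSplitDemo extends IOApp.Simple {

  // Routes each value to one of two none-terminated queues and drains both concurrently.
  // Returns what each consumer received (the Refs stand in for the two output files).
  def split(values: List[Double], threshold: Double): IO[(List[Double], List[Double])] =
    for {
      over      <- Queue.unbounded[IO, Option[Double]]
      under     <- Queue.unbounded[IO, Option[Double]]
      seenOver  <- Ref.of[IO, Vector[Double]](Vector.empty)
      seenUnder <- Ref.of[IO, Vector[Double]](Vector.empty)

      // Producer: Some(v) is a regular element, None terminates a queue.
      producer = Stream.emits(values).covary[IO]
        .evalMap(v => if (v > threshold) over.offer(Some(v)) else under.offer(Some(v)))
        .onFinalize(over.offer(None) *> under.offer(None))

      // Consumers: each one stops when it dequeues None.
      writeOver  = Stream.fromQueueNoneTerminated[IO, Double](over).evalMap(v => seenOver.update(_ :+ v))
      writeUnder = Stream.fromQueueNoneTerminated[IO, Double](under).evalMap(v => seenUnder.update(_ :+ v))

      // merge completes only when both consumers are done; the producer runs in the background.
      _ <- writeOver.merge(writeUnder).concurrently(producer).compile.drain
      o <- seenOver.get
      u <- seenUnder.get
    } yield (o.toList, u.toList)

  def run: IO[Unit] = split(List(5.0, 12.0, 30.0, -3.0), 10.0).flatMap(IO.println(_))
}
```

Here `split(List(5.0, 12.0, 30.0, -3.0), 10.0)` should return `(List(12.0, 30.0), List(5.0, -3.0))`; order within each list is preserved because a single producer feeds each FIFO queue.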
Krzysztof Atłasik
  • Many thanks. I'll keep trying another approach for a while (`.through(someGoodPipe)` at the end). I've read about such an idea at http://www.beyondthelines.net/programming/streaming-patterns-with-fs2/, but for Akka, if I understand correctly. I'll accept your answer if I don't succeed and no other answer using this idea comes up. – Dawid Łakomy Oct 05 '20 at 19:31
  • If you manage to come up with another working solution, please remember to post it here as an answer. It might be very helpful for others. – Krzysztof Atłasik Oct 05 '20 at 19:56
  • Are you sure that this `.spawn` is needed? I think it works better without it. With `.spawn` the program tends to stop prematurely. – Dawid Łakomy Dec 03 '20 at 19:16
  • Yeah, you're right, I probably left it in by mistake. It's not needed, since `concurrently` already runs the other stream in the background. I updated my answer. Thanks for the remark. – Krzysztof Atłasik Dec 03 '20 at 21:14

It's also possible to use broadcastThrough, which allows broadcasting all elements of a stream to multiple Pipes.

A full solution to your problem could look like this. It uses cats-effect 3.3.8 and fs2 3.2.5, which is why it looks a bit different, but the main idea is the same regardless of the versions:

import cats.effect.{IO, IOApp}
import fs2.io.file.{Files, Path}
import fs2.{Pipe, Stream, text}

object Converter extends IOApp.Simple {

  val converter: Stream[IO, Unit] = {
    def fahrenheitToCelsius(f: Double): Double =
      (f - 32.0) * (5.0 / 9.0)

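    // Keeps the values matching `predicate` and writes them, one per line, to `filename`
    def saveFiltered(filename: Path, predicate: Double => Boolean): Pipe[IO, Double, Unit] =
      _.filter(predicate)
        .map(_.toString)
        .intersperse("\n") // separate the values; otherwise they would be written back to back
        .through(text.utf8.encode)
        .through(Files[IO].writeAll(filename))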

    Files[IO].readAll(Path("testdata/fahrenheit.txt"))
      .through(text.utf8.decode)
      .through(text.lines)
      .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
      .map(line => fahrenheitToCelsius(line.toDouble))
      .broadcastThrough(
        saveFiltered(Path("testdata/celsius_over.txt"), { n => n >= 0 }),
        saveFiltered(Path("testdata/celsius_below.txt"), { n => n < 0 })
      )
  }

  def run: IO[Unit] =
    converter.compile.drain
}

`saveFiltered` is now a function that returns a `Pipe` built from a filename and a predicate. It is used to build both arguments to `broadcastThrough`. I tested it on a small example and, FWIW, it works as expected.

`broadcastThrough` guarantees that every element of the stream is sent to every pipe. There's one caveat mentioned in the Scaladoc: the slowest pipe will slow down the whole stream. I don't think that's a problem in this particular case, since I'd expect both pipes to be about equally fast.
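The all-elements-to-all-pipes guarantee can be checked in memory. This is a minimal sketch assuming fs2 3.2.x and cats-effect 3; the hypothetical `record` pipe collects whatever it receives into a `Ref` instead of writing a file.

```scala
import cats.effect.{IO, IOApp, Ref}
import fs2.{Pipe, Stream}

object BroadcastDemo extends IOApp.Simple {

  // Hypothetical pipe that records every element it receives into `sink`.
  def record(sink: Ref[IO, Vector[Double]]): Pipe[IO, Double, Unit] =
    _.evalMap(v => sink.update(_ :+ v))

  // Broadcasts `values` to two recording pipes and returns what each one saw.
  def broadcast(values: List[Double]): IO[(Vector[Double], Vector[Double])] =
    for {
      a     <- Ref.of[IO, Vector[Double]](Vector.empty)
      b     <- Ref.of[IO, Vector[Double]](Vector.empty)
      _     <- Stream.emits(values).covary[IO].broadcastThrough(record(a), record(b)).compile.drain
      seenA <- a.get
      seenB <- b.get
    } yield (seenA, seenB)

  def run: IO[Unit] = broadcast(List(5.0, 12.0, -3.0)).flatMap(IO.println(_))
}
```

Both returned vectors should equal the input, in order: each pipe receives the full stream, so any filtering has to happen inside the pipes, as `saveFiltered` does.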


You could even go a step further and generalize the idea a little bit:

def partition[F[_] : Concurrent, A, B](predicate: A => Boolean, in: Pipe[F, A, B], out: Pipe[F, A, B]): Pipe[F, A, B] =
  _.broadcastThrough[F, B](
    _.filter(predicate).through(in),
    _.filter(a => !predicate(a)).through(out)
  )

With that, a single predicate decides which pipe each element goes to, so you no longer have to make sure that two separate predicates are mutually exclusive (and together cover every value).

With a slightly adapted saveFiltered:

def saveFiltered2(filename: Path): Pipe[IO, Double, Unit] =
  _.map(_.toString)
    .intersperse("\n") // one value per line
    .through(text.utf8.encode)
    .through(Files[IO].writeAll(filename))

the last part of the stream is a bit shorter:

...
.through(
  partition[IO, Double, Unit](n => n >= 0, // explicit type arguments so `n` is known to be a Double
    saveFiltered2(Path("testdata/celsius_over.txt")),
    saveFiltered2(Path("testdata/celsius_below.txt"))))
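To exercise `partition` without touching the file system, the sketch below (assuming fs2 3.x and cats-effect 3) replaces `saveFiltered2` with hypothetical pipes that tag each value with its branch. Because `broadcastThrough` merges the outputs of the pipes concurrently, the order of the results across branches is not guaranteed, so they are best compared as a set.

```scala
import cats.effect.{Concurrent, IO, IOApp}
import fs2.{Pipe, Stream}

object PartitionDemo extends IOApp.Simple {

  // `partition` as defined in the answer: one predicate, two pipes.
  def partition[F[_]: Concurrent, A, B](predicate: A => Boolean, in: Pipe[F, A, B], out: Pipe[F, A, B]): Pipe[F, A, B] =
    _.broadcastThrough[F, B](
      _.filter(predicate).through(in),
      _.filter(a => !predicate(a)).through(out)
    )

  // Hypothetical in-memory pipes standing in for saveFiltered2:
  // they tag each value with the branch it went through.
  val over: Pipe[IO, Double, String]  = _.map(n => s"over:$n")
  val below: Pipe[IO, Double, String] = _.map(n => s"below:$n")

  val tagged: Stream[IO, String] =
    Stream.emits(List(5.0, -3.0, 12.0)).covary[IO]
      .through(partition[IO, Double, String](n => n >= 0, over, below))

  def run: IO[Unit] = tagged.compile.toList.flatMap(IO.println(_))
}
```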
l7r7

I've managed to find another solution. Here it is:

import cats.effect.{Blocker, ExitCode, IO, IOApp, Resource}
import fs2.{io, text, Stream}
import fs2.io.file.WriteCursor
import java.nio.file.Paths

object Converter extends IOApp {

  val converter: Stream[IO, Unit] = Stream.resource(Blocker[IO]).flatMap { blocker =>
    def fahrenheitToCelsius(f: Double): Double =
      (f - 32.0) * (5.0/9.0)

    // Keeps the values matching `filter` and writes them, one per line, to `filename`
    def saveFiltered(in: Stream[IO, Double], blocker: Blocker, filename: String, filter: Double => Boolean) = {
      val processed = in.filter(filter).map(_.toString).intersperse("\n").through(text.utf8Encode)

      Stream.resource(WriteCursor.fromPath[IO](Paths.get(filename), blocker)).flatMap(_.writeAll(processed).void.stream)
    }

    io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blocker, 4096)
      .through(text.utf8Decode)
      .through(text.lines)
      .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
      .map(line => fahrenheitToCelsius(line.toDouble))
      .observe( in => saveFiltered(in, blocker, "testdata/celsius_over.txt", {n => n >= 0}) )
      .through( in => saveFiltered(in, blocker, "testdata/celsius_below.txt", {n => n < 0}) )
  }

  def run(args: List[String]): IO[ExitCode] =
    converter.compile.drain.as(ExitCode.Success)
}

I think it's a bit easier to understand than the answer involving queues (though queues do seem to be a common solution to similar problems).
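The `observe` + `through` combination can also be tried in memory. This is a minimal sketch assuming fs2 3.x and cats-effect 3 (the answer above uses fs2 2.x). The hypothetical `recordNonNegative` side pipe collects values into a `Ref` in place of the first file, and the main stream keeps the negative values in place of the second one. `observe` passes every element downstream unchanged, so the filter after it still sees the whole input.

```scala
import cats.effect.{IO, IOApp, Ref}
import fs2.{Pipe, Stream}

object ObserveDemo extends IOApp.Simple {

  // Hypothetical side pipe for `observe`: records the non-negative values and emits nothing.
  def recordNonNegative(sink: Ref[IO, Vector[Double]]): Pipe[IO, Double, Nothing] =
    _.filter(_ >= 0).evalMap(v => sink.update(_ :+ v)).drain

  def split(values: List[Double]): IO[(Vector[Double], List[Double])] =
    for {
      over <- Ref.of[IO, Vector[Double]](Vector.empty)
      // observe forwards every element unchanged; the main stream then keeps the negatives
      below <- Stream.emits(values).covary[IO]
                 .observe(recordNonNegative(over))
                 .filter(_ < 0)
                 .compile.toList
      seenOver <- over.get
    } yield (seenOver, below)

  def run: IO[Unit] = split(List(5.0, -3.0, 12.0)).flatMap(IO.println(_))
}
```

Here `split(List(5.0, -3.0, 12.0))` should give `(Vector(5.0, 12.0), List(-3.0))`. The side pipe is done by the time the stream completes, because `observe` only emits a chunk downstream after the side pipe has processed it.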