It's also possible to use `broadcastThrough`, which broadcasts every element of a stream to multiple `Pipe`s.
A full solution to your problem could look like this. I used cats-effect 3.3.8 and fs2 3.2.5, so it looks a bit different from your code, but the main idea is the same regardless of the versions:
```scala
import cats.effect.{IO, IOApp}
import fs2.io.file.{Files, Path}
import fs2.{Pipe, Stream, text}

object Converter extends IOApp.Simple {

  val converter: Stream[IO, Unit] = {
    def fahrenheitToCelsius(f: Double): Double =
      (f - 32.0) * (5.0 / 9.0)

    // Builds a pipe that keeps only the values matching `predicate` and writes them to `filename`.
    def saveFiltered(filename: Path, predicate: Double => Boolean): Pipe[IO, Double, Unit] =
      _.filter(predicate)
        .map(_.toString)
        .intersperse("\n") // one value per line; without this the numbers run together
        .through(text.utf8.encode)
        .through(Files[IO].writeAll(filename))

    Files[IO].readAll(Path("testdata/fahrenheit.txt"))
      .through(text.utf8.decode)
      .through(text.lines)
      .filter(s => !s.trim.isEmpty && !s.startsWith("//")) // skip blank lines and comments
      .map(line => fahrenheitToCelsius(line.toDouble))
      .broadcastThrough( // every Celsius value is sent to both pipes
        saveFiltered(Path("testdata/celsius_over.txt"), { n => n >= 0 }),
        saveFiltered(Path("testdata/celsius_below.txt"), { n => n < 0 })
      )
  }

  def run: IO[Unit] =
    converter.compile.drain
}
```
`saveFiltered` is now a function that returns a `Pipe` built from a filename and a predicate. It is called twice to build the two arguments for `broadcastThrough`. I tested it on a small example and, FWIW, it works as expected.
`broadcastThrough` guarantees that all elements from the stream are sent to all pipes. There's one small caveat mentioned in the Scaladoc: the slowest pipe slows down the whole stream. I don't think that's a problem in this particular case, because I'd guess both pipes are about equally fast.
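To see the "every element reaches every pipe" behaviour without touching the file system, here is a minimal in-memory sketch. The names `BroadcastDemo`, `nonNegative`, `negative` and the `over:`/`below:` tags are made up for illustration; only `broadcastThrough` itself comes from fs2.

```scala
import cats.effect.{IO, IOApp}
import fs2.{Pipe, Stream}

object BroadcastDemo extends IOApp.Simple {
  // Hypothetical pipes: each one receives ALL elements and keeps only its share.
  val nonNegative: Pipe[IO, Double, String] = _.filter(_ >= 0).map(n => s"over:$n")
  val negative: Pipe[IO, Double, String]    = _.filter(_ < 0).map(n => s"below:$n")

  // Describes the computation; nothing runs until the IO is executed.
  val collected: IO[List[String]] =
    Stream(-40.0, 0.0, 37.5)
      .covary[IO]
      .broadcastThrough(nonNegative, negative)
      .compile
      .toList

  // The pipes run concurrently, so the relative order of their outputs
  // is not guaranteed; sort before printing to get a stable result.
  def run: IO[Unit] =
    collected.flatMap(results => IO.println(results.sorted))
}
```

Because both pipes see the full stream, it is the filters inside them that decide where each value ends up.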
You could even go a step further and generalize the idea a little bit:
```scala
import cats.effect.Concurrent

def partition[F[_]: Concurrent, A, B](predicate: A => Boolean, in: Pipe[F, A, B], out: Pipe[F, A, B]): Pipe[F, A, B] =
  _.broadcastThrough[F, B](
    _.filter(predicate).through(in),          // elements that satisfy the predicate
    _.filter(a => !predicate(a)).through(out) // all remaining elements
  )
```
With that, you no longer have to make sure that two separate predicates are mutually exclusive: the second branch always receives exactly the elements that the first one rejects.
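As a quick sanity check of `partition`, the following sketch runs it on an in-memory stream. The helper is repeated so the snippet is self-contained; `PartitionDemo` and the two tagging pipes are hypothetical and only there to make the routing visible.

```scala
import cats.effect.{Concurrent, IO, IOApp}
import fs2.{Pipe, Stream}

object PartitionDemo extends IOApp.Simple {
  // Same helper as above, repeated here so the sketch compiles on its own.
  def partition[F[_]: Concurrent, A, B](predicate: A => Boolean, in: Pipe[F, A, B], out: Pipe[F, A, B]): Pipe[F, A, B] =
    _.broadcastThrough[F, B](
      _.filter(predicate).through(in),
      _.filter(a => !predicate(a)).through(out)
    )

  // Hypothetical pipes that just tag each value with the branch it went through.
  val over: Pipe[IO, Double, String]  = _.map(n => s"over:$n")
  val below: Pipe[IO, Double, String] = _.map(n => s"below:$n")

  // Describes the computation; each input value should appear exactly once.
  val tagged: IO[List[String]] =
    Stream(-40.0, 0.0, 37.5)
      .covary[IO]
      .through(partition[IO, Double, String](_ >= 0, over, below))
      .compile
      .toList

  // Output order across the two branches is not guaranteed, hence the sort.
  def run: IO[Unit] =
    tagged.flatMap(results => IO.println(results.sorted))
}
```

Each input value shows up once, tagged by the single branch that accepted it, which is the property the two hand-written predicates had to provide before.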
With a slightly adapted `saveFiltered`:
```scala
def saveFiltered2(filename: Path): Pipe[IO, Double, Unit] =
  _.map(_.toString)
    .intersperse("\n") // one value per line
    .through(text.utf8.encode)
    .through(Files[IO].writeAll(filename))
```
the last part of the stream is a bit shorter:
```scala
...
  .through(
    partition(n => n >= 0,
      saveFiltered2(Path("testdata/celsius_over.txt")),
      saveFiltered2(Path("testdata/celsius_below.txt"))))
```