Questions tagged [fs2]

FS2: Functional Streams for Scala

FS2: Functional Streams for Scala - is a streaming I/O library. The design goals are compositionality, expressiveness, resource safety, and speed.

149 questions
19
votes
1 answer

Difference between flatMap, flatTap, evalMap and evalTap

In Scala fs2 library for functional streams: I am trying to understand the difference between flatMap, flatTap, evalMap and evalTap. They all seem to perform the same thing, which is transformation of the stream values. What is the difference and…
Lev Denisov
  • 2,011
  • 16
  • 26
13
votes
1 answer

How to reason about stack safety in Scala Cats / fs2?

Here is a piece of code from the documentation for fs2. The function go is recursive. The question is how do we know if it is stack safe and how to reason if any function is stack safe? import fs2._ // import fs2._ def tk[F[_],O](n: Long):…
Lev Denisov
  • 2,011
  • 16
  • 26
9
votes
1 answer

How do I "split" a stream in fs2?

I want to do something like this: def splitStream[F, A](stream: fs2.Stream[F, A], split: A => B): (Stream[F, A], Stream[F, B)) = (stream, stream.map(split) But this does not work as it "pulls" from the source twice - once each when I drain both…
pathikrit
  • 32,469
  • 37
  • 142
  • 221
9
votes
2 answers

how to convert scala fs2 stream to string?

I'm want to know how to convert Scala fs2 Stream to string, from fs2 github readme example: def converter[F[_]](implicit F: Sync[F]): F[Unit] = { val path =…
LoranceChen
  • 2,453
  • 2
  • 22
  • 48
8
votes
2 answers

Stop the fs2-stream after a timeout

I want to use a function similar to take(n: Int) but in a time dimension: consume(period: Duration. So I want a stream to terminate if a timeout occurs. I know that I could compile a stream to something like IO[List[T]] and cancel it, but then I'll…
Mikhail Golubtsov
  • 6,285
  • 3
  • 29
  • 36
8
votes
3 answers

Pushing elements externally to a reactive stream in fs2

I have an external (that is, I cannot change it) Java API which looks like this: public interface Sender { void send(Event e); } I need to implement a Sender which accepts each event, transforms it to a JSON object, collects some number of them…
Vladimir Matveev
  • 120,085
  • 34
  • 287
  • 296
7
votes
0 answers

Which operations are easier to implement in pull vs push models in streaming libraries (and vice versa)?

Author of Monix says in comparison of Monix with FS2 Where FS2 is better: the model of communication between producers and consumers is pull-based, sometimes making it easier to implement new operators Where Monix is better: the model of…
visa
  • 300
  • 2
  • 10
7
votes
0 answers

FS2 - How to route an element to a specific nested stream/pipe?

I want to run N nested streams/pipes in parallel and send each element to only one of the nested streams. Balance allows me to do this but I want to route elements with the same "key" to the same nested stream or pipe. I can't see any functions to…
Toby Hobson
  • 199
  • 7
7
votes
1 answer

Grouping event with fs2.Stream

I have event stream as follows: sealed trait Event val eventStream: fs2.Stream[IO, Event] = //... I want to group this events received within a single minute (i.e from 0 sec to 59 sec of every minute). This sounds pretty straightforward with…
Some Name
  • 8,555
  • 5
  • 27
  • 77
6
votes
1 answer

Improving performance of fs2 stream involving file transformation

I've got something like this (it's an example from https://github.com/typelevel/fs2, with my additions, which I marked with comments): import cats.effect.{Blocker, ExitCode, IO, IOApp, Resource} import fs2.{io, text, Stream} import…
6
votes
1 answer

How to group objects using a classifier function in FS2?

I have a stream of unordered measurements, that I'd like to group into batches of a fixed size, so that I can persist them efficiently later: val measurements = for { id <- Seq("foo", "bar", "baz") value <- 1 to 5 } yield (id,…
Szymon Jednac
  • 2,969
  • 1
  • 29
  • 43
6
votes
1 answer

How to shutdown a fs2.StreamApp programmatically?

Extending StreamApp asks you to provide the stream def. It has a requestShutdown parameter. def stream(args: List[String], requestShutdown: F[Unit]): Stream[F, ExitCode] I provide the implementation for this and understand that args is passed in as…
Toby
  • 9,523
  • 8
  • 36
  • 59
6
votes
2 answers

Mocking a method which returns an fs2.Stream

Why isn't it possible to mock a method that returns an fs2.Stream with a Mockito mock? Mockito is complaining that I am trying to return a FreeC instead of a Stream. Why is that and how can I get it working? The following code: import…
Johan J
  • 101
  • 2
  • 4
5
votes
3 answers

Splitting the fs2 stream output to two files

I'm just starting my adventure with fs2 streams. What I want to achieve, is to read a file (a large one, this is why I use fs2), transform it and write the result to two different files (based on some predicate). Some code (from…
5
votes
2 answers

Why does merging with empty fs2.Stream change program's behavior

It's well documented that merging with an empty fs2.Stream should produce the same fs2.Stream. Here is the quote from Scaladocs: Has the property that merge(Stream.empty, s) == s Consider the following complete Scala program with…
Some Name
  • 8,555
  • 5
  • 27
  • 77
1
2 3
9 10