Given a function A => IO[B]
(aka Kleisli[IO, A, B]
) that is meant to be called multiple times, and has side effects, like updating a DB, how to delegate such multiple calls of it into a stream (I guess Pipe[IO, A, B]
) (fs2, monix observable/iterant)? Reason for this is to be able to accumulate state, batch calls together over a time window etc.
More concretely, http4s server requires a Request => IO[Response]
, so I am looking how to operate on streams (for the above benefits), but ultimately provide such a function to http4s.
I suspect it will need some correlation ID behind the scenes and I am fine with that, I am more interested in how to do it safely and properly from an FP perspective.
Ultimately, the signature I expect is probably something like:
Pipe[IO, A, B] => (A => IO[B])
, such that calls to Kleisli are piped through the pipe.
As an afterthought, would it be at all possible to backpressure?