
Given a function `A => IO[B]` (a.k.a. `Kleisli[IO, A, B]`) that is meant to be called multiple times and has side effects, like updating a DB, how can such multiple calls be delegated to a stream (I guess a `Pipe[IO, A, B]` in fs2, or a Monix Observable/Iterant)? The reason for this is to be able to accumulate state, batch calls together over a time window, etc.

More concretely, an http4s server requires a `Request => IO[Response]`, so I am looking for a way to operate on streams (for the above benefits) but ultimately provide such a function to http4s.

I suspect it will need some correlation ID behind the scenes, and I am fine with that; I am more interested in how to do it safely and properly from an FP perspective.

Ultimately, the signature I expect is probably something like:

`Pipe[IO, A, B] => (A => IO[B])`, such that calls to the resulting Kleisli are piped through the pipe.

As an afterthought, would it be at all possible to apply backpressure?
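
To make the shape concrete, a hypothetical adapter might look like the sketch below (the name `throughPipe` and the use of `Resource` to own the background fiber are just guesses at what a safe API could look like, not an existing library function):

import cats.data.Kleisli
import cats.effect.{IO, Resource}
import fs2.Pipe

object PipeAdapter {
  // Hypothetical: run the pipe in the background and hand out a plain A => IO[B].
  // The Resource would own the fiber driving the stream and whatever correlation
  // is needed to match each input to its output.
  def throughPipe[A, B](pipe: Pipe[IO, A, B]): Resource[IO, Kleisli[IO, A, B]] = ???
}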

V-Lamp
  • If I understand correctly, you want to accumulate state between HTTP calls? Because I don't understand what it means to "batch calls together" when you're behind http4s? – Yuval Itzchakov Feb 25 '19 at 11:56
  • Yes, accumulating state is the general idea. For example, you may want to batch multiple `GET`s within 2 seconds that query the same table into one query using an `in` clause, to increase throughput (at the cost of latency) – V-Lamp Feb 25 '19 at 12:20
  • Why doesn't a `StateT`-like transformer fit? It provides exactly the ability to do stateful computations. – Some Name Feb 25 '19 at 12:35
  • No, I thought of `StateT`, but it fits neither the requirement to have a Kleisli with transparent state nor the need for access to subsequent calls for windowing operations. – V-Lamp Feb 25 '19 at 13:03
  • I find it a bit odd that you'd accumulate state in order to batch requests for an HTTP service and delay each request because of that. Perhaps I'm not in the right context, but why would you do that? – Yuval Itzchakov Feb 25 '19 at 13:28
  • If it helps, akka http also models a server as a `BidiFlow` of request-response, there are various motivations to modelling it as such. – V-Lamp Feb 25 '19 at 13:39
  • Correct me if I'm wrong, but `Kleisli` has the signature `A => IO[B]`, whereas what you want is either `A => IO[A]` (chaining) or `Seq[A] => IO[Seq[B]]` (batching). The latter is only useful if the effect actually supports `Seq`s. – Markus Appel Feb 27 '19 at 15:23

1 Answer


One idea is to model it as MPSC (Multi-Producer Single-Consumer). I'll give an example with Monix since I'm more familiar with it, but the idea stays the same even if you use fs2.

import cats.effect.concurrent.Deferred
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.MulticastStrategy
import monix.reactive.subjects.ConcurrentSubject

import scala.concurrent.duration._
import scala.util.Random

object MPSC extends App {

  sealed trait Event
  object Event {
    // You'll need a promise in order to send the response back to the user
    case class SaveItem(num: Int, promise: Deferred[Task, Int]) extends Event
  }
  import Event._

  // For backpressure, take a look at `PublishSubject`.
  val cs = ConcurrentSubject[Event](MulticastStrategy.Publish)

  def pushEvent(num: Int) = {
    for {
      promise <- Deferred[Task, Int]
      _ <- Task.delay(cs.onNext(SaveItem(num, promise)))
    } yield promise
  }

  // You get a list of events now since it is buffered
  // Monix has a lot of buffer strategies, check the docs for more details
  def processEvents(items: Seq[Event]): Task[Unit] = {
    Task.delay(println(s"Items: $items")) >>
      Task.traverse(items) {
        case SaveItem(_, promise) => promise.complete(Random.nextInt(100))
      }.void
  }

  val app = for {
    // Start the stream in the background
    _ <- cs
      .bufferTimed(3.seconds) // Buffer all events within 3 seconds
      .filter(_.nonEmpty)
      .mapEval(processEvents)
      .completedL
      .startAndForget

    _ <- Task.sleep(1.second)
    p1 <- pushEvent(10)
    p2 <- pushEvent(20)
    p3 <- pushEvent(30)

    // Wait for the promise to complete, you'll do this for each request
    x <- p1.get
    y <- p2.get
    z <- p3.get

    _ <- Task.delay(println(s"Completed promise: [$x, $y, $z]"))
  } yield ()

  app.runSyncUnsafe()
}
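
For reference, a rough sketch of the same pattern in fs2 3.x with cats-effect 3 might look like the following (names like `FS2Batcher`, `SaveItem`, and `pushEvent` are made up for illustration; treat it as a sketch, not a drop-in implementation). A `Queue` plays the role of the subject, `groupWithin` does the time-window batching, and a `Deferred` carries each caller's result back. Using `Queue.bounded` instead of `Queue.unbounded` would also give natural backpressure on `offer`.

import cats.effect.std.Queue
import cats.effect.{Deferred, IO, IOApp}
import cats.syntax.all._
import fs2.Stream

import scala.concurrent.duration._
import scala.util.Random

object FS2Batcher extends IOApp.Simple {

  // Each submitted item carries a Deferred so the caller can wait for its own result
  final case class SaveItem(num: Int, promise: Deferred[IO, Int])

  def run: IO[Unit] =
    for {
      queue <- Queue.unbounded[IO, SaveItem] // Queue.bounded(n) would add backpressure on offer

      // The plain function callers see (Int => IO[Int]), backed by the stream below
      pushEvent = (num: Int) =>
        for {
          promise <- Deferred[IO, Int]
          _       <- queue.offer(SaveItem(num, promise))
          result  <- promise.get // suspend until the batch containing this item is processed
        } yield result

      // Background consumer: batch whatever arrives within a 3-second window,
      // "process" the batch, and complete every caller's promise
      consumer = Stream
        .fromQueueUnterminated(queue)
        .groupWithin(512, 3.seconds)
        .evalMap { batch =>
          IO.println(s"Batch of ${batch.size}") *>
            batch.toList.traverse_(item => item.promise.complete(Random.nextInt(100)))
        }

      _ <- consumer.compile.drain.background.use { _ =>
        // parTraverse so all three calls land in the same window
        List(10, 20, 30).parTraverse(pushEvent).flatMap(rs => IO.println(s"Results: $rs"))
      }
    } yield ()
}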
atl