Extending StreamApp requires you to implement the stream def, which takes a requestShutdown parameter.

def stream(args: List[String], requestShutdown: F[Unit]): Stream[F, ExitCode]

I provide the implementation for this and understand that args carries the command line arguments. I'm unsure, however, what supplies the requestShutdown parameter and what I can do with it.

Specifically, I'd like to invoke a graceful shutdown on a Stream[IO, ExitCode] that starts an http4s server (which blocks forever).

It looks like a Signal is needed and must be set? The underlying stream that I'm trying to 'get at' looks like this:

for {
  scheduler <- Scheduler[IO](corePoolSize = 1)
  exitCode  <- BlazeBuilder[IO]
                 .bindHttp(port, "0.0.0.0")
                 .mountService(services(scheduler), "/")
                 .serve
} yield exitCode

My stream def is here, and StreamAppSpec in the fs2 project has something similar, but I can't work out how I'd adapt it.

karlson
Toby

1 Answer

You can think of the requestShutdown parameter that StreamApp passes to your stream function as an action that, when executed, requests the termination of the program. You don't construct it yourself, and nothing happens until the action is actually run; once it is, the app begins shutting down.
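One way to picture what supplies it, and why a Signal comes up: a runner can create a Boolean signal, hand your stream an action that sets it to true, and interrupt the stream when that happens. The following is a simplified sketch of that idea against fs2 0.10.x, not StreamApp's actual source; ShutdownSketch and run are made-up names for illustration.

```scala
import cats.effect.IO
import fs2.{Stream, async}
import fs2.StreamApp.ExitCode
import scala.concurrent.ExecutionContext.Implicits.global

object ShutdownSketch {
  // Hypothetical stand-in for the runner: it owns the signal and passes
  // `halted.set(true)` to the user's stream as the shutdown action.
  def run(app: IO[Unit] => Stream[IO, ExitCode]): Stream[IO, ExitCode] =
    Stream.eval(async.signalOf[IO, Boolean](false)).flatMap { halted =>
      val requestShutdown: IO[Unit] = halted.set(true)
      // The user's stream is interrupted once the signal becomes true.
      app(requestShutdown).interruptWhen(halted)
    }
}
```

In this model, running requestShutdown from anywhere (another stream, a test, a route handler) flips the signal, and the interrupted stream then gets a chance to release its resources.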

Here is an example use:

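  import cats.effect.IO
  import fs2.{Scheduler, Stream}
  import fs2.StreamApp.ExitCode
  import org.http4s.server.blaze.BlazeBuilder
  import scala.concurrent.ExecutionContext.Implicits.global
  import scala.concurrent.duration._

  override def stream(args: List[String], requestShutdown: IO[Unit]): Stream[IO, ExitCode] =
    for {
      scheduler <- Scheduler[IO](corePoolSize = 1)
      // After 10 seconds, run the shutdown action and report success.
      exitStream = scheduler.sleep[IO](10.seconds)
        .evalMap(_ => requestShutdown)
        .map(_ => ExitCode.Success)
      // The http4s server; on its own this never terminates.
      serverStream = BlazeBuilder[IO]
        .bindHttp(port, "0.0.0.0")
        .mountService(services(scheduler), "/")
        .serve
      // Run both streams concurrently.
      result <- Stream.emits(List(exitStream, serverStream)).joinUnbounded
    } yield result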

In this scenario, we create two streams:

  • The first will wait for 10 seconds before triggering the effect of
    terminating the app.

  • The second will run the http4s server.

We then join these two streams so that they run concurrently. The web server therefore runs for 10 seconds, at which point the other stream signals that the program should terminate.
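If you would rather keep the timer out of the server code, the same joining step can be factored into a small helper. This is only a sketch assuming fs2 0.10.x; ShutdownAfter is a hypothetical name, and server stands for whatever stream you want to stop.

```scala
import cats.effect.IO
import fs2.{Scheduler, Stream}
import fs2.StreamApp.ExitCode
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ShutdownAfter {
  // Hypothetical helper: runs `server` alongside a timer that executes
  // `requestShutdown` once `delay` has elapsed.
  def apply(
      server: Stream[IO, ExitCode],
      requestShutdown: IO[Unit],
      delay: FiniteDuration
  ): Stream[IO, ExitCode] =
    Scheduler[IO](corePoolSize = 1).flatMap { scheduler =>
      val exitStream: Stream[IO, ExitCode] =
        scheduler.sleep[IO](delay)
          .evalMap(_ => requestShutdown)
          .map(_ => ExitCode.Success)
      // covary[IO] lifts the pure outer stream into IO before joining.
      Stream.emits(List(exitStream, server)).covary[IO].joinUnbounded
    }
}
```

Inside stream you could then write something like ShutdownAfter(serverStream, requestShutdown, 10.seconds), and a test could pass a much shorter delay.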

TheInnerLight
  • Thanks. Any suggestions for hooking something up so that tests, for example, can shut down a server? Pre-`StreamApp` I was doing this manually with a countdown latch, so I don't feel I'm getting much from moving to `StreamApp`... – Toby Mar 16 '18 at 20:00
  • @Toby I mean, the most basic way to force it would be to simply call `requestShutdown.unsafeRunSync()` to invoke the side effect at the appropriate moment. It's not what I would suggest in production code but it's a simple way of testing it and seeing how it works. – TheInnerLight Mar 16 '18 at 20:10
  • @Toby In a more sophisticated approach, you could easily move the `exitStream` (or something similar) outside of the `StreamApp` and then join that stream with the `stream` from `StreamApp` so that you don't pollute your real server code with test shutdown logic. – TheInnerLight Mar 16 '18 at 20:16
  • I'll have a go and report back if I come up with anything interesting – Toby Mar 16 '18 at 20:53
  • I did something. It's not very interesting! https://github.com/tobyweston/temperature-machine/blob/6803e3bf4af7c7099d72d1671a3936cd7f84183e/src/test/scala/bad/robot/temperature/server/HttpServerTest.scala – Toby Mar 28 '18 at 14:22