2

I've this application using Akka Streams and ReactiveMongo. There are no user defined actors. The application is launched from a main method.

Problem is the JVM continues to run forever after the main method has completed. This is what I'm doing now:

val g = (file: String) => RunnableGraph.fromGraph(GraphDSL.create(Sink.ignore) {
  implicit builder =>
    sink =>
      import GraphDSL.Implicits._

      // Source
      val A: Outlet[(String, String)] = builder.add(Source.fromIterator(() => parseMovies(file).iterator)).out
      // Flow
      val B: FlowShape[(String, String), Either[String, Movie]] = builder.add(findMovie)
      // Flow
      val C: FlowShape[Either[String, Movie], Option[String]] = builder.add(persistMovie)

      A ~> B ~> C ~> sink.in

      ClosedShape
})

def main(args: Array[String]): Unit = {
  require(args.size >= 1, "Path to file is required.")

  g(args(0)).run
    .onComplete(_ => Await.result(system.terminate(), 5.seconds))
}

I've read this thread, and this, none of which work. system.shutdown is deprecated and I don't have any explicit actors to watch for. I can call system.exit but that's hardly graceful.

From the logs, it appears that Akka is trying to shut down but then I see a bunch of Mongo messages.

2017-01-13 11:35:57.320 [DEBUG] a.e.EventStream.$anonfun$applyOrElse$4 - shutting down: StandardOutLogger started
2017-01-13 11:36:05.397 [DEBUG] r.c.a.MongoDBSystem.debug - [Supervisor-1/Connection-2] ConnectAll Job running... Status: {{NodeSet None Node[localhost:27017: Primary (10/10 available connections), latency=6], auth=Set() }}
2017-01-13 11:36:05.420 [DEBUG] r.c.a.MongoDBSystem.debug - [Supervisor-1/Connection-2] RefreshAll Job running... Status: {{NodeSet None Node[localhost:27017: Primary (10/10 available connections), latency=6], auth=Set() }}
// more of MongoDBSystem.debug messages

Why won't it.just.die?

Community
  • 1
  • 1
Abhijit Sarkar
  • 21,927
  • 20
  • 110
  • 219
  • 2
    You should close the `MongoDriver` properly, as it's managing its own underlying Actor System, that you should never (cannot) access directly – cchantep Jan 13 '17 at 16:53
  • @cchantep Can you be more specific or show a code example? Closing the Mongo connection didn't help. – Abhijit Sarkar Jan 13 '17 at 19:11
  • If you carefully read I mentioned `MongoDriver`, not `MongoConnection` – cchantep Jan 13 '17 at 19:30
  • @cchantep I understood you the first time but was asking for a code sample. Anyway, I made the following change and it does work. If you post an answer (instead of a comment), I'll accept it. `g(args(0)).run .onComplete(_ => { driver.close(5.seconds) Await.result(system.terminate(), 5.seconds) })` – Abhijit Sarkar Jan 13 '17 at 19:45

1 Answers1

0

I think you want to add a shutdown hook or call actorSystem.registerOnTermination(driver.close()):

def main(args: Array[String]): Unit = {
  import akka.actor.CoordinatedShutdown
  require(args.size >= 1, "Path to file is required.")

  CoordinatedShutdown(system).addTask(CooordinatedShutdown.PhaseBeforeActorSystemTerminate, "shutdownMongoDriver") { () => driver.close(5.seconds); Future.successful(Done) }

  g(args(0)).run.onComplete(_ => CoordinatedShutdown(system).run())    
}
Jeffrey Chung
  • 19,319
  • 8
  • 34
  • 54
Will Sargent
  • 4,346
  • 1
  • 31
  • 53