I cannot figure out how to stop an Akka Streams RunnableGraph immediately. How can I use a KillSwitch to achieve this? I started with Akka Streams only a few days ago. In my case I am reading lines from a file, doing some operations in a flow, and writing to a sink. I want to be able to stop reading the file at any moment I choose, and I expect that this would stop the whole running graph. Any ideas on this would be greatly appreciated.

Thanks in advance.

PainPoints

2 Answers

Since Akka Streams 2.4.3, there is an elegant way to stop the stream from the outside via KillSwitch.

Consider the following example, which stops the stream after 10 seconds.

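import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, DelayOverflowStrategy, KillSwitches}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

import scala.concurrent.duration._
import scala.util.Random

object ExampleStopStream extends App {

  implicit val system = ActorSystem("streams")
  implicit val materializer = ActorMaterializer()

  import system.dispatcher

  // Endless source of random numbers, delayed by 500 ms
  val source = Source.
    fromIterator(() => Iterator.continually(Random.nextInt(100))).
    delay(500.millis, DelayOverflowStrategy.dropHead)
  val square = Flow[Int].map(x => x * x)
  val sink = Sink.foreach(println)

  // Keep both the kill switch and the sink's completion future
  val (killSwitch, done) =
    source.via(square).
    viaMat(KillSwitches.single)(Keep.right).
    toMat(sink)(Keep.both).run()

  system.scheduler.scheduleOnce(10.seconds) {
    println("Shutting down...")
    killSwitch.shutdown()
  }

  // Terminate the actor system without blocking a dispatcher thread
  done.foreach { _ =>
    println("I'm done")
    system.terminate()
  }

}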
lpiepiora
  • See the example code below. How do I stop the following graph and cancel reading the large file?
      RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
        import GraphDSL.Implicits._
        // Source from file
        val A: Outlet[String] = builder.add(readFileLineByLine).out
        // Write each line to file
        val E: Inlet[String] = builder.add(writeLinesToFile).in
        A ~> E
        ClosedShape
      }).run()
    – PainPoints Aug 06 '16 at 08:28
  • Your example assumes that you know when to stop the system before you start it, but in my case I need to stop the running system at any time I want. – PainPoints Aug 06 '16 at 08:39
  • I see. Based on the Akka Streams tests, I got this working:
      val switch1 = KillSwitches.shared("switch")
      val downstream = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
        import GraphDSL.Implicits._
        // Source from file
        val A: Outlet[String] = builder.add(readFileLineByLine).out
        // Write each line to file
        val E: Inlet[String] = builder.add(writeLinesToFile).in
        A.via(switch1.flow) ~> E
        ClosedShape
      }).run()
      switch1.shutdown()
    – PainPoints Aug 06 '16 at 09:49
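
For the on-demand case raised in the comments, the same kill switch can be triggered by any external event rather than a timer. The following is only a sketch under stated assumptions: `runStoppable`, `StoppableStream`, and `StopFileReadOnDemand` are hypothetical names, the input path is taken from the first program argument, the 8192-byte frame limit is an arbitrary choice, and `FileIO.fromPath` assumes a reasonably recent Akka version.

```scala
import java.nio.file.Paths

import scala.concurrent.Future

import akka.Done
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, KillSwitches, Materializer, UniqueKillSwitch}
import akka.stream.scaladsl.{FileIO, Framing, Keep, Sink, Source}
import akka.util.ByteString

object StoppableStream {
  // Hypothetical helper: runs source -> kill switch -> sink and returns
  // the switch together with the sink's completion future.
  def runStoppable[T, M](source: Source[T, M], sink: Sink[T, Future[Done]])(
      implicit mat: Materializer): (UniqueKillSwitch, Future[Done]) =
    source
      .viaMat(KillSwitches.single[T])(Keep.right)
      .toMat(sink)(Keep.both)
      .run()
}

object StopFileReadOnDemand extends App {
  implicit val system: ActorSystem = ActorSystem("streams")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  import system.dispatcher

  // Assumption: the file to read is passed as the first argument.
  val lines = FileIO.fromPath(Paths.get(args(0)))
    .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 8192, allowTruncation = true))
    .map(_.utf8String)

  val (killSwitch, done) =
    StoppableStream.runStoppable(lines, Sink.foreach[String](println))

  // Terminate the actor system once the stream ends, for whatever reason.
  done.onComplete(_ => system.terminate())

  // Block the main thread until Enter is pressed, then stop the stream.
  scala.io.StdIn.readLine()
  killSwitch.shutdown()
}
```

Calling `shutdown()` completes the downstream side and cancels the upstream side, so the file source stops being pulled. If the file is fully read before Enter is pressed, the later `shutdown()` call has no further effect.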

One way is to have a service or shutdown hook that calls cancel on the Cancellable that the graph materializes:

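import java.util.concurrent.TimeUnit
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.duration.FiniteDuration
import scala.util.Random

val graph = // Source.tick materializes a Cancellable, which `to` keeps
    Source.tick(FiniteDuration(0, TimeUnit.SECONDS), FiniteDuration(1, TimeUnit.SECONDS), Random.nextInt).to(Sink.foreach(println))
  val cancellable = graph.run() // needs an implicit Materializer in scope

  cancellable.cancel()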

The call to cancellable.cancel() can be registered with ActorSystem.registerOnTermination.
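
A minimal sketch of how these two ideas might be combined, assuming a small wrapper class: `TickService`, `stop`, and `isStopped` are hypothetical names introduced for illustration, and the string "tick" is a placeholder element.

```scala
import scala.concurrent.duration._

import akka.actor.{ActorSystem, Cancellable}
import akka.stream.Materializer
import akka.stream.scaladsl.{Sink, Source}

// Hypothetical service that owns the running graph and its Cancellable.
final class TickService(implicit system: ActorSystem, mat: Materializer) {

  // Source.tick materializes a Cancellable; `to` keeps the source's value.
  private val cancellable: Cancellable =
    Source.tick(0.seconds, 1.second, "tick").to(Sink.foreach(println)).run()

  // Also cancel the tick source when the actor system terminates.
  system.registerOnTermination(cancellable.cancel())

  // Stop the stream on demand; returns true only for the call that cancels it.
  def stop(): Boolean = cancellable.cancel()

  def isStopped: Boolean = cancellable.isCancelled
}
```

Any component holding a `TickService` reference can then call `stop()` at the moment it chooses, which is the on-demand behaviour the question asks for.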

ASe