I am experimenting with Akka Streams and streaming content from a file using Alpakka. I need to stop the stream after some time, so I want to use a KillSwitch, but I don't know how to use one because I am building the stream with the graph DSL.

My graph looks like this:

val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
  import GraphDSL.Implicits._

  source ~> mainFlow ~> sink

  ClosedShape
})

graph.run()

I found a solution here: How to abruptly stop an akka stream Runnable Graph?

However, I don't know how to apply it if I'm using the graph DSL. Can you give me some advice?

Jeffrey Chung
martyn

1 Answer

To surface a materialized value in the GraphDSL, pass the stage that materializes that value to the `create` method. The builder block then receives that stage's shape as an extra argument, which you wire into the graph. It is easier to explain with an example. In your case:

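  import akka.stream.{ClosedShape, KillSwitches, UniqueKillSwitch}
  import akka.stream.scaladsl.{GraphDSL, RunnableGraph}

  // The type parameter is the element type flowing through the switch
  // (Int here; use whatever mainFlow emits).
  val switch = KillSwitches.single[Int]

  // Passing `switch` to create() makes the graph materialize to its UniqueKillSwitch.
  val graph: RunnableGraph[UniqueKillSwitch] =
    RunnableGraph.fromGraph(GraphDSL.create(switch) { implicit builder: GraphDSL.Builder[UniqueKillSwitch] => sw =>
      import GraphDSL.Implicits._

      // `sw` is the switch's FlowShape inside this graph.
      source ~> mainFlow ~> sw ~> sink

      ClosedShape
    })

  val ks = graph.run() // needs an implicit Materializer in scope
  ks.shutdown()        // completes downstream and cancels upstream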
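
For a self-contained variant of the snippet above, here is a minimal sketch. The `source`, `mainFlow` and `sink` definitions, the object name, and the `runFor` helper are hypothetical stand-ins, not the asker's actual stages, and the code assumes an Akka 2.5-era API (`ActorMaterializer`, `GraphDSL.create`). It passes both the switch and the sink to `create`, so the graph materializes to a pair: the kill switch and the sink's completion `Future`.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape, KillSwitches, Materializer, UniqueKillSwitch}
import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink, Source}
import scala.concurrent.Future
import scala.concurrent.duration._

object KillSwitchGraphExample {
  // Hypothetical stand-ins for the question's source, mainFlow and sink.
  val source = Source.tick(0.millis, 100.millis, 1) // emits 1 every 100 ms, never completes by itself
  val mainFlow = Flow[Int].map(_ * 2)
  val sink = Sink.foreach[Int](println)

  // Passing the switch and the sink to create() surfaces both materialized values.
  val graph: RunnableGraph[(UniqueKillSwitch, Future[Done])] =
    RunnableGraph.fromGraph(
      GraphDSL.create(KillSwitches.single[Int], sink)((_, _)) { implicit builder => (sw, out) =>
        import GraphDSL.Implicits._
        source ~> mainFlow ~> sw ~> out
        ClosedShape
      })

  // Runs the graph, triggers the switch after `after`, and returns the sink's completion future.
  def runFor(after: FiniteDuration)(implicit system: ActorSystem, mat: Materializer): Future[Done] = {
    import system.dispatcher
    val (ks, done) = graph.run()
    system.scheduler.scheduleOnce(after)(ks.shutdown())
    done
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("kill-switch-example")
    implicit val mat: Materializer = ActorMaterializer()
    import system.dispatcher
    // Terminate the actor system only once the stream has actually completed.
    runFor(2.seconds).onComplete(_ => system.terminate())
  }
}
```

Waiting on the sink's future before calling `system.terminate()` avoids tearing the actor system down while the stream is still completing.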
Stefano Bonetti
  • Ok, perfect :) I have one more question: do you have an idea how to stop a stream after processing n elements? Right now I am using system.scheduler.scheduleOnce(10.seconds), but it would be better to stop after a defined number of elements has been processed rather than after a fixed time. – martyn Apr 22 '17 at 17:28
  • Don't need a kill switch for that, you can use `.take(n)` – Stefano Bonetti Apr 22 '17 at 17:31
  • I know, but I want to take n elements, stop the stream, call another method that does something else, and then call system.terminate(). I want to measure how long it takes to process, for example, 100 elements. – martyn Apr 22 '17 at 17:53
  • Still, no need for a KillSwitch. Use `take(n)`, and choose a sink that materializes to a `Future` at completion (e.g. Sink.ignore). Then you can attach any callback to the future you get back when you run the graph. – Stefano Bonetti Apr 22 '17 at 20:34
  • Sorry, but I still don't understand how to do it. My sink writes to a file and has the type Sink[String, Future[IOResult]]. How can I attach a callback to it? Could you please write some simple sample code? Thanks – martyn Apr 23 '17 at 09:35
  • I don't mean to be pedantic, but this has evolved into a different issue from the original one. Could you please create another question stating clearly the problem you are trying to solve? – Stefano Bonetti Apr 23 '17 at 17:10
  • Yes, sure no problem. I did as you proposed – martyn Apr 23 '17 at 21:30
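
The `take(n)` suggestion from the comments can be sketched roughly as follows. This is an illustration under assumptions, not the code from the follow-up question: the object name, the `writeFirst` helper, the generated "line i" elements and the temp-file target are all hypothetical, and a plain linear stream is used instead of the graph DSL for brevity. `FileIO.toPath` materializes to a `Future[IOResult]`, so the timing and `system.terminate()` calls are attached as a callback on that future.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, IOResult, Materializer}
import akka.stream.scaladsl.{FileIO, Source}
import akka.util.ByteString
import java.nio.file.{Files, Path}
import scala.concurrent.Future

object TakeNExample {
  // Writes the first n generated lines to `target`; the stream completes by itself after n elements.
  def writeFirst(n: Int, target: Path)(implicit mat: Materializer): Future[IOResult] =
    Source.fromIterator(() => Iterator.from(1)) // hypothetical unbounded source
      .map(i => s"line $i")
      .take(n)
      .map(line => ByteString(line + "\n"))
      .runWith(FileIO.toPath(target))

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("take-n-example")
    implicit val mat: Materializer = ActorMaterializer()
    import system.dispatcher

    val target = Files.createTempFile("take-n", ".txt")
    val start = System.nanoTime()

    // The callback runs once the file sink has finished, whether it succeeded or failed.
    writeFirst(100, target).onComplete { outcome =>
      val elapsedMs = (System.nanoTime() - start) / 1000000
      println(s"Stream finished with $outcome after $elapsedMs ms")
      system.terminate()
    }
  }
}
```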