1

My question is somewhere related to : Accessing the underlying ActorRef of an akka stream Source created by Source.actorRef with some differences :

  1. I'm using akka-stream experimental 1.0
  2. I'm using the actorPublisher model
  3. I'm using a FlowGraph dsl for stream definition with parallel processing

I don't find a way to get an actorRef to send a message to the Actor Publisher instance held by the Source.

 def run(implicit system: ActorSystem) = {
   import system.dispatcher
   implicit val materializer = ActorMaterializer()

   val source = Source.actorPublisher[TestRequest](TestActor.props).map { request => request.event }

   //Implementation in subpackage
   val sinkLevel1 = Sinks.sinkLevel1 
   val sinkLevel2 = Sinks.sinkLevel2

   //Implementation in subpackage
   val stageTriage = FlowStages.stageTriage    
   val stageEvalProcess1 = FlowStages.stageEvalProcess1
   val stageEvalProcess2 = FlowStages.stageEvalProcess2

   val pipeline = FlowGraph.closed(){ implicit builder => 
     import FlowGraph.Implicits._

     val stageDispatchByRuleLevels = builder.add(Broadcast[TriagedSystemEvent](2))

     source ~> stageTriage ~> stageDispatchByRuleLevels
                              stageDispatchByRuleLevels ~> stageEvalProcess1 ~> sinkLevel1
                              stageDispatchByRuleLevels ~> stageEvalProcess2 ~> sinkLevel2

   }

   pipeline.run()

 }

Thanks for help !

Oliver

Community
  • 1
  • 1
Oliver
  • 11
  • 2

1 Answers1

1

Based on Noah's answer in the linked question, if you add

val ref = pipeline.run()

you can then send messages to ref, like

ref ! ...
yardena
  • 21
  • 1