
I'm trying to use the Source.actorRef method to create an akka.stream.scaladsl.Source object, something like this:

import akka.stream.OverflowStrategy.fail
import akka.stream.scaladsl.Source

case class Weather(zip : String, temp : Double, raining : Boolean)

val weatherSource = Source.actorRef[Weather](Int.MaxValue, fail)

val sunnySource = weatherSource.filter(!_.raining)
...

My question is: how do I send data to my ActorRef based Source object?

I assumed that sending messages to the Source would look something like this:

//does not compile
weatherSource ! Weather("90210", 72.0, false)
weatherSource ! Weather("02139", 32.0, true)

But weatherSource doesn't have a ! operator or tell method.

The documentation isn't very descriptive about how to use Source.actorRef; it just says you can...

Thank you in advance for your review and response.

Noah
Ramón J Romero y Vigil

3 Answers


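You need to run the stream: the ActorRef is the source's materialized value, so it only exists once the stream is running. For example, with a Flow: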

  import akka.stream.OverflowStrategy.fail
  import akka.stream.scaladsl.Source
  import akka.stream.scaladsl.{Sink, Flow}

  case class Weather(zip : String, temp : Double, raining : Boolean)

  val weatherSource = Source.actorRef[Weather](Int.MaxValue, fail)

  val sunnySource = weatherSource.filter(!_.raining)

  val ref = Flow[Weather]
    .to(Sink.ignore)
    .runWith(sunnySource)

  ref ! Weather("02139", 32.0, true)

Remember this is all experimental and may change!

Noah
  • In M5 it looks like Source.actorRef doesn't exist. Do you know where it moved to? – Ramón J Romero y Vigil Jun 11 '15 at 18:53
  • It looks like they basically changed this to passing a `Props` to source. The updated documentation is here: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M5/scala/stream-integrations.html – Noah Jun 11 '15 at 19:09
  • 1.0-RC3 is the most recent version and `Source.actorRef` still lives at the same place there: http://doc.akka.io/api/akka-stream-and-http-experimental/1.0-RC3/#akka.stream.scaladsl.Source$ – jrudolph Jun 12 '15 at 11:43
  • `Source(Props)` in M5, now `Source.actorPublisher` in RC3, is something else: it is used to create a Source that is backed by a custom `ActorPublisher` implementation. – jrudolph Jun 12 '15 at 11:47
  • hmm, I have a similar problem trying to get the underlying `ActorRef` but I need the ref before I can create my `Sink`. Can this `Flow` be thrown away and another `Flow` created? – fommil Jul 11 '15 at 21:54
  • I am creating my `Sink` with `Sink.create(subscriber)`. That returns a `Sink` that materializes as `Unit`. Is there a way to do that and still get the `ActorRef`? – Troy Daniels Sep 11 '15 at 17:54
  • Ignore my previous comment. I was doing `Source.from(...).runWith(Sink.create(sub, ec))`, which loses the source materialization. If you use `Flow.runWith`, then you get both. – Troy Daniels Sep 11 '15 at 18:09

As @Noah points out, akka-streams is experimental, so his answer might not work with the 1.0 release. I had to follow this example instead:

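import akka.actor.ActorRef
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Keep, Sink, Source}
import org.reactivestreams.Publisher
implicit val materializer = ActorMaterializer() // needs an implicit ActorSystem in scope
val (actorRef: ActorRef, publisher: Publisher[TweetInfo]) = Source.actorRef[TweetInfo](1000, OverflowStrategy.fail).toMat(Sink.publisher)(Keep.both).run()
actorRef ! TweetInfo(...)
val source: Source[TweetInfo, Unit] = Source[TweetInfo](publisher)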
Thien

The ActorRef instance, like all materialized values, becomes accessible only once the whole stream is materialized, in other words, when the RunnableGraph is run.

// RunnableGraph[ActorRef] means that you get ActorRef when you run the graph
val rg1: RunnableGraph[ActorRef] = sunnySource.to(Sink.foreach(println))

// You get ActorRef instance as a materialized value
val actorRef1: ActorRef = rg1.run()

// Better still, materialize both the ActorRef and a Future that completes when
// the stream completes, so that we know when we are done:

// RunnableGraph[(ActorRef, Future[Done])] means that you get tuple
// (ActorRef, Future[Done]) when you run the graph
val rg2: RunnableGraph[(ActorRef, Future[Done])] =
  sunnySource.toMat(Sink.foreach(println))(Keep.both)

// You get both ActorRef and Future[Done] instances as materialized values
val (actorRef2, future) = rg2.run()

actorRef2 ! Weather("90210", 72.0, false)
actorRef2 ! Weather("02139", 32.0, true)
actorRef2 ! akka.actor.Status.Success("Done!") // Complete the stream
future onComplete { /* ... */ }
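
Putting the pieces from the answers together, a minimal self-contained sketch might look like the following. It assumes an Akka 2.5-style classic API (`ActorMaterializer` and the two-argument `Source.actorRef`); other releases may need different calls. The names `WeatherStreamSketch` and `sunnyReadings`, the buffer size of 16, and the 3-second timeout are illustrative choices, not part of the original question.

```scala
import akka.actor.{ActorSystem, Status}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical wrapper object, used only for this illustration.
object WeatherStreamSketch {
  final case class Weather(zip: String, temp: Double, raining: Boolean)

  // Sends the readings to an actor-backed source and returns the ones
  // that pass the "not raining" filter.
  def sunnyReadings(readings: List[Weather])(implicit system: ActorSystem): Seq[Weather] = {
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    // Only a blueprint: no actor exists until the graph is run.
    val sunnySource = Source
      .actorRef[Weather](bufferSize = 16, overflowStrategy = OverflowStrategy.fail)
      .filter(!_.raining)

    // Keep.both retains the source's ActorRef and the sink's Future.
    val (ref, collected) = sunnySource.toMat(Sink.seq)(Keep.both).run()

    // With OverflowStrategy.fail the stream fails if the buffer overflows,
    // so this sketch assumes only a handful of readings.
    readings.foreach(ref ! _)
    ref ! Status.Success("done") // completes the stream after buffered elements

    Await.result(collected, 3.seconds)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("weather")
    try {
      println(sunnyReadings(List(
        Weather("90210", 72.0, raining = false),
        Weather("02139", 32.0, raining = true)
      )))
    } finally system.terminate()
  }
}
```

Each call to `run()` materializes a fresh actor, so the `ActorRef` belongs to one running stream rather than to the `Source` value itself. That is why `weatherSource ! ...` in the question does not compile.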