62

I would like to create a Source and later push elements to it, as in:

val src = ... // create the Source here
// and then, do something like this
pushElement(x1, src)
pushElement(x2, src)

What is the recommended way to do this?

Thanks!

ale64bit
  • 2
    This might be the same as: http://stackoverflow.com/questions/29072963/how-to-add-elements-to-source-dynamically/29077212#29077212 – cmbaxter Jun 21 '15 at 13:47
  • 1
    @cmbaxter Indeed. Although I was thinking more in a way to just feed the stream by sending messages to some actor, without instantiating or having a class for this actor. I believe that it's possible with the `Source.actorRef` functionality, as I see in the post you linked and here: http://stackoverflow.com/questions/30785011/accessing-the-underlying-actorref-of-an-akka-stream-source-created-by-source-act. Thanks a lot) – ale64bit Jun 21 '15 at 13:52
  • You may also find this answer helpful: https://stackoverflow.com/questions/40345697/how-to-use-akka-http-client-websocket-send-message/44605821#44605821 – dimart.sp Jun 17 '17 at 14:42

3 Answers

103

There are three ways this can be achieved:

1. Post Materialization with SourceQueue

You can use Source.queue, which materializes to a SourceQueue that accepts elements once the stream is running:

case class Weather(zipCode : String, temperature : Double, raining : Boolean)

val bufferSize = 100

//if the buffer fills up then this strategy drops the oldest elements
//upon the arrival of a new element.
val overflowStrategy = akka.stream.OverflowStrategy.dropHead

val queue = Source.queue[Weather](bufferSize, overflowStrategy)
                  .filter(!_.raining)
                  .to(Sink foreach println) // `to` keeps the queue's materialized value instead of the Sink's
                  .run()

queue offer Weather("02139", 32.0, true)
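
Note that offer returns a Future[QueueOfferResult] rather than Unit, so the caller can find out whether the element was accepted. The following is a minimal sketch of inspecting that result, assuming Akka 2.5.x; the names OfferResultSketch, describe and run are hypothetical, and the blocking Await is only there to keep the demonstration short:

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical wrapper object, used only for illustration.
object OfferResultSketch {
  case class Weather(zipCode: String, temperature: Double, raining: Boolean)

  // Translate each possible outcome of `offer` into a readable label.
  def describe(result: QueueOfferResult): String = result match {
    case QueueOfferResult.Enqueued       => "enqueued"
    case QueueOfferResult.Dropped        => "dropped"
    case QueueOfferResult.QueueClosed    => "queue closed"
    case QueueOfferResult.Failure(cause) => s"failed: ${cause.getMessage}"
  }

  def run(): String = {
    implicit val system: ActorSystem = ActorSystem("offer-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val queue = Source.queue[Weather](100, OverflowStrategy.dropHead)
        .filter(!_.raining)
        .to(Sink.foreach(println))
        .run()

      // Blocking here is for demonstration only; real code would map over the Future.
      val result = Await.result(
        queue.offer(Weather("02139", 32.0, raining = false)), 3.seconds)
      describe(result)
    } finally system.terminate()
  }
}
```

With a strategy such as dropHead the new element is always accepted and older ones are discarded instead; strategies like dropNew or backpressure are where the Dropped result or a delayed Future become relevant.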

2. Post Materialization with Actor

There is a similar question and answer here, the gist being that you materialize the stream as an ActorRef and send messages to that ref:

val ref = Source.actorRef[Weather](Int.MaxValue, akka.stream.OverflowStrategy.fail)
                .filter(!_.raining)
                .to(Sink foreach println) // `to` keeps the ref's materialized value instead of the Sink's
                .run()

ref ! Weather("02139", 32.0, true)
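
A stream fed through Source.actorRef keeps running until it is told to stop. Below is a minimal sketch, assuming Akka 2.5.x, of completing it by sending akka.actor.Status.Success to the ref, which lets already buffered elements drain first. The object name ActorRefCompletionSketch, the method run, the buffer size of 100 and the sample readings are all hypothetical:

```scala
import akka.actor.{ActorSystem, Status}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical wrapper object, used only for illustration.
object ActorRefCompletionSketch {
  case class Weather(zipCode: String, temperature: Double, raining: Boolean)

  def run(): Seq[Weather] = {
    implicit val system: ActorSystem = ActorSystem("actor-ref-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // Keep.both returns the ActorRef and the Future of collected elements.
      val (ref, collected) = Source.actorRef[Weather](100, OverflowStrategy.fail)
        .filter(!_.raining)
        .toMat(Sink.seq)(Keep.both)
        .run()

      ref ! Weather("02139", 32.0, raining = true)  // removed by the filter
      ref ! Weather("94105", 61.0, raining = false) // passes the filter
      ref ! Status.Success("done")                  // completes the stream after draining

      // Blocking here is for demonstration only.
      Await.result(collected, 3.seconds)
    } finally system.terminate()
  }
}
```

Sending akka.actor.PoisonPill instead would complete the stream immediately, without waiting for buffered elements to be emitted.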

3. Pre Materialization with Actor

Similarly, you could explicitly create an Actor that contains a message buffer, use that Actor to create a Source, and then send that Actor messages as described in the answer here:

object WeatherForwarder {
  def props : Props = Props[WeatherForwarder]
}

//see provided link for example definition
class WeatherForwarder extends Actor {...}

val actorRef = actorSystem actorOf WeatherForwarder.props 

//note the stream has not been instantiated yet
actorRef ! Weather("02139", 32.0, true)

//stream already has 1 Weather value to process which is sitting in the
//ActorRef's internal buffer
//(later Akka Streams versions use Source.fromPublisher in place of Source(publisher))
val stream = Source.fromPublisher(ActorPublisher[Weather](actorRef)).runWith{...}
Ramón J Romero y Vigil
  • 3
    @Loic I did not get from your comment that "prematerialization with queue" would be a fourth possible solution. It is. I found this good: http://stackoverflow.com/questions/37113877/how-can-i-use-and-return-source-queue-to-caller-without-materializing-it/37117205#37117205 – akauppi Dec 21 '16 at 20:43
  • @akauppi In the link you posted, if you `mapMaterializedValue` it will create another source. He uses a Future to get the queue of the source he wants to return. – Guillaume Massé Feb 13 '17 at 10:20
  • Question: is that possible when I call queue.complete() , after the upstream emit to sink, I start to push to queue again ? – zt1983811 Jun 02 '17 at 16:03
  • 1
    @zt1983811 I have never attempted the use case you specified. – Ramón J Romero y Vigil Jun 02 '17 at 16:30
  • How can I do this https://stackoverflow.com/questions/44316740/akka-stream-sliding-window-to-control-reduce-emit-to-sink/44317588?noredirect=1#comment75647475_44317588 – zt1983811 Jun 03 '17 at 12:53
  • @akauppi Because I also use queue to emit the data to source but I can not do reduce once I use this. – zt1983811 Jun 04 '17 at 21:49
  • @akauppi I have created a test project on GitLab with a detailed explanation in the README, plus a test you can run with Spring Boot. Please help me, thanks so much https://gitlab.com/tongzhougit/Demo-akka-stream-reduce-question – zt1983811 Jun 06 '17 at 14:41
19

Since Akka 2.5 Source has a preMaterialize method.

According to the documentation, this looks like the recommended way to do what you ask:

There are situations in which you require a Source materialized value before the Source gets hooked up to the rest of the graph. This is particularly useful in the case of “materialized value powered” Sources, like Source.queue, Source.actorRef or Source.maybe.

Below is an example of how this works with a SourceQueue. Elements are pushed to the queue before and after materialization, as well as from within the Flow:

import akka.actor.ActorSystem
import akka.stream.scaladsl._
import akka.stream.{ActorMaterializer, OverflowStrategy}

implicit val system = ActorSystem("QuickStart")
implicit val materializer = ActorMaterializer()


val sourceDecl = Source.queue[String](bufferSize = 2, OverflowStrategy.backpressure)
val (sourceMat, source) = sourceDecl.preMaterialize()

// Adding element before actual materialization
sourceMat.offer("pre materialization element")

val flow = Flow[String].map { e =>
  if(!e.contains("new")) {
    // Adding elements from within the flow
    sourceMat.offer("new element generated inside the flow")
  }
  s"Processing $e"
}

// Actually materializing with `run`
source.via(flow).to(Sink.foreach(println)).run()

// Adding element after materialization
sourceMat.offer("post materialization element")

Output:

Processing pre materialization element
Processing post materialization element
Processing new element generated inside the flow
Processing new element generated inside the flow
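
The documentation quoted above also lists Source.maybe among the sources that benefit from preMaterialize. As a rough sketch, assuming Akka 2.5.x, the same pattern yields a Promise that can be completed before the source is attached to anything. The names PreMaterializeMaybeSketch and run, and the element text, are hypothetical:

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical wrapper object, used only for illustration.
object PreMaterializeMaybeSketch {
  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("pre-materialize-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // preMaterialize hands back the materialized value (a Promise[Option[String]])
      // together with a new Source that can be wired into a graph later.
      val (promise, source) = Source.maybe[String].preMaterialize()

      // Supply the single element before the source is connected downstream.
      promise.success(Some("pre materialization element"))

      // Blocking here is for demonstration only.
      Await.result(source.runWith(Sink.seq), 3.seconds)
    } finally system.terminate()
  }
}
```

Completing the promise with None instead would end the stream without emitting any element.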
PetrosP
3

After experimenting and looking for a good solution to this, I came across the following approach, which is clean and simple and works both before and after materialization: https://stackoverflow.com/a/32553913/6791842

  val (ref: ActorRef, publisher: Publisher[Int]) =
    Source.actorRef[Int](bufferSize = 1000, OverflowStrategy.fail)
      .toMat(Sink.asPublisher(true))(Keep.both).run()

  ref ! 1 //before

  val source = Source.fromPublisher(publisher)

  ref ! 2 //before
  Thread.sleep(1000)
  ref ! 3 //before

  source.runForeach(println)

  ref ! 4 //after
  Thread.sleep(1000)
  ref ! 5 //after

Output:

1
2
3
4
5