25

I have example code to generate an unbound source and working with it:

object Main {

 def main(args : Array[String]): Unit = {

  implicit val system = ActorSystem("Sys")
  import system.dispatcher

  implicit val materializer = ActorFlowMaterializer()

  val source: Source[String] = Source(() => {
     Iterator.continually({ "message:" + ThreadLocalRandom.current().nextInt(10000)})
    })

  source.runForeach((item:String) => { println(item) })
  .onComplete{ _ => system.shutdown() }
 }

}

I want to create class which implements:

trait MySources {
    def addToSource(item: String)
    def getSource() : Source[String]
}

And I need use it with multiple threads, for example:

class MyThread(mySources: MySources) extends Thread {
  override def run(): Unit = {
    for(i <- 1 to 1000000) { // here will be infinite loop
        mySources.addToSource(i.toString)
    }
  }
} 

And expected full code:

object Main {
  def main(args : Array[String]): Unit = {
    implicit val system = ActorSystem("Sys")
    import system.dispatcher

    implicit val materializer = ActorFlowMaterializer()

    val sources = new MySourcesImplementation()

    for(i <- 1 to 100) {
      (new MyThread(sources)).start()
    }

    val source = sources.getSource()

    source.runForeach((item:String) => { println(item) })
    .onComplete{ _ => system.shutdown() }
  }
}

How to implement MySources?

Jacek Laskowski
  • 72,696
  • 27
  • 242
  • 420
krynio
  • 2,442
  • 26
  • 30

3 Answers3

21

One way to have a non-finite source is to use a special kind of actor as the source, one that mixes in the ActorPublisher trait. If you create one of those kinds of actors, and then wrap with a call to ActorPublisher.apply, you end up with a Reactive Streams Publisher instance and with that, you can use an apply from Source to generate a Source from it. After that, you just need to make sure your ActorPublisher class properly handles the Reactive Streams protocol for sending elements downstream and you are good to go. A very trivial example is as follows:

import akka.actor._
import akka.stream.actor._
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl._

object DynamicSourceExample extends App{

  implicit val system = ActorSystem("test")
  implicit val materializer = ActorFlowMaterializer()

  val actorRef = system.actorOf(Props[ActorBasedSource])
  val pub = ActorPublisher[Int](actorRef)

  Source(pub).
    map(_ * 2).
    runWith(Sink.foreach(println))

  for(i <- 1 until 20){
    actorRef ! i.toString
    Thread.sleep(1000)
  }

}

class ActorBasedSource extends Actor with ActorPublisher[Int]{
  import ActorPublisherMessage._
  var items:List[Int] = List.empty

  def receive = {
    case s:String =>
      if (totalDemand == 0) 
        items = items :+ s.toInt
      else
        onNext(s.toInt)    

    case Request(demand) =>  
      if (demand > items.size){
        items foreach (onNext)
        items = List.empty
      }
      else{
        val (send, keep) = items.splitAt(demand.toInt)
        items = keep
        send foreach (onNext)
      }


    case other =>
      println(s"got other $other")
  }


}
cmbaxter
  • 35,283
  • 4
  • 86
  • 95
  • I think that in the `Request`'s `else`, the line `items foreach (onNext)` should be `send foreach (onNext)` – mdm Mar 17 '15 at 22:47
  • 1
    There's a safer (because bounded) solution now using a `Source.actorPublisher` which will materialize to an `ActorRef` which is backed by an actor that does something very similar to what your custom `ActorBasedSource` does. E.g. your actor doesn't have a proper lifecycle and doesn't work with multiple materializations which is hard to get right. – jrudolph May 20 '15 at 09:41
  • @jrudolph, you are correct. A bit has changed since this answer. I will add an edit shortly showing the new way. Thanks. – cmbaxter May 20 '15 at 10:25
  • Thanks, very useful. ActorBasedSource seems very generic, isn't it surprising that it's not provided by default buy Akka-Streams? – Loic Jun 14 '15 at 19:47
  • Is there a simpler solution with Akka Streams 2, e.g. something like a channel to push events? – Loic Feb 05 '16 at 10:38
  • as a very nice and clear explanation on how to implement reactive streams with akka actors I suggest this: http://bryangilbert.com/blog/2015/02/04/akka-reactive-streams/ – Bill'o Feb 17 '16 at 12:01
13

With Akka Streams 2 you can use a sourceQueue : How to create a Source that can receive elements later via a method call?

Community
  • 1
  • 1
Loic
  • 3,310
  • 4
  • 25
  • 43
0

As I mention in this answer, the SourceQueue is the way to go, and since Akka 2.5 there is a handy method preMaterialize which eliminates the need to create a composite source first.

I give an example in my other answer.

PetrosP
  • 635
  • 6
  • 15