Callback --> Source
Elaborating on Endre Varga's answer, below is the code that will create the DataConsumer callback function which will send messages into an akka stream Source.
Caution: There is a lot more to creating a functional ActorPublisher than I am indicating below. In particular, buffering is needed to handle the case where the DataProducer is calling onData faster than the Sink is signalling demand (see this example). The code below just sets up the "wiring".
import akka.actor.{ActorRef, Props}
import akka.actor.Actor.noSender
import akka.stream.actor.ActorPublisher
import akka.stream.scaladsl.Source
/** Incomplete ActorPublisher, see the example link. */
class SourceActor extends ActorPublisher[DataType] {
  def receive: Receive = {
    case message: DataType => deliverBuf() // defined in example link
  }
}
class ActorConsumer(sourceActor: ActorRef) extends DataConsumer {
  override def onData(data: DataType) = sourceActor.tell(data, noSender)
}
// set up the actor that will feed the stream Source
val sourceActorRef = actorFactory actorOf Props[SourceActor]
// set up the Consumer object that will feed the Actor
// (ActorConsumer is a plain class, so it needs `new`)
val actorConsumer = new ActorConsumer(sourceActorRef)
// set up the akka stream Source
// (newer Akka versions spell this Source.fromPublisher(...))
val source = Source(ActorPublisher[DataType](sourceActorRef))
// set up the incoming data feed from the 3rd party library
val dataProducer = DataProducer(actorConsumer)
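The buffering mentioned in the caution above can be sketched without any Akka dependency. This is only a rough model of the bookkeeping, not the code from the linked example: `DemandBuffer`, `offer`, `request`, and `Demo` are hypothetical names, `emit` stands in for the publisher's `onNext`, and the `demand` counter plays the role of `totalDemand`. Elements are rejected when the buffer is full, which is just one possible policy.

```scala
import scala.collection.immutable.Queue

// Hypothetical, library-free model of demand-driven buffering.
// `emit` stands in for ActorPublisher.onNext; `demand` for totalDemand.
final class DemandBuffer[A](maxSize: Int, emit: A => Unit) {
  require(maxSize > 0, "maxSize must be positive")

  private var buf: Queue[A] = Queue.empty
  private var demand: Long = 0L

  /** Producer side (the onData path). Returns false if the buffer is full. */
  def offer(a: A): Boolean =
    if (buf.size >= maxSize) false
    else {
      buf = buf.enqueue(a)
      deliverBuf()
      true
    }

  /** Consumer side: downstream asks for n more elements. */
  def request(n: Long): Unit = {
    demand += n
    deliverBuf()
  }

  /** Emit buffered elements for as long as there is outstanding demand. */
  private def deliverBuf(): Unit =
    while (demand > 0 && buf.nonEmpty) {
      val (head, rest) = buf.dequeue
      buf = rest
      demand -= 1
      emit(head)
    }
}

object Demo {
  /** Producer outpaces the consumer at first, then demand catches up. */
  def run(): Vector[Int] = {
    val out = Vector.newBuilder[Int]
    val b = new DemandBuffer[Int](maxSize = 2, emit = x => { out += x; () })
    b.offer(1)   // buffered, no demand yet
    b.offer(2)   // buffered
    b.offer(3)   // rejected: buffer is full
    b.request(1) // emits 1
    b.offer(4)   // buffered behind 2
    b.request(5) // emits 2 and 4, leaving demand for 3 more
    b.offer(5)   // emitted immediately because demand is outstanding
    out.result()
  }
}
```

In a real ActorPublisher the `request` path would correspond to handling the `Request` message, and the rejection branch is where a different overflow policy could go.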
Callback --> Whole Stream
The original question asks specifically for a callback to a Source, but callbacks are easier to handle if the entire stream is already available (not just the Source). That is because the stream can be materialized into an ActorRef using the Source#actorRef function. As an example:
val overflowStrategy = akka.stream.OverflowStrategy.dropHead
val bufferSize = 100
val streamRef =
  Source
    .actorRef[DataType](bufferSize, overflowStrategy)
    .via(someFlow)
    .to(someSink)
    .run()
val streamConsumer = new DataConsumer {
  override def onData(data: DataType): Unit = streamRef ! data
}
val dataProducer = DataProducer(streamConsumer)
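To make the effect of the dropHead strategy used above more concrete, here is a small library-free model of what happens when the producer outruns the stream: once the buffer is full, the oldest buffered element is discarded to make room for the new one. `DropHeadModel` and `offer` are hypothetical names used only for illustration, and this is a sketch of the semantics, not Akka's implementation.

```scala
import scala.collection.immutable.Queue

// Hypothetical model of OverflowStrategy.dropHead on a bounded buffer.
object DropHeadModel {
  /** Add `a`; if the buffer is already full, drop the oldest element first. */
  def offer[A](buf: Queue[A], bufferSize: Int, a: A): Queue[A] = {
    require(bufferSize > 0, "bufferSize must be positive")
    val trimmed = if (buf.size >= bufferSize) buf.tail else buf
    trimmed.enqueue(a)
  }

  /** Push 1 to 5 into a buffer of size 3 with no downstream demand. */
  def demo(): List[Int] =
    (1 to 5).foldLeft(Queue.empty[Int])((q, x) => offer(q, 3, x)).toList
}
```

With a buffer of 3 and five elements pushed before any demand arrives, only the three most recent elements (3, 4, 5) remain. If losing old data is not acceptable for the DataProducer in question, a different overflow strategy or a larger bufferSize would be the thing to look at.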