4

I'm aware that as of Akka 2.4.16, there is no "remote" implementation of Reactive Streams. The specification focuses on a stream running on a single JVM.

However, considering the use case to involve another JVM for some processing while maintaining back pressure. The idea is to have a main application that provides a user interface running a stream. For instance, this stream has a stage performing some heavy computations that should run on a different machine. I'm interested in ways to run streams in a distributed way - I came across some articles pointing out some ideas:

What other alternatives are there? Are there any significant downsides to the above? Any special characteristics to consider?

Update: This question is not limited to a single use case. I'm generally interested in all possible ways to work with streams in a distributed environment. That means, e.g. it can involve only one stream that integrates actors with .mapAsync or e.g. there could be two separate streams on two machines communicating via Akka HTTP. The only requirement is that back pressure has to be enforced among all components.

Community
  • 1
  • 1
Toaditoad
  • 254
  • 2
  • 12
  • 1
    I think that you are mis understanding something. So... how can you have a inter-jvm stream ? Well... by having components which actually reside in different jvm's. Now you need to understand that the those components in this particular case will be Actors. So... you just need to create a FlowShape/Sink/Source with some `remote Actor` and Artery will take care of the messaging. – sarveshseri Jan 18 '17 at 13:03
  • I totally agree with your comment - according to the blog post, Artery maintains the back pressure when these two actors communicate with each other. My question rather aims to understand whether e.g. using `.mapAsync` for integrating remote actors in a stream has the same result: having a stream that processes something on a different machine. More generally asked: What are the ways to implement streams crossing JVM boundaries? – Toaditoad Jan 18 '17 at 13:13

2 Answers2

1

Well... It seems that I will have to add an example for that. One thing that you need to understand is that BackPressure is handled by the AsyncBoundries in GraphStages. It really has nothing to do with a component existing some where else. Also... It is not dependent on Artery which is nothing but the new remote transport.

Here is an example of probably the simplest cross-jvm stream,

First Application,

import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import akka.actor.Actor.Receive
import com.typesafe.config.{Config, ConfigFactory}

class MyActor extends Actor with ActorLogging {
  override def receive: Receive = {
    case msg @ _ => {
      log.info(msg.toString)
      sender() ! msg
    }
  }
}

object MyApplication extends App {

  val config = ConfigFactory.parseString(
    """
      |akka{
      |  actor {
      |    provider = remote
      |  }
      |  remote {
      |    enabled-transports = ["akka.remote.netty.tcp"]
      |    untrusted-mode = off
      |    netty.tcp {
      |      hostname="127.0.0.1"
      |      port=18000
      |    }
      |  }
      |}
    """.stripMargin
  )

  val actorSystem = ActorSystem("my-actor-system", config)

  var myActor = actorSystem.actorOf(Props(classOf[MyActor]), "my-actor")

}

And Second application... actually "runs" the stream which uses the actor in first application.

import akka.actor.{ActorPath, ActorSystem}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.pattern.ask
import com.typesafe.config.ConfigFactory

import scala.language.postfixOps
import scala.concurrent.duration._

object YourApplication extends App {

  val config = ConfigFactory.parseString(
    """
      |akka{
      |  actor {
      |    provider = remote
      |  }
      |  remote {
      |    enabled-transports = ["akka.remote.netty.tcp"]
      |    untrusted-mode = off
      |    netty.tcp {
      |      hostname="127.0.0.1"
      |      port=19000
      |    }
      |  }
      |}
    """.stripMargin
  )

  val actorSystem = ActorSystem("your-actor-system", config)

  import actorSystem.dispatcher

  val logger = actorSystem.log

  implicit val implicitActorSystem = actorSystem
  implicit val actorMaterializer = ActorMaterializer()

  val myActorPath = ActorPath.fromString("akka.tcp://my-actor-system@127.0.0.1:18000/user/my-actor")

  val myActorSelection = actorSystem.actorSelection(myActorPath)

  val source = Source(1 to 10)

  // here this "mapAsync" wraps the given T => Future[T] function in a GraphStage
  val myRemoteComponent = Flow[Int].mapAsync(2)(i => {
    myActorSelection.resolveOne(1 seconds).flatMap(myActorRef => 
      (myActorRef.ask(i)(1 seconds)).map(x => x.asInstanceOf[Int])
    )
  })

  val sink = Sink.foreach[Int](i => logger.info(i.toString))

  val stream = source.via(myRemoteComponent).toMat(sink)(Keep.right)

  val streamRun = stream.run()

}
sarveshseri
  • 13,738
  • 28
  • 47
  • Thank you for this example. As said in my question, I'm familiar with the approach using a stage with `.mapAsync`. But I'm wondering about other alternatives to that. For instance, it would be possible to convert your example to something having streams connected via TCP by Akka HTTP. I'm updating my question to make that clearer... – Toaditoad Jan 18 '17 at 14:48
  • @Toaditoad Please do not take this question otherwise - Are you new to Akka and Scala ? I frankly did not understand the meaning of `having streams connected via TCP by Akka HTTP` because that does not make any sense in my combined understanding of TCP + Akka + Http + Akka Streams + Akka HTTP. – sarveshseri Jan 19 '17 at 06:43
  • Yes, I'm fairly new to that world. I'm investigating possible designs for processing video frames with actors and streams. Therefore, I'm not looking for just one single working solution but want to try some alternatives. I haven't started with the TCP/Akka HTTP idea yet but have you seen Konrad Malawski's answer (http://stackoverflow.com/a/30693174/4169741)? He is one of the Akka gurus with Lightbend and his answer sounds pretty clear to me. – Toaditoad Jan 19 '17 at 06:52
  • @Toaditoad And that is where you are taking things out of context. That question was whether you can subscribe to a stream from a different JVM and by `subscribe` he mean by explicitly using that standard `publisher.subscribe` method (which is part of a standard reactive-publisher). And his answer was regarding that explicit case only. Your question is more about back-pressure which is the responsibility of each individual component in the stream. And as I have shown you can combine cross-jvm components in the same stream. – sarveshseri Jan 19 '17 at 07:15
  • Yes, I get that. And again, I totally agree with your answer that you can have one stream involving another JVM while maintaining back pressure thanks to the asynchronous integration of actors with `.mapAsync`. I already pointed that out in my question (third bullet point) - but I was looking for other approaches, even if they are for a different use case like connecting two different streams on two separate JVMs. – Toaditoad Jan 19 '17 at 07:30
0

In Akka 2.5.10 and above, you can now use StreamRefs for this. StreamRefs are designed for this use-case, so they are particularly suitable for remote work queues, because they back pressure until the stream that is locally attached to them can accept more work.

Robin Green
  • 32,079
  • 16
  • 104
  • 187
  • Nice to hear that there is still active development around Akka. Maybe your hint is gonna help someone. – Toaditoad Jul 08 '21 at 05:21