While learning how to use Akka I/O, I am trying to implement a simple protocol on top of Akka I/O, following the documentation here.

However, in my Gradle file I use version 2.3.9, as shown below:

dependencies {
    compile group: 'org.slf4j', name: 'slf4j-log4j12', version: '1.7.7'
    compile group: 'com.typesafe.akka', name: 'akka-actor_2.11', version: '2.3.9'
    compile group: 'com.typesafe.akka', name: 'akka-contrib_2.11', version: '2.3.9'
    compile group: 'org.scala-lang', name: 'scala-library', version: '2.11.5'
    testCompile group: 'junit', name: 'junit', version: '4.11'
}

Imports of some pipeline-specific classes, such as

import akka.io.SymmetricPipelineStage;
import akka.io.PipelineContext;
import akka.io.SymmetricPipePair;

generate "cannot resolve symbol" errors.

Hence my questions:

  1. Were these removed, or is there some dependency I need to add to my Gradle file?
  2. If they were removed, how should the encode/decode stage be handled?
Bwire

2 Answers

Pipelines were experimental and indeed removed in Akka 2.3. The removal was documented in the Migration Guide 2.2.x to 2.3.x.

There is also mention of being able to package the "older" pipeline implementation with Akka 2.3 here, though it doesn't appear to be a simple addition of a dependency.

I would wager that Akka Streams, coming in Akka 2.4 but available now as an experimental module, is intended to be the better replacement for pipelines. The encode/decode stage or protocol layer can be handled by using Akka Streams in conjunction with Akka I/O.
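
For illustration, here is a minimal sketch of such a protocol layer, assuming the later stabilized akka-stream module (2.4+), where Framing.simpleFramingProtocol provides a ready-made BidiFlow that prepends a 4-byte length header to outgoing messages and reassembles complete frames from incoming bytes:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Framing, Tcp}
import akka.util.ByteString

object FramedEchoServer extends App {
  implicit val system = ActorSystem("framed-echo")
  implicit val materializer = ActorMaterializer()

  // Encoder on top (message -> length-prefixed bytes), decoder below (raw bytes -> message).
  val framing = Framing.simpleFramingProtocol(maximumMessageLength = 65536)

  // The business logic only ever sees whole frames; here it just echoes them back.
  val echo = Flow[ByteString]

  Tcp().bind("127.0.0.1", 9000).runForeach { connection =>
    // reversed.join places the decoder before the logic and the encoder after it,
    // producing a Flow[raw bytes -> framed bytes] that handles the connection.
    connection.handleWith(framing.reversed.join(echo))
  }
}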

Eric Zoerner
  • Thanks for the answer. If you don't mind, could you share an example codec with the protocol layer, preferably in Java, built on top of Akka Streams? – Bwire Mar 28 '15 at 18:22
  • 1
    I have some Scala code that takes raw bytes and converts them into message frames using Akka Streams in my [scalable-chat project](https://github.com/ezoerner/scalable-chat/blob/introduce-akka-streams/messaging/src/main/scala/scalable/messaging/tcp/FrameStage.scala). I don't have any sample Java code, unfortunately. The relevant section in the akka docs is at [Custom stream processing](http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M5/java/stream-customize.html). – Eric Zoerner Mar 28 '15 at 18:46
  • 2
    In particular a complete documentation example of layering protocols is shown [in the Akka Streams documentation](http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M5/java/stream-graphs.html#Bidirectional_Flows) (it will be highly beneficial to read all the docs and not just that section). – Roland Kuhn Mar 29 '15 at 13:42
  • @RolandKuhn Thanks for the pointer, though I think I may be forced to stick with Akka I/O, since I would like to implement something that provides a push mechanism, like an XMPP server. Is this possible with Akka Streams too? In Akka I/O I have been able to use an actor to send a message from the server to the client once connected, but I can't seem to figure out how this can be done in Akka Streams. – Bwire Mar 29 '15 at 16:37
  • 2
    @Bwire Have a look at `ActorPublisher` / `ActorSubscriber`, they are meant to be the bridge between Actors and Reactive Streams: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M5/scala/stream-integrations.html – Konrad 'ktoso' Malawski Mar 29 '15 at 17:31
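
To make the push idea concrete: a minimal sketch of a server-to-client push channel, assuming a later akka-stream where Source.actorRef is available (its materialized ActorRef plays the role that the ActorPublisher bridge plays in 1.0-M5):

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source, Tcp}
import akka.util.ByteString

object PushServer extends App {
  implicit val system = ActorSystem("push-server")
  implicit val materializer = ActorMaterializer()

  Tcp().bind("127.0.0.1", 9001).runForeach { connection =>
    // Materializing Source.actorRef yields an ActorRef; any ByteString sent
    // to it is pushed downstream, i.e. out to the connected client.
    val outRef = connection.handleWith(
      Flow.fromSinkAndSourceMat(
        Sink.ignore, // discard whatever the client sends
        Source.actorRef[ByteString](bufferSize = 64, overflowStrategy = OverflowStrategy.dropHead)
      )(Keep.right)
    )
    outRef ! ByteString("hello, client\n")
  }
}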

Yes, pipelines were removed without any alternative. I came from the Netty world and don't find pipelines "unintuitive": they accumulate buffers and supply child actors with ready-to-use messages.

Take a look at our solution; it requires `"org.scalaz" %% "scalaz-core" % "7.2.14"` as a dependency.

The `Codec` class is a State monad which is called by the actor and produces the output. In our projects we use Varint32 protobuf encoding, so every message is prepended with a varint32 length field:

import com.google.protobuf.CodedInputStream
import com.trueaccord.scalapb.{GeneratedMessage, GeneratedMessageCompanion, Message}
import com.zeptolab.tlc.front.codecs.Varint32ProtoCodec.ProtoMessage

import scalaz.{-\/, State, \/, \/-}

trait Accumulator
trait Codec[IN, OUT] {

  type Stream = State[Accumulator, Seq[IN]]

  def decode(buffer: Array[Byte]): Throwable \/ IN

  def encode(message: OUT): Array[Byte]

  def emptyAcc: Accumulator

  def decodeStream(data: Array[Byte]): Stream

}

object Varint32ProtoCodec {

  type ProtoMessage[T] = GeneratedMessage with Message[T]

  def apply[IN <: ProtoMessage[IN], OUT <: ProtoMessage[OUT]](protoType: GeneratedMessageCompanion[IN]) = new Varint32ProtoCodec[IN, OUT](protoType)

}

class Varint32ProtoCodec[IN <: ProtoMessage[IN], OUT <: ProtoMessage[OUT]](protoType: GeneratedMessageCompanion[IN]) extends Codec[IN, OUT] {

  import com.google.protobuf.CodedOutputStream

  private case class AccumulatorImpl(expected: Int = -1, buffer: Array[Byte] = Array.empty) extends Accumulator

  override def emptyAcc: Accumulator = AccumulatorImpl()

  override def decode(buffer: Array[Byte]): Throwable \/ IN = {
    \/.fromTryCatchNonFatal {
      // Read the varint32 length prefix, then parse the payload that follows it.
      val dataLength = CodedInputStream.newInstance(buffer).readRawVarint32()
      val bufferLength = buffer.length
      val dataBuffer = buffer.drop(bufferLength - dataLength)
      protoType.parseFrom(dataBuffer)
    }
  }

  override def encode(message: OUT): Array[Byte] = {
    // Serialize the message and prepend its length as a varint32.
    val messageBuf = message.toByteArray
    val messageBufLength = messageBuf.length
    val prependLength = CodedOutputStream.computeUInt32SizeNoTag(messageBufLength)
    val prependLengthBuffer = new Array[Byte](prependLength)
    CodedOutputStream.newInstance(prependLengthBuffer).writeUInt32NoTag(messageBufLength)
    prependLengthBuffer ++ messageBuf
  }

  override def decodeStream(data: Array[Byte]): Stream = State {
    case acc: AccumulatorImpl =>
      if (data.isEmpty) {
        (acc, Seq.empty)
      } else {
        // Append the new chunk and (re)compute the expected frame length.
        val accBuffer = acc.buffer ++ data
        val accExpected = readExpectedLength(accBuffer, acc)
        if (accBuffer.length >= accExpected) {
          // A complete frame is available: decode it and recurse on the rest,
          // since a single chunk may contain more than one frame.
          val (frameBuffer, restBuffer) = accBuffer.splitAt(accExpected)
          val output = decode(frameBuffer) match {
            case \/-(proto) => Seq(proto)
            case -\/(_) => Seq.empty
          }
          val (newAcc, recOutput) = decodeStream(restBuffer).run(emptyAcc)
          (newAcc, output ++ recOutput)
        } else (AccumulatorImpl(accExpected, accBuffer), Seq.empty)
      }
    case _ => (emptyAcc, Seq.empty)
  }

  // The expected frame length is the payload length plus the size of the
  // varint32 prefix itself (getTotalBytesRead after reading the varint).
  private def readExpectedLength(data: Array[Byte], acc: AccumulatorImpl) = {
    if (acc.expected == -1 && data.length >= 1) {
      \/.fromTryCatchNonFatal {
        val is = CodedInputStream.newInstance(data)
        val dataLength = is.readRawVarint32()
        val prefixLength = is.getTotalBytesRead
        dataLength + prefixLength
      }.getOrElse(acc.expected)
    } else acc.expected
  }

}
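
A quick usage sketch (with a hypothetical Ping message generated by ScalaPB; it is not part of the code above), showing how the accumulator carries a partial frame across chunks:

val codec = Varint32ProtoCodec[Ping, Ping](Ping)

val bytes = codec.encode(Ping(payload = "ping")) // one length-prefixed frame
val (first, second) = bytes.splitAt(2)           // deliver it in two chunks

val (acc1, out1) = codec.decodeStream(first).run(codec.emptyAcc)
// out1 == Seq.empty: the frame is incomplete, the bytes are accumulated

val (_, out2) = codec.decodeStream(second).run(acc1)
// out2 == Seq(Ping(payload = "ping")): the completed frame is emitted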

And the Actor is:

import akka.actor.{Actor, ActorRef, Props}
import akka.event.Logging
import akka.util.ByteString
import com.zeptolab.tlc.front.codecs.{Accumulator, Varint32ProtoCodec}
import com.zeptolab.tlc.proto.protocol.{Downstream, Upstream}

object FrameCodec {
  def props() = Props[FrameCodec]
}

class FrameCodec extends Actor {

  import akka.io.Tcp._

  private val logger       = Logging(context.system, this)
  private val codec        = Varint32ProtoCodec[Upstream, Downstream](Upstream)
  private val sessionActor = context.actorOf(Session.props())

  def receive = {
    case r: Received =>
      context become stream(sender(), codec.emptyAcc)
      self ! r
    case PeerClosed => peerClosed()
  }

  private def stream(ioActor: ActorRef, acc: Accumulator): Receive = {
    case Received(data) =>
      val (next, output) = codec.decodeStream(data.toArray).run(acc)
      output.foreach { up =>
        sessionActor ! up
      }
      context become stream(ioActor, next)
    case d: Downstream =>
      val buffer = codec.encode(d)
      ioActor ! Write(ByteString(buffer))
    case PeerClosed => peerClosed()
  }

  private def peerClosed() = {
    logger.info("Connection closed")
    context stop self
  }

}
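
For completeness, a minimal sketch of how such a codec actor is typically wired into Akka I/O (the Server actor and port here are illustrative, not part of the answer):

import java.net.InetSocketAddress

import akka.actor.Actor
import akka.io.{IO, Tcp}

class Server extends Actor {

  import Tcp._
  import context.system

  // Ask the TCP manager to bind; a Connected event will arrive per client.
  IO(Tcp) ! Bind(self, new InetSocketAddress("0.0.0.0", 9000))

  def receive = {
    case _: Bound =>
      // now listening
    case _: Connected =>
      // One FrameCodec per connection; Register routes Received data to it.
      val handler = context.actorOf(FrameCodec.props())
      sender() ! Register(handler)
    case CommandFailed(_: Bind) =>
      context stop self
  }
}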
Eugene Zhulkov