I'm working on a performance issue that appeared when I switched my current project to Akka Streams.

After simplifying the problem, it seems that Akka Streams passes far fewer messages than I expected.

Here are two very simple pieces of code; both just write 10 bytes at a time to a file on disk.

The first one uses two threads connected by an ArrayBlockingQueue:

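import java.nio.file.{Files, Paths}
import java.util.concurrent.ArrayBlockingQueue

val bw = Files.newBufferedWriter(Paths.get("test.txt"))
val target = "0123456789".toCharArray
val abq = new ArrayBlockingQueue[Array[Char]](10000)

// Consumer thread: takes elements off the queue and writes them to the file
new Thread(new Runnable {
  override def run(): Unit = {
    while (true) {
      bw.write(abq.take())
    }
  }
}).start()

// Producer (main thread): keeps the queue full
while (true) {
  abq.put(target)
}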

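The second one uses Akka Streams:

import java.nio.file.Paths

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Attributes, OverflowStrategy}
import akka.stream.scaladsl.{FileIO, Source}
import akka.util.ByteString

implicit val system: ActorSystem = ActorSystem("TestActorSystem")
implicit val materializer: ActorMaterializer = ActorMaterializer()

// Source & Sink run in two actors
// Both the output of the Source and the input of the Sink are buffered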
Source
  .repeat(ByteString("0123456789"))
  .buffer(8192, OverflowStrategy.backpressure)
  .async
  .runWith(
    FileIO
      .toPath(Paths.get("test.txt"))
      .withAttributes(Attributes.inputBuffer(8192, 8192))
  )

I found that the first one writes the file at 27.4MB/s while the second one only reaches 3.4MB/s on my test machine. The thread-with-ArrayBlockingQueue version is 8 times faster than the Akka one.
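Since every element is 10 bytes, these throughput figures translate directly into a time budget per element. The following is a minimal sketch of that arithmetic, assuming 1 MB = 1,000,000 bytes (the measurement does not say which definition was used). `PerElementCost` and `nanosPerElement` are names made up for this illustration.

```scala
object PerElementCost {
  // Assumption: 1 MB = 1,000,000 bytes.
  val BytesPerMB = 1000000.0
  // Each element written in both snippets is the 10-byte string "0123456789".
  val BytesPerElement = 10.0

  /** Average nanoseconds spent per element at the given throughput. */
  def nanosPerElement(mbPerSecond: Double): Double = {
    val elementsPerSecond = mbPerSecond * BytesPerMB / BytesPerElement
    1e9 / elementsPerSecond
  }

  def main(args: Array[String]): Unit = {
    println(f"queue + threads: ${nanosPerElement(27.4)}%.0f ns per element")
    println(f"Akka stream:     ${nanosPerElement(3.4)}%.0f ns per element")
  }
}
```

Under that assumption the queue version spends roughly 365 ns per element and the stream roughly 2.9 µs per element.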

I tried changing the Sink from FileIO to a hand-written Sink that writes to a BufferedWriter. This increased the second one's speed to 5.5MB/s, but that is still 5x slower than the first one.

In my understanding, Akka Streams should perform much better than this.

Is there something I've done wrong in this scenario?

lxohi
  • I forgot to mention that if I configure the Akka stream without async & buffers, so that the Source & Sink are connected directly in one actor, the write speed is 2.2MB/s in my testing environment. – lxohi May 15 '18 at 14:00
  • Already covered in similar threads: https://stackoverflow.com/questions/33416891/akka-stream-implementation-slower-than-single-threaded-implementation/33437024#33437024 – Ramón J Romero y Vigil May 15 '18 at 19:09
  • @RamonJRomeroyVigil Thank you for your comment! I read that thread's answers before posting this question. They make two points. One is that the speed may be limited by back-pressure. The other is that Akka Streams may cost about 1µs per element. – lxohi May 16 '18 at 03:54
  • On the first point: I've already tried adding a buffer and even adding multiple identical Sources. I wanted to keep the buffer always full so that the Sink would not wait for the result of pull operations. But it seems one Source is enough here, and adding more Sources does not increase the write speed. – lxohi May 16 '18 at 03:55
  • On the second point: when I read it again this time, I found out that what I had thought about it seems to be wrong. This may be the answer. I'm trying to verify it now. – lxohi May 16 '18 at 04:03

1 Answer

I figured out what really makes it slow in this case.

I swapped the FileIO sink from the question for a hand-written one with timing counters in order to measure the cost of every step in the sink.

The new sink is here:

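import java.nio.file.{Files, Paths}

import akka.stream.{Attributes, Inlet, SinkShape}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler}

final class FileWriteSink extends GraphStage[SinkShape[Array[Char]]] {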

  private val in: Inlet[Array[Char]] = Inlet("ArrayOfCharInlet")

  override def shape: SinkShape[Array[Char]] = SinkShape.of(in)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
    new GraphStageLogic(shape) {
      // note: updates to the vars below are not thread-safe,
      // but they are good enough to show large-scale time differences at a relatively low cost
      private var count = 0L

      private var grabTime = 0L
      private var writeTime = 0L
      private var pullTime = 0L
      private var gapTime = 0L
      private var counterTime = 0L

      private var lastTime = 0L
      private var currTime = System.nanoTime()

      @inline private def timeDiff(): Long = {
        lastTime = currTime
        currTime = System.nanoTime()
        currTime - lastTime
      }

      private val bw = Files.newBufferedWriter(Paths.get("test.xml"))
      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          gapTime += timeDiff()
          count += 1
          if (count % 1000000 == 0) {
            println(s"count: $count, gapTime: $gapTime, counterTime: $counterTime, grabTime: $grabTime, writeTime: $writeTime, pullTime: $pullTime")
            println(s"count: $count, gapTime-avg: ${gapTime / count}, counterTime-avg: ${counterTime / count}, grabTime-avg: ${grabTime / count}, writeTime-avg: ${writeTime / count}, pullTime-avg: ${pullTime / count}")
          }
          counterTime += timeDiff()
          val v = grab(in)
          grabTime += timeDiff()
          bw.write(v)
          writeTime += timeDiff()
          pull(in)
          pullTime += timeDiff()
        }
      })

      override def preStart(): Unit = {
        pull(in)
      }
    }
  }

}

I then got this log from my testing environment:

count: 1000000, gapTime: 3220562882, counterTime: 273008576, grabTime: 264956553, writeTime: 355040917, pullTime: 260033342
count: 1000000, gapTime-avg: 3220, counterTime-avg: 273, grabTime-avg: 264, writeTime-avg: 355, pullTime-avg: 260
count: 2000000, gapTime: 6307318517, counterTime: 549671865, grabTime: 532654603, writeTime: 708526613, pullTime: 524305026
count: 2000000, gapTime-avg: 3153, counterTime-avg: 274, grabTime-avg: 266, writeTime-avg: 354, pullTime-avg: 262
count: 3000000, gapTime: 9403004835, counterTime: 821901662, grabTime: 797670212, writeTime: 1054416804, pullTime: 786163401
count: 3000000, gapTime-avg: 3134, counterTime-avg: 273, grabTime-avg: 265, writeTime-avg: 351, pullTime-avg: 262

It turns out that the time gap between the pull() and the next onPush() call is the slow part here.
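The proportions can be checked by summing the per-element averages from the count: 3000000 log line. The sketch below does only that arithmetic; `GapShare`, `totalNanos` and `sharePercent` are names made up for this illustration, and each figure also includes the cost of the `System.nanoTime()` call that delimits it.

```scala
object GapShare {
  // Per-element averages in nanoseconds, copied from the count: 3000000 log line.
  val averages: Seq[(String, Long)] = Seq(
    "gapTime"     -> 3134L,
    "counterTime" -> 273L,
    "grabTime"    -> 265L,
    "writeTime"   -> 351L,
    "pullTime"    -> 262L
  )

  // Total measured time per element across all five segments.
  val totalNanos: Long = averages.map(_._2).sum

  /** Share of the total per-element time taken by one segment, in percent. */
  def sharePercent(name: String): Double =
    averages.find(_._1 == name).map { case (_, nanos) => 100.0 * nanos / totalNanos }.getOrElse(0.0)

  def main(args: Array[String]): Unit = {
    averages.foreach { case (name, nanos) =>
      println(f"$name%-12s $nanos%5d ns  ${sharePercent(name)}%5.1f%%")
    }
    println(s"total per element: $totalNanos ns")
  }
}
```

The five segments add up to 4285 ns per element, of which the gap accounts for about 73%.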

Even when the buffer is full, so that the Sink should not need to wait for the Source to generate the next element, there is still a gap of about 3µs between pull() and the next onPush() call in my testing environment.

So what I should expect is that Akka Streams can have great overall throughput, while the gap time between two onPush() calls needs careful attention when designing the structure of an actual stream.
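One way this could be taken into account, as an untested sketch rather than a measured fix, is to make each element larger so that the per-element gap is paid less often at the sink. The example below groups the 10-byte elements into batches before the async boundary, so the file sink receives one large ByteString per batch. It assumes the same Akka 2.5-style setup as the question; `BatchedWrite`, `concat` and the batch size of 1000 are made up for this illustration, and the upstream stages still process every small element individually, so the actual gain would need to be measured.

```scala
import java.nio.file.Paths

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{FileIO, Source}
import akka.util.ByteString

object BatchedWrite {
  // Hypothetical batch size, not tuned or measured.
  val BatchSize = 1000

  /** Concatenates a batch of small ByteStrings into a single one. */
  def concat(batch: Seq[ByteString]): ByteString =
    batch.foldLeft(ByteString.empty)(_ ++ _)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("TestActorSystem")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher // execution context for onComplete

    Source
      .repeat(ByteString("0123456789"))
      .take(1000000L)      // bounded so that the example terminates
      .grouped(BatchSize)  // collect BatchSize elements into one Seq
      .map(concat)         // one large ByteString per batch
      .async               // only the large chunks cross the async boundary
      .runWith(FileIO.toPath(Paths.get("test.txt")))
      .onComplete { result =>
        println(result)
        system.terminate()
      }
  }
}
```

With this structure the sink's onPush() runs once per batch instead of once per 10-byte element, so a fixed gap per onPush() is spread over BatchSize times as many bytes.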

lxohi