I am working with Vert.x 2.x (http://vertx.io) which makes extensive use of asynchronous callbacks. These quickly become unwieldy with typical nesting/callback hell issues.
I have considered both Scala Futures/Promises (which I think would be the de facto approach) and Reactive Extensions (RxScala).
From my testing I have found some interesting performance results.
My testing is pretty basic: I'm just issuing a bunch of HTTP requests (via weighttp) to a Vert.x verticle that makes an asynchronous call across the Vert.x event bus and processes the reply, which is then returned in an HTTP 200 response.
What I have found is the following (performance here is measured in terms of HTTP requests per second):
- Async Callback performance = 68,305 rps
- Rx performance = 64,656 rps
- Future/Promises performance = 61,376 rps
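To put the three figures on a common scale, the gap to the callback baseline can be expressed as a percentage. The following is only a small sketch of that arithmetic; the object and method names are made up for illustration, and the only inputs are the averages listed above.

```scala
object RelativeThroughput {
  // Averages reported above, in HTTP requests per second.
  val callbackRps = 68305.0
  val rxRps       = 64656.0
  val futureRps   = 61376.0

  // Percentage drop in throughput relative to a baseline
  // (the plain async callback figure by default).
  def dropPercent(rps: Double, baseline: Double = callbackRps): Double =
    (baseline - rps) / baseline * 100.0

  def main(args: Array[String]): Unit = {
    println("Rx vs callbacks:      " + dropPercent(rxRps))
    println("Futures vs callbacks: " + dropPercent(futureRps))
    println("Futures vs Rx:        " + dropPercent(futureRps, rxRps))
  }
}
```

By this measure Rx is roughly 5.3% below the callback baseline and Futures/Promises roughly 10.1% below it, so Futures trail Rx by about 5%.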
The test conditions were:
- Mac Pro OS X Yosemite 10.10.2
- Oracle JVM 1.8U25
- weighttp version 0.3
- Vert.x 2.1.5
- Scala 2.10.4
- RxScala 0.23.0
- 4 x Web Service Verticle Instances
- 4 x Backend Service Verticle Instances
The test command was:
weighttp -n 1000000 -c 128 -t 8 -k "localhost:8888"
The figures above are the average of five test runs, excluding the best and worst results. Note that the results are very consistent around the presented averages (no more than a few hundred rps of deviation).
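The averaging described here (drop the best and worst run, then take the mean of the rest) can be sketched as below. The per-run numbers in `main` are hypothetical placeholders, not the measured results, and the names are invented for the example.

```scala
object TrimmedMean {
  // Mean of the runs after discarding the single lowest and single highest value.
  def trimmedMean(runs: Seq[Double]): Double = {
    require(runs.size >= 3, "need at least three runs to drop best and worst")
    val kept = runs.sorted.drop(1).dropRight(1)
    kept.sum / kept.size
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical placeholder values, NOT the actual test results.
    val runs = Seq(100.0, 110.0, 120.0, 130.0, 400.0)
    println(trimmedMean(runs)) // prints 120.0
  }
}
```

With five runs this leaves three values in the average, which keeps a single outlier run (such as the 400.0 above) from skewing the reported figure.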
Is there any known reason why the above might be happening, i.e. Rx outperforming Futures in pure requests per second?
Reactive Extensions are, in my opinion, superior as they can do so much more, but given that the standard approach to async callbacks typically seems to go down the Futures/Promises track, I'm surprised at the performance hit.
EDIT: Here is the Web Service Verticle
class WebVerticle extends Verticle {

  override def start() {
    val port = container.env().getOrElse("HTTP_PORT", "8888").toInt
    val approach = container.env().getOrElse("APPROACH", "ASYNC")
    container.logger.info("Listening on port: " + port)
    container.logger.info("Using approach: " + approach)

    vertx.createHttpServer.requestHandler { req: HttpServerRequest =>
      approach match {
        case "ASYNC"   => sendAsync(req, "hello")
        case "FUTURES" => sendWithFuture("hello").onSuccess { case body => req.response.end(body) }
        case "RX"      => sendWithObservable("hello").doOnNext(req.response.end(_)).subscribe()
      }
    }.listen(port)
  }

  // Async callback
  def sendAsync(req: HttpServerRequest, body: String): Unit = {
    vertx.eventBus.send("service.verticle", body, { msg: Message[String] =>
      req.response.end(msg.body())
    })
  }

  // Rx
  def sendWithObservable(body: String): Observable[String] = {
    val subject = ReplaySubject[String]()
    vertx.eventBus.send("service.verticle", body, { msg: Message[String] =>
      subject.onNext(msg.body())
      subject.onCompleted()
    })
    subject
  }

  // Futures
  def sendWithFuture(body: String): Future[String] = {
    val promise = Promise[String]()
    vertx.eventBus.send("service.verticle", body, { msg: Message[String] =>
      promise.success(msg.body())
    })
    promise.future
  }
}
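For anyone wanting to try the Promise-based bridge outside Vert.x, here is a minimal standalone sketch using only the Scala standard library. `fakeSend` is a hypothetical stand-in for `vertx.eventBus.send` and calls its handler synchronously. The use of the global `ExecutionContext` is an assumption, since the verticle above does not show which one is in scope.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object CallbackToFuture {
  // Hypothetical stand-in for a callback-style API such as eventBus.send.
  // It invokes the handler immediately; a real event bus would reply later.
  def fakeSend(body: String)(handler: String => Unit): Unit =
    handler("reply to " + body)

  // Same shape as sendWithFuture above: complete a Promise inside the callback
  // and hand its Future back to the caller.
  def sendWithFuture(body: String): Future[String] = {
    val promise = Promise[String]()
    fakeSend(body) { reply => promise.success(reply) }
    promise.future
  }

  def main(args: Array[String]): Unit = {
    // map (like onSuccess) needs an implicit ExecutionContext to run its callback on;
    // the global one is assumed here purely for the sake of the example.
    val upper = sendWithFuture("hello").map(_.toUpperCase)
    println(Await.result(upper, 1.second)) // prints REPLY TO HELLO
  }
}
```

Unlike the plain callback, a continuation attached with `map` or `onSuccess` is scheduled on whatever `ExecutionContext` is implicitly available rather than being called inline by the code that completes the promise.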
EDIT: Here is the Backend Verticle
class ServiceVerticle extends Verticle {
  override def start(): Unit = {
    vertx.eventBus.registerHandler("service.verticle", { msg: Message[String] =>
      msg.reply("Hello Scala")
    })
  }
}