I'm building a very simple application in Kotlin with Vert.x and RxJava 2 (RxKotlin), using the Kovert REST framework and Retrofit. I have the retrofit-vertx adapter and the RxJava2 Retrofit adapter. I can return an arbitrary list from my listUndergroundStations() method, but whenever I try to load from the remote API I get the following error:
Jun 23, 2017 2:16:29 PM uk.amb85.rxweb.api.UndergroundRestController
SEVERE: HTTP CODE 500 - /api/underground/stations - java.io.IOException: java.lang.IllegalStateException: message == null
java.lang.RuntimeException: java.io.IOException: java.lang.IllegalStateException: message == null
at io.reactivex.internal.util.ExceptionHelper.wrapOrThrow(ExceptionHelper.java:45)
at io.reactivex.internal.observers.BlockingMultiObserver.blockingGet(BlockingMultiObserver.java:91)
at io.reactivex.Single.blockingGet(Single.java:2148)
at uk.amb85.rxweb.api.UndergroundRestController$listUndergroundStations$1.invoke(UndergroundRestController.kt:35)
at uk.amb85.rxweb.api.UndergroundRestController$listUndergroundStations$1.invoke(UndergroundRestController.kt:13)
at nl.komponents.kovenant.TaskPromise$wrapper$1.invoke(promises-jvm.kt:138)
at nl.komponents.kovenant.TaskPromise$wrapper$1.invoke(promises-jvm.kt:130)
at nl.komponents.kovenant.NonBlockingDispatcher$ThreadContext.run(dispatcher-jvm.kt:327)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: java.lang.IllegalStateException: message == null
at com.julienviet.retrofit.vertx.VertxCallFactory$VertxCall.lambda$enqueue$0(VertxCallFactory.java:90)
at io.vertx.core.impl.FutureImpl.tryFail(FutureImpl.java:170)
at io.vertx.core.http.impl.HttpClientResponseImpl.handleException(HttpClientResponseImpl.java:270)
at io.vertx.core.http.impl.HttpClientResponseImpl.handleEnd(HttpClientResponseImpl.java:259)
at io.vertx.core.http.impl.ClientConnection.handleResponseEnd(ClientConnection.java:361)
at io.vertx.core.http.impl.ClientHandler.doMessageReceived(ClientHandler.java:80)
at io.vertx.core.http.impl.ClientHandler.doMessageReceived(ClientHandler.java:38)
at io.vertx.core.http.impl.VertxHttpHandler.lambda$channelRead$0(VertxHttpHandler.java:71)
at io.vertx.core.impl.ContextImpl.lambda$wrapTask$2(ContextImpl.java:335)
at io.vertx.core.impl.ContextImpl.executeFromIO(ContextImpl.java:193)
at io.vertx.core.http.impl.VertxHttpHandler.channelRead(VertxHttpHandler.java:71)
at io.vertx.core.net.impl.VertxHandler.channelRead(VertxHandler.java:122)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:341)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:435)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:250)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:341)
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1228)
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1039)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:411)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:248)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:341)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:129)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:642)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:565)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:479)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:441)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
... 1 more
Caused by: java.lang.IllegalStateException: message == null
at okhttp3.Response$Builder.build(Response.java:431)
at com.julienviet.retrofit.vertx.VertxCallFactory$VertxCall.lambda$null$1(VertxCallFactory.java:109)
at io.vertx.core.http.impl.HttpClientResponseImpl$BodyHandler.notifyHandler(HttpClientResponseImpl.java:301)
at io.vertx.core.http.impl.HttpClientResponseImpl.lambda$bodyHandler$0(HttpClientResponseImpl.java:193)
at io.vertx.core.http.impl.HttpClientResponseImpl.handleEnd(HttpClientResponseImpl.java:257)
... 36 more
I can't for the life of me work out what is causing the IllegalStateException and have googled it to death. I don't think it's Rx-related, because I get the same error if I make the method return Observable<List<UndergroundLine>> or even get rid of Rx entirely and return Call<List<UndergroundLine>> (adjusting the controller accordingly). However, beyond that, I'm beating my head against a wall! Can anyone point out the error of my ways (besides putting a cushion under my head)?
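For what it's worth, this is how I read the trace: the innermost exception is thrown from okhttp3.Response$Builder.build, which is called by VertxCallFactory, and it is then wrapped in an IOException and finally in the RuntimeException that blockingGet() rethrows. The sketch below only mimics that chain so the nesting is easier to see. FakeResponseBuilder, rootCause and failLikeTheTrace are hypothetical stand-ins I made up for illustration; this is not the OkHttp or retrofit-vertx source, and the idea that the builder is missing a status message is my assumption from the exception text.

```kotlin
import java.io.IOException

// Hypothetical stand-in for a response builder that refuses to build
// without a status message (an assumption based on the exception text).
class FakeResponseBuilder {
    private var statusCode: Int = -1
    private var statusMessage: String? = null

    fun code(value: Int) = apply { statusCode = value }
    fun message(value: String?) = apply { statusMessage = value }

    fun build(): String {
        check(statusCode >= 0) { "code < 0: $statusCode" }
        // checkNotNull throws IllegalStateException with the given message.
        val msg = checkNotNull(statusMessage) { "message == null" }
        return "$statusCode $msg"
    }
}

// Walk the cause chain down to the innermost exception.
fun rootCause(t: Throwable): Throwable {
    var current = t
    while (true) {
        val next = current.cause ?: return current
        if (next === current) return current
        current = next
    }
}

// Mimic the wrapping seen in the trace:
// IllegalStateException -> IOException -> RuntimeException.
fun failLikeTheTrace(): RuntimeException {
    val inner = runCatching { FakeResponseBuilder().code(200).build() }
        .exceptionOrNull()
    return RuntimeException(IOException(checkNotNull(inner)))
}

fun main() {
    val outer = failLikeTheTrace()
    println(outer.message)     // message of the outermost wrapper
    println(rootCause(outer))  // innermost exception
}
```

Setting a message on the stand-in, e.g. FakeResponseBuilder().code(200).message("OK").build(), builds without error, which is why I suspect the real problem sits between the adapter and OkHttp rather than in my Rx code.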
Main Verticle:
class ApiVerticle : AbstractVerticle() {
    override fun start(startFuture: Future<Void>?) {
        // Initialise injection.
        configureKodein()
        val apiRouter = configureRouter(vertx)
        vertx.createHttpServer()
            .requestHandler { apiRouter.accept(it) }
            .listen(8080)
    }

    private fun configureKodein() {
        Kodein.global.addImport(Kodein.Module {
            import(TflUndergroundService.module)
        })
    }

    private fun configureRouter(vertx: Vertx): Router {
        val apiMountPoint = "api"
        val routerInit = fun Router.() {
            bindController(UndergroundRestController(), apiMountPoint)
        }
        val router = Router.router(vertx) initializedBy { router ->
            router.routerInit()
        }
        return router
    }
}
TflService:
interface TflService {
    @GET("/Line/Mode/tube")
    fun getAllUndergroundLines(): Observable<UndergroundLine>

    @GET("/Line/{lineName}/StopPoints")
    fun getStationsForUndergroundLine(
        @Path("lineName") lineName: String
    ): Observable<UndergroundStation>

    // Retrofit does not substitute {placeholders} inside the query string,
    // so the stop point id is passed with @Query instead of @Path.
    @GET("/Line/{lineName}/Arrivals")
    fun getArrivalsFor(
        @Path("lineName") lineName: String,
        @Query("stopPointId") stationNaptanId: String
    ): Observable<Arrival>
}
data class UndergroundLine(val id: String, val name: String)

data class UndergroundStation(val naptanId: String, val commonName: String)

data class Arrival(
    val platformName: String,
    val towards: String,
    val currentLocation: String,
    val expectedArrival: LocalDateTime)

object TflUndergroundService {
    val module = Kodein.Module {
        val vertx: Vertx = Vertx.currentContext().owner()
        val client: HttpClient = vertx.createHttpClient()
        val jacksonMapper: ObjectMapper = ObjectMapper()
        jacksonMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
        val retrofit: Retrofit = Retrofit.Builder()
            .baseUrl("https://api.tfl.gov.uk/")
            .callFactory(VertxCallFactory(client))
            .addCallAdapterFactory(RxJava2CallAdapterFactory.createAsync())
            .addConverterFactory(JacksonConverterFactory.create(jacksonMapper))
            .build()
        val tflService: TflService = retrofit.create(TflService::class.java)
        bind<TflService>() with instance(tflService)
    }
}
ApiKeySecured (Just requires "appid" to be a parameter):
class ApiKeySecured(private val routingContext: RoutingContext) : KodeinGlobalAware {
    val user: String = routingContext.request().getParam("appid") ?: throw HttpErrorUnauthorized()
}
The offending REST controller (in Kovert, Promises are executed on a Vert.x worker thread):
class UndergroundRestController(val undergroundService: TflService = Kodein.global.instance()) {
    fun ApiKeySecured.listUndergroundStations(): Promise<List<UndergroundLine>, Exception> {
        // TODO: This is blocking, fix it!??
        return task {
            undergroundService
                .getAllUndergroundLines()
                .doOnError { println(it) }
                .toList()
                .blockingGet()
        }
    }
}
build.gradle:
mainClassName = "io.vertx.core.Launcher"
def mainVerticleName = "uk.amb85.rxweb.verticles.ApiVerticle"
def configurationFile = "conf/development.json"

run {
    args = ["run",
            mainVerticleName,
            "--launcher-class=$mainClassName",
            "-conf $configurationFile"
    ]
}