1

I'm building a very simple application in Kotlin with Vertx and RxJava 2 (RxKotlin), using Kovert REST framework and Retrofit. I have retrofit-vertx adapter and the RxJava2 Retrofit adapter. I can return an arbitrary list from my listUndergroundStations() method, but whenever I try to load from the remote API I get the following error:

Jun 23, 2017 2:16:29 PM uk.amb85.rxweb.api.UndergroundRestController
SEVERE: HTTP CODE 500 - /api/underground/stations - java.io.IOException: java.lang.IllegalStateException: message == null
java.lang.RuntimeException: java.io.IOException: java.lang.IllegalStateException: message == null
    at io.reactivex.internal.util.ExceptionHelper.wrapOrThrow(ExceptionHelper.java:45)
    at io.reactivex.internal.observers.BlockingMultiObserver.blockingGet(BlockingMultiObserver.java:91)
    at io.reactivex.Single.blockingGet(Single.java:2148)
    at uk.amb85.rxweb.api.UndergroundRestController$listUndergroundStations$1.invoke(UndergroundRestController.kt:35)
    at uk.amb85.rxweb.api.UndergroundRestController$listUndergroundStations$1.invoke(UndergroundRestController.kt:13)
    at nl.komponents.kovenant.TaskPromise$wrapper$1.invoke(promises-jvm.kt:138)
    at nl.komponents.kovenant.TaskPromise$wrapper$1.invoke(promises-jvm.kt:130)
    at nl.komponents.kovenant.NonBlockingDispatcher$ThreadContext.run(dispatcher-jvm.kt:327)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: java.lang.IllegalStateException: message == null
    at com.julienviet.retrofit.vertx.VertxCallFactory$VertxCall.lambda$enqueue$0(VertxCallFactory.java:90)
    at io.vertx.core.impl.FutureImpl.tryFail(FutureImpl.java:170)
    at io.vertx.core.http.impl.HttpClientResponseImpl.handleException(HttpClientResponseImpl.java:270)
    at io.vertx.core.http.impl.HttpClientResponseImpl.handleEnd(HttpClientResponseImpl.java:259)
    at io.vertx.core.http.impl.ClientConnection.handleResponseEnd(ClientConnection.java:361)
    at io.vertx.core.http.impl.ClientHandler.doMessageReceived(ClientHandler.java:80)
    at io.vertx.core.http.impl.ClientHandler.doMessageReceived(ClientHandler.java:38)
    at io.vertx.core.http.impl.VertxHttpHandler.lambda$channelRead$0(VertxHttpHandler.java:71)
    at io.vertx.core.impl.ContextImpl.lambda$wrapTask$2(ContextImpl.java:335)
    at io.vertx.core.impl.ContextImpl.executeFromIO(ContextImpl.java:193)
    at io.vertx.core.http.impl.VertxHttpHandler.channelRead(VertxHttpHandler.java:71)
    at io.vertx.core.net.impl.VertxHandler.channelRead(VertxHandler.java:122)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:341)
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:435)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
    at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:250)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:341)
    at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1228)
    at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1039)
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:411)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:248)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:341)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:129)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:642)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:565)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:479)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:441)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
    ... 1 more
Caused by: java.lang.IllegalStateException: message == null
    at okhttp3.Response$Builder.build(Response.java:431)
    at com.julienviet.retrofit.vertx.VertxCallFactory$VertxCall.lambda$null$1(VertxCallFactory.java:109)
    at io.vertx.core.http.impl.HttpClientResponseImpl$BodyHandler.notifyHandler(HttpClientResponseImpl.java:301)
    at io.vertx.core.http.impl.HttpClientResponseImpl.lambda$bodyHandler$0(HttpClientResponseImpl.java:193)
    at io.vertx.core.http.impl.HttpClientResponseImpl.handleEnd(HttpClientResponseImpl.java:257)
    ... 36 more

I can't for the life of me work out what is causing the IllegalStateException and have googled it to death. I don't think it's Rx related because I get the same error if I make the method return Observable<List<UndergroundLine>> or even get rid of Rx entirely and return Call<List<UndergroundLine>> (adjusting the controller accordingly). However, beyond that, I'm beating my head against a wall! Is anyone able to point out the error of my ways (besides putting a cushion under my head)?

Main Verticle:

class ApiVerticle : AbstractVerticle() {

    override fun start(startFuture: Future<Void>?) {
        // Initialise injection.
        configureKodein()

        val apiRouter = configureRouter(vertx)

        vertx.createHttpServer()
                .requestHandler { apiRouter.accept(it) }
                .listen(8080)
    }

    private fun configureKodein() {
        Kodein.global.addImport(Kodein.Module {
            import(TflUndergroundService.module)
        })
    }

    private fun configureRouter(vertx: Vertx): Router {
        val apiMountPoint = "api"
        val routerInit = fun Router.() {
            bindController(UndergroundRestController(), apiMountPoint)
        }
        val router = Router.router(vertx) initializedBy { router ->
            router.routerInit()
        }
        return router
    }
}

TflService:

interface TflService {
    @GET("/Line/Mode/tube")
    fun getAllUndergroundLines(): Observable<UndergroundLine>

    @GET("/Line/{lineName}/StopPoints")
    fun getStationsForUndergroundLine(
            @Path("lineName") lineName: String
    ): Observable<UndergroundStation>

    @GET("/Line/{lineName}/Arrivals?stopPointId={stationNaptanId")
    fun getArrivalsFor(
            @Path("lineName") lineName: String,
            @Path("stationNaptanId") stationNaptanId: String
    ) : Observable<Arrival>
}

data class UndergroundLine(val id: String, val name: String)
data class UndergroundStation(val naptanId: String, val commonName: String)
data class Arrival(
        val platformName: String,
        val towards: String,
        val currentLocation: String,
        val expectedArrival: LocalDateTime)

object TflUndergroundService {
    val module = Kodein.Module {
        val vertx: Vertx = Vertx.currentContext().owner()
        val client: HttpClient = vertx.createHttpClient()

        val jacksonMapper: ObjectMapper = ObjectMapper()
        jacksonMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)

        val retrofit: Retrofit = Retrofit.Builder()
                .baseUrl("https://api.tfl.gov.uk/")
                .callFactory(VertxCallFactory(client))
                .addCallAdapterFactory(RxJava2CallAdapterFactory.createAsync())
                .addConverterFactory(JacksonConverterFactory.create(jacksonMapper))
                .build()

        val tflService: TflService = retrofit.create(TflService::class.java)

        bind<TflService>() with instance(tflService)
    }
}

ApiKeySecured (Just requires "appid" to be a parameter):

class ApiKeySecured(private val routingContext: RoutingContext) : KodeinGlobalAware {
    val user: String = routingContext.request().getParam("appid") ?: throw HttpErrorUnauthorized()
}

The offending REST controller (in Kovert, Promise's are executed on Vertx worker thread):

class UndergroundRestController(val undergroundService: TflService = Kodein.global.instance()) {
    fun ApiKeySecured.listUndergroundStations(): Promise<List<UndergroundLine>, Exception> {
        //TODO: This is blocking, fix it!??
        return task {
            undergroundService
                    .getAllUndergroundLines()
                    .doOnError { println(it) }
                    .toList()
                    .blockingGet()
        }
    }
}

build.gradle:

mainClassName = "io.vertx.core.Launcher"
def mainVerticleName = "uk.amb85.rxweb.verticles.ApiVerticle"
def configurationFile = "conf/development.json"

run {
    args = ["run",
            mainVerticleName,
            "--launcher-class=$mainClassName",
            "-conf $configurationFile"
    ]
}
junglie85
  • 1,243
  • 10
  • 30

1 Answers1

1

There's an issue with retrofit-vertx you are using. OkHttp3's ResponseBuilder requires message to be not null, but VertxCallFactory doesn't set it.

It's fixed in the latest version, but as it's still in development, you have to use snapshot:

repositories {
    mavenCentral()
    maven {
        url "https://oss.sonatype.org/content/repositories/snapshots"
    }
}

dependencies {

    compile 'com.julienviet:retrofit-vertx:1.0.2-SNAPSHOT'

}

Switching to snapshot dependency fixes the issue you mention in your question, but there's an issue with json mapping, which can be easily fixed by switching code from:

@GET("/Line/Mode/tube")
fun getAllUndergroundLines(): Observable<UndergroundLine>

to:

@GET("/Line/Mode/tube")
fun getAllUndergroundLines(): Observable<List<UndergroundLine>>

And updating your data classes to have default empty constructor to let Jackson instantiate using reflection:

data class UndergroundLine(var id: String = "", var name: String = "")

More on emtpy default constructor for data classes.

But it's another question related to how to parse response from API you're using to Observable and should be asked if you don't find a workaround.

ledniov
  • 2,302
  • 3
  • 21
  • 27
  • Thanks. I'd noticed the need to return an `Observable>` when I tried not using retrofit-vertx. I'll give the snapshot a try and see where I go from there. – junglie85 Jun 26 '17 at 14:26
  • The version 1.0.2 that fixes the bug and updates to Vert.x 3.4.2 is now release – Julien Viet Jun 27 '17 at 06:14