
I'm trying to use WebFlux with RSocket. The sample consists of a server application and a client application, both running on WebFlux and RSocket, and the RSocket interaction model is request-stream. The client and server work perfectly fine for a couple of concurrent requests; however, when I load test at 1000 QPS with 8 threads, requests start hanging. On investigation, I found that the sample code below passes the load test.


WORKING SAMPLE

RSocketClientConfig.java

public class RSocketClientConfig {

    @Bean
    RSocketRequester rSocketRequester(RSocketRequester.Builder rsocketRequesterBuilder, RSocketStrategies strategies,
            RSocketClientProperties clientProp) {

        RSocketRequester rsocketRequester = rsocketRequesterBuilder.rsocketStrategies(strategies)
                .dataMimeType(new MimeType("application", "x-protobuf"))
                .connectTcp(clientProp.getHost(), clientProp.getRsocPort()).retry().block();

        rsocketRequester.rsocket().onClose().doOnError(error -> log.warn("Connection CLOSED"))
                .doFinally(consumer -> log.info("Client DISCONNECTED")).subscribe();
        return rsocketRequester;
    }


}

Client.java

@Service
public class PersonRSocketClient {

    @Autowired
    private RSocketRequester personClient;

    public Flux<Person> list() {
        return personClient.route("person").retrieveFlux(Person.class);
    }

}

NOT WORKING

RSocketClientConfig.java

public class RSocketClientConfig {

    @Bean
    Mono<RSocketRequester> rSocketRequester(RSocketRequester.Builder rsocketRequesterBuilder, RSocketStrategies strategies,
        RSocketClientProperties clientProp) {

        
        Mono<RSocketRequester> rsocketRequester = rsocketRequesterBuilder.rsocketStrategies(strategies)
                .dataMimeType(new MimeType("application", "x-protobuf"))
                .connectTcp(clientProp.getHost(), clientProp.getRsocPort());

        return rsocketRequester;
    }
}

Client.java

@Service
public class PersonRSocketClient {

    @Autowired
    private Mono<RSocketRequester> personClient;

    public Flux<Person> list() {
        return personClient
                .flatMapMany(rsocket -> rsocket.route("person").retrieveFlux(Person.class));
    }

}
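
One possible explanation for the difference between the two samples, stated as an assumption rather than a confirmed diagnosis: the Mono returned by connectTcp is cold, so each subscription made by list() may open a new connection, whereas the working sample resolves a single requester once with block(). The sketch below models that behaviour with plain java.util.function.Supplier and no Reactor dependency. The names ColdVsCachedConnection, memoize and countConnections are hypothetical and exist only for this illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class ColdVsCachedConnection {

    // Wraps a supplier so the underlying work runs at most once;
    // every later call returns the stored result.
    static <T> Supplier<T> memoize(Supplier<T> source) {
        return new Supplier<T>() {
            private T value;
            private boolean resolved;

            @Override
            public synchronized T get() {
                if (!resolved) {
                    value = source.get();
                    resolved = true;
                }
                return value;
            }
        };
    }

    // Simulates `requests` calls to list() and returns how many
    // "connections" were opened along the way.
    static int countConnections(boolean cached, int requests) {
        AtomicInteger opened = new AtomicInteger();
        // Stand-in for a cold connect: each get() "opens" a new connection.
        Supplier<Integer> cold = opened::incrementAndGet;
        Supplier<Integer> connector = cached ? memoize(cold) : cold;
        for (int i = 0; i < requests; i++) {
            connector.get();
        }
        return opened.get();
    }

    public static void main(String[] args) {
        System.out.println("cold:   " + countConnections(false, 1000));
        System.out.println("cached: " + countConnections(true, 1000));
    }
}
```

If this assumption holds, the Reactor counterpart of memoize would be calling cache() on the Mono<RSocketRequester> before exposing it as a bean, so that all subscribers share one requester. Whether that resolves the hanging requests here is untested.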

How do I map a request-stream to a Flux correctly?

Karthik Prasad