11

I have a question about Spring's RSocketRequester. I have an RSocket server and a client. The client connects to the server and calls a @MessageMapping endpoint. It works as expected.

But what if I restart the server? How can the client automatically reconnect to the RSocket server? Thanks.

Server:

@Controller
class RSC {

    @MessageMapping("pong")
    public Mono<String> pong(String m) {
        return Mono.just("PONG " + m);
    }
}

Client:

@Bean
public RSocketRequester rSocketRequester() {
    return RSocketRequester
            .builder()
            .connectTcp("localhost", 7000)
            .block();
}

@RestController
class RST {

    @Autowired
    private RSocketRequester requester;

    @GetMapping(path = "/ping")
    public Mono<String> ping(){
        return this.requester
                .route("pong")
                .data("TEST")
                .retrieveMono(String.class)
                .doOnNext(System.out::println);
    }
}
George

3 Answers

13

Updated for Spring Framework 5.2.6+

You could achieve it with io.rsocket.core.RSocketConnector#reconnect.

@Bean
Mono<RSocketRequester> rSocketRequester(RSocketRequester.Builder rSocketRequesterBuilder) {
    return rSocketRequesterBuilder
            .rsocketConnector(connector -> connector
                    .reconnect(Retry.fixedDelay(Integer.MAX_VALUE, Duration.ofSeconds(1))))
            .connectTcp("localhost", 7000);
}

@RestController
public class RST {
    @Autowired
    private Mono<RSocketRequester> rSocketRequesterMono;

    @GetMapping(path = "/ping")
    public Mono<String> ping() {
        return rSocketRequesterMono.flatMap(rSocketRequester ->
                rSocketRequester.route("pong")
                        .data("TEST")
                        .retrieveMono(String.class)
                        .doOnNext(System.out::println));
    }
}
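
To make the shape of the fixed-delay policy concrete, here is a plain-Java sketch of "retry with a fixed pause between attempts". It is only an illustration of the idea behind Retry.fixedDelay(maxAttempts, delay); it does not use Reactor or RSocket, and the class and method names (FixedDelayRetry, call) are made up for this example.

```java
import java.time.Duration;
import java.util.concurrent.Callable;

// Illustration only: a blocking, plain-Java analogue of a fixed-delay retry policy.
// FixedDelayRetry and call(...) are hypothetical names, not part of any library.
class FixedDelayRetry {

    // Runs the action; on failure, waits `delay` and tries again, up to `maxRetries` retries.
    static <T> T call(Callable<T> action, long maxRetries, Duration delay) throws Exception {
        long retries = 0;
        while (true) {
            try {
                return action.call();
            } catch (Exception e) {
                if (retries >= maxRetries) {
                    throw e; // retries exhausted: surface the last failure
                }
                retries++;
                Thread.sleep(delay.toMillis());
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated "server": the first two attempts fail, the third succeeds.
        String result = call(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("server down");
            }
            return "PONG TEST";
        }, Integer.MAX_VALUE, Duration.ofMillis(10));
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

With Integer.MAX_VALUE retries, as in the bean above, the loop effectively keeps trying until the server is reachable again.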
Alexander Pankin
  • Thanks for the answer. I combined both approaches and got a working solution. I appreciate your help. – George Nov 16 '19 at 22:37
  • @George, would you be able to share your solution here? – RamPrakash Jun 28 '20 at 18:42
  • As of Spring 5.2.6, the RSocketRequester.Builder.rsocketFactory method has been deprecated, so the solution above no longer works. The replacement is RSocketRequester.Builder.rsocketConnector. Does anyone have sample code that uses rsocketConnector? – David V Sep 09 '20 at 05:15
  • @DavidV, I updated the answer for the new API. It is not that hard now. – Alexander Pankin Sep 09 '20 at 13:26
6

I don't think I would create a RSocketRequester bean in an application. Unlike WebClient (which has a pool of reusable connections), the RSocket requester wraps a single RSocket, i.e. a single network connection.

I think it's best to store a Mono<RSocketRequester> and subscribe to that to get an actual requester when needed. Because you don't want to create a new connection for each call, you can cache the result. Thanks to Mono retryXYZ operators, there are many ways you can refine the reconnection behavior.

You could try something like the following:

@Service
public class RSocketPingService {

    private final Mono<RSocketRequester> requesterMono;

    // Spring Boot is creating an auto-configured RSocketRequester.Builder bean
    public RSocketPingService(RSocketRequester.Builder builder) {
        this.requesterMono = builder
                .dataMimeType(MediaType.APPLICATION_CBOR)
                .connectTcp("localhost", 7000).retry(5).cache();
    }

    public Mono<String> ping() {
        return this.requesterMono.flatMap(requester -> requester.route("pong")
                .data("TEST")
                .retrieveMono(String.class));
    }


}
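
As a rough analogy for "cache the requester, and only connect again when needed", the following plain-Java sketch keeps one connection object, hands the same one to every caller, and opens a new one only after the old one has been marked broken. It involves no Spring, Reactor, or RSocket types; CachedConnection, get, and invalidate are hypothetical names used only for this illustration.

```java
import java.util.function.Supplier;

// Plain-Java analogy only: no Spring, Reactor, or RSocket types are involved.
// CachedConnection is a hypothetical helper, not a library class.
class CachedConnection<T> {
    private final Supplier<T> connector; // hypothetical factory that opens a connection
    private T current;                   // the cached connection, or null if none is open

    CachedConnection(Supplier<T> connector) {
        this.connector = connector;
    }

    // Returns the cached connection, opening one only if none is cached.
    synchronized T get() {
        if (current == null) {
            current = connector.get();
        }
        return current;
    }

    // Drops the cached connection if it is the one the caller saw fail.
    synchronized void invalidate(T broken) {
        if (current == broken) {
            current = null;
        }
    }

    public static void main(String[] args) {
        int[] connects = {0};
        CachedConnection<String> cache =
                new CachedConnection<>(() -> "connection-" + (++connects[0]));
        String first = cache.get();
        System.out.println(first == cache.get()); // same cached instance is reused
        cache.invalidate(first);                  // e.g. after the server restarted
        System.out.println(cache.get());          // a new connection is opened
    }
}
```

The point of the sketch is the design choice: callers never hold a connection as a long-lived field, they ask for one each time, so a broken connection can be replaced without restarting the client.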
Brian Clozel
  • When I use your code, the result is the same: I get the exception java.nio.channels.ClosedChannelException: null, which is what I want to resolve. When the connection to the RSocket server is broken, how can it heal automatically once the server is up again? I am looking into whether RSocketLoadBalancedMono, or possibly Spring's retry, could solve my issue. What do you think? – George Nov 15 '19 at 13:18
  • Thanks for the answer. I had made a small change to the code, which is why it did not work on the first run. When the server is back online, it resubscribes to this Mono and reconnects, which is what I needed. Thanks a lot. – George Nov 16 '19 at 22:35
  • @George can you please share a working answer? Thanks! – simbo1905 Dec 01 '20 at 19:36
  • @George +1 could you please share your full working solution? I need it as well. Thanks man! – javaistaucheineinsel Feb 15 '21 at 16:51
1

The answer here https://stackoverflow.com/a/58890649/2852528 is the right one. The only thing I would add is that reactor.util.retry.Retry has many options for configuring the retry logic, including logging.

So I would slightly improve the original answer: increase the time between retries until it reaches a maximum value (16 seconds), and log the failure before each retry so that the connector's activity can be monitored:

@Bean
Mono<RSocketRequester> rSocketRequester(RSocketRequester.Builder builder) {

    return builder
            .rsocketConnector(connector -> connector.reconnect(
                    Retry.backoff(Integer.MAX_VALUE, Duration.ofSeconds(1L))
                            .maxBackoff(Duration.ofSeconds(16L))
                            .jitter(1.0D)
                            .doBeforeRetry(signal -> log.error("connection error", signal.failure()))))
            .connectTcp("localhost", 7000);

}
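
To show what "increasing the time between retries up to a maximum" looks like in numbers, here is a small plain-Java sketch that computes the nominal delays for a 1-second minimum and a 16-second cap. It assumes the delay doubles on each retry and deliberately ignores jitter, so it is not a reproduction of Reactor's exact timing; BackoffSchedule and its methods are hypothetical names for this illustration.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only: it does not call Reactor and it ignores jitter.
// BackoffSchedule is a hypothetical helper, not a library class.
class BackoffSchedule {

    // Nominal delay before retry number `attempt` (0-based): min * 2^attempt, capped at max.
    static Duration nominalDelay(int attempt, Duration min, Duration max) {
        long maxMs = max.toMillis();
        long delay = Math.min(min.toMillis(), maxMs);
        for (int i = 0; i < attempt && delay < maxMs; i++) {
            // Double the delay, guarding against overflow and against exceeding the cap.
            delay = (delay > maxMs / 2) ? maxMs : delay * 2;
        }
        return Duration.ofMillis(delay);
    }

    // Nominal delays for the first `count` retries.
    static List<Duration> firstDelays(int count, Duration min, Duration max) {
        List<Duration> delays = new ArrayList<>();
        for (int attempt = 0; attempt < count; attempt++) {
            delays.add(nominalDelay(attempt, min, max));
        }
        return delays;
    }

    public static void main(String[] args) {
        // Same minimum and maximum as in the bean above.
        System.out.println(firstDelays(7, Duration.ofSeconds(1), Duration.ofSeconds(16)));
    }
}
```

Under these assumptions the nominal delays are 1, 2, 4, 8 and 16 seconds, and every later retry waits 16 seconds; the jitter setting then randomizes the actual waits around those values.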
 
Serhii Povísenko