I have the program below, which connects to a Spring Boot RSocket server running on localhost:7999. I have configured the connector with Retry.fixedDelay(Integer.MAX_VALUE, Duration.ofSeconds(5)). As you can see, the RSocketRequester is wrapped in a Mono, so it should hold a single connection. When the connection fails and the retry begins, I see that every retry is made from a different thread, i.e. parallel-1 through parallel-8 in the log below. What is the reason for this?

12:08:24.463550|parallel-1|WARN |RSocketRefDataReceiver          |doAfterRetry===>attempt #1 (1 in a row), last failure={io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:7999}
12:08:30.470593|parallel-2|WARN |RSocketRefDataReceiver          |doAfterRetry===>attempt #2 (2 in a row), last failure={io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:7999}
12:08:36.475666|parallel-3|WARN |RSocketRefDataReceiver          |doAfterRetry===>attempt #3 (3 in a row), last failure={io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:7999}
12:08:42.494801|parallel-4|WARN |RSocketRefDataReceiver          |doAfterRetry===>attempt #4 (4 in a row), last failure={io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:7999}
12:08:48.499084|parallel-5|WARN |RSocketRefDataReceiver          |doAfterRetry===>attempt #5 (5 in a row), last failure={io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:7999}
12:08:54.503385|parallel-6|WARN |RSocketRefDataReceiver          |doAfterRetry===>attempt #6 (6 in a row), last failure={io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:7999}
12:09:00.509830|parallel-7|WARN |RSocketRefDataReceiver          |doAfterRetry===>attempt #7 (7 in a row), last failure={io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:7999}
12:09:06.545815|parallel-8|WARN |RSocketRefDataReceiver          |doAfterRetry===>attempt #8 (8 in a row), last failure={io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:7999}
12:09:12.553582|parallel-1|WARN |RSocketRefDataReceiver          |doAfterRetry===>attempt #9 (9 in a row), last failure={io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:7999}

My program is as follows:

RSocketStrategies strategies = RSocketStrategies.builder()
    .encoders(e -> e.add(new Jackson2CborEncoder()))
    .decoders(e -> e.add(new Jackson2CborDecoder()))
    .build();

Mono<RSocketRequester> requester = Mono.just(RSocketRequester.builder()
    .rsocketConnector(connector -> {
        connector.reconnect(
                Retry.fixedDelay(Integer.MAX_VALUE, Duration.ofSeconds(5))
                    .doAfterRetry(e -> LOG.warn("doAfterRetry===>{}", e)))
            .acceptor(RSocketMessageHandler.responder(strategies, this))
            .payloadDecoder(PayloadDecoder.ZERO_COPY);
    })
    .dataMimeType(MediaType.APPLICATION_CBOR)
    .setupRoute("test")
    .setupData("test-123")
    .rsocketStrategies(strategies)
    .tcp("localhost", 7999));

Saji

2 Answers

This article (Flight of the Flux 3) is a good introduction to the Reactor threading model. Reactor is the library that rsocket-java builds on for its reactive (Rx-style) functionality.

The key sentence is

Schedulers.parallel() is good for CPU-intensive but short-lived tasks. It can execute N such tasks in parallel (by default N == number of CPUs)

Also read up on https://projectreactor.io/docs/core/release/api/reactor/core/scheduler/Schedulers.html

If all operations were guaranteed to run on a single thread, it would likely cause noisy latency, as two different clients that happened to get the same thread initially would compete for it throughout the lifetime of your program. So it's better that general work gets spread evenly across a limited pool of threads.

Yuri Schimke
  • Thanks @Yuri for your quick response. I understand in general how the Reactor threading model works. My question is: when a single RSocket instance needs to be restored, why does the retry logic need to be assigned to a different thread, and for every attempt at that? The attempts are not parallel, they are sequential. Can't a single thread take control of the retries? I hope my question doesn't confuse you. – Saji Dec 26 '20 at 15:25
  • Not sure. I know that Reactor will fuse sequential operations onto a single consistent thread. But it isn't expected that any retries with network IO between them would be on the same thread. It's working as designed. – Yuri Schimke Dec 26 '20 at 15:48
  • @Saji your retry policy specifies a delay, which means a delayed event has to be generated which would then trigger a reconnect attempt. There aren't many ways to do that: block (Thread.sleep()), spin loop and a scheduled task. The first two approaches execute on the same thread, the third one is an asynchronous operation. Obviously, Reactor, being a non-blocking framework, uses the last option. And all delay tasks use parallel scheduler (essentially a ScheduledThreadPoolExecutor) by default. – bruto Dec 27 '20 at 09:01
  • @bruto. A scheduled task should run on a separate thread that stays alive and runs each task on that same thread; at least that's what I observed from the Java TimerTask implementation. If you look at my log, every retry attempt is handled by a different thread. This seems fishy. – Saji Dec 27 '20 at 10:05
  • @Saji you should stop focusing on which precise thread code is executed on in an Rx framework. The model is built around Schedulers, and context in Reactor has very different semantics from normal Java threads, thread locals, etc. – Yuri Schimke Dec 27 '20 at 10:17
  • Writing your own RetryBackoffSpec that uses Schedulers.single() instead of parallel() (see fixedDelay source code) would grant your wish to see all retries happen on the same thread. However, I struggle to think of a good reason to want to do so :) – bruto Dec 27 '20 at 11:41
  • +1 to that. It's doing the right thing. It's unclear why it's so important for you to be the same thread. – Yuri Schimke Dec 27 '20 at 14:15
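
The point made in the comments (each retry delay is a separately scheduled task, so sequential attempts need not share a thread) can be illustrated without Reactor. The sketch below is JDK-only and rests on an assumption: that the parallel scheduler behaves like a fixed set of single-threaded workers and hands each newly scheduled task to the next worker in turn, which would match the parallel-1 ... parallel-8, parallel-1 sequence in the log. The class name, the `worker-N` thread names and the 10 ms delay are hypothetical and are not Reactor's.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; not part of Reactor or RSocket.
final class RoundRobinDelayDemo {

    /** Simplified stand-in for a scheduler backed by N single-threaded workers. */
    static final class RoundRobinScheduler implements AutoCloseable {
        private final ScheduledExecutorService[] workers;
        private final AtomicInteger next = new AtomicInteger();

        RoundRobinScheduler(int workerCount) {
            workers = new ScheduledExecutorService[workerCount];
            for (int i = 0; i < workerCount; i++) {
                final String name = "worker-" + (i + 1);
                workers[i] = Executors.newSingleThreadScheduledExecutor(r -> {
                    Thread t = new Thread(r, name);
                    t.setDaemon(true);
                    return t;
                });
            }
        }

        /** Every newly scheduled task is handed to the next worker in turn. */
        ScheduledExecutorService pick() {
            return workers[Math.floorMod(next.getAndIncrement(), workers.length)];
        }

        @Override
        public void close() {
            for (ScheduledExecutorService worker : workers) {
                worker.shutdownNow();
            }
        }
    }

    /** Runs sequential delayed "attempts" and records the thread each one ran on. */
    static List<String> runDelays(int workerCount, int attempts) throws Exception {
        List<String> threadNames = new ArrayList<>();
        Callable<String> attempt = () -> Thread.currentThread().getName();
        try (RoundRobinScheduler scheduler = new RoundRobinScheduler(workerCount)) {
            for (int i = 0; i < attempts; i++) {
                // One attempt = one delayed task; wait for it before scheduling the next.
                threadNames.add(scheduler.pick().schedule(attempt, 10, TimeUnit.MILLISECONDS).get());
            }
        }
        return threadNames;
    }

    public static void main(String[] args) throws Exception {
        // Four workers: [worker-1, worker-2, worker-3, worker-4, worker-1, worker-2]
        System.out.println(runDelays(4, 6));
        // One worker: every attempt runs on worker-1
        System.out.println(runDelays(1, 6));
    }
}
```

Although the attempts are strictly sequential, each one lands on a different worker because it is a fresh scheduling request. With a single worker, every attempt stays on one thread, which is the behaviour the question was asking for.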

Thanks @Yuri, @bruto and @OlegDokuka for your suggestions and answers. I have changed my program as below to force the retries to run on a single thread.

connector.reconnect(
        Retry.fixedDelay(Integer.MAX_VALUE, Duration.ofSeconds(5))
        .scheduler(Schedulers.single()) // <---- This enforces retry to run on a single thread
        .doAfterRetry(e -> LOG.warn("doAfterRetry===>{}", e)))
        .acceptor(RSocketMessageHandler.responder(strategies,this))
        .payloadDecoder(PayloadDecoder.ZERO_COPY);
      })
Saji