
Starting with the tutorial code at benwilcock/spring-rsocket-demo, I am trying to write a server that replicates messages to a second server before responding to a client.

To try to debug my issues I am only attempting a trivial ping-pong exchange between servers. Only when the second server responds to the pong message should the first server reply to the client:

@MessageMapping("request-response")
Mono<Message> requestResponse(final Message request) {
    // register a mono that will be completed when replication to another server has happened
    String uuid = UUID.randomUUID().toString();
    Mono<Message> deferred = Mono.create(sink -> replicationNexus.registerRequest(uuid, sink));

    // FIXME attempt to send a nested request-response message that will complete the outer message later
    this.requesterMono.flatMap(requester -> requester.route("pong")
            .data(uuid)
            .retrieveMono(String.class))
            .subscribeOn(Schedulers.elastic())
            .subscribe( uuid2 -> replicationNexus.complete(uuid2, new Message(SERVER, RESPONSE)));

    // return the deferred work that will be completed by the pong response
    return deferred;
}

That logic is trying to use this answer to create a connection to the second server that will reconnect:

    this.requesterMono = builder.rsocketConnector(connector -> connector
            .reconnect(Retry.fixedDelay(Integer.MAX_VALUE, Duration.ofSeconds(1))))
            .connectTcp("localhost", otherPort).cache();

To complete the picture here is the trivial ping-pong logic:

@MessageMapping("pong")
public Mono<String> pong(String m) {
    return Mono.just(m);
}

and here is the logic that holds the state of the outer client response that is completed when the other server responds:

public class ReplicationNexus<T> {
    // pending client responses, keyed by request uuid
    final Map<String, MonoSink<T>> requests = new ConcurrentHashMap<>();

    public void registerRequest(String v, MonoSink<T> sink) {
        requests.put(v, sink);
    }

    public boolean complete(String uuid, T message) {
        // remove() returns null for an unknown uuid; Optional.of(null) would throw a NullPointerException
        Optional<MonoSink<T>> sink = Optional.ofNullable(requests.remove(uuid));
        sink.ifPresent(s -> s.success(message));
        return sink.isPresent();
    }
}
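
To show the bookkeeping on its own, here is a minimal sketch of the same register-then-complete idea using only the JDK. It is an analogy, not Reactor code: CompletableFuture stands in for MonoSink, and the names PendingReplies, register and complete are made up for this illustration. It looks up and removes the entry in one step, so an unknown uuid yields false instead of an exception.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical JDK-only stand-in for ReplicationNexus:
// CompletableFuture plays the role that MonoSink plays in the Reactor version.
class PendingReplies<T> {
    private final Map<String, CompletableFuture<T>> pending = new ConcurrentHashMap<>();

    // Registers a pending reply and returns the future the caller can hand back.
    CompletableFuture<T> register(String id) {
        CompletableFuture<T> future = new CompletableFuture<>();
        pending.put(id, future);
        return future;
    }

    // Completes and forgets the pending reply; returns false for an unknown id.
    boolean complete(String id, T value) {
        CompletableFuture<T> future = pending.remove(id);
        return future != null && future.complete(value);
    }
}
```

Removing the entry on completion also keeps the map from growing with every request that has already been answered.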

When I debug the second server, it never runs the pong method. It seems that the first server does not actually send the inner request message.

What is the correct way to run an inner request-response exchange that completes an outer message exchange with automated reconnection logic?

simbo1905
  • Your question might be of sufficient complexity that asking directly in https://community.reactive.foundation/ could get an answer sooner. – Yuri Schimke Dec 02 '20 at 07:50

1 Answer


Not sure if I'm missing some of the complexity of your question, but if the middle server is just acting like a proxy I'd start with the simplest case of chaining through the calls. I feel like I'm missing some nuance of the question, so let's work through that next.

  @MessageMapping("runCommand")
  fun runCommandX(
    request: CommandRequest,
  ): Mono<String> {
    val uuid = UUID.randomUUID().toString()

    return requesterMono
      .flatMap { requester: RSocketRequester ->
        requester.route("pong")
          .data("TEST")
          .retrieveMono(String::class.java)
      }
      .doOnSubscribe {
        // register request with uuid
      }
      .doOnSuccess {
        // register completion
      }
      .doOnError {
        // register failure
      }
  }

Generally, you should avoid calling subscribe yourself in typical spring/reactive/rsocket code if you can. You want the framework to do this for you.
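
As a rough illustration of returning the chained result rather than subscribing on the side, here is a sketch using only the JDK. It is an analogy under stated assumptions, not Reactor or RSocket code: CompletableFuture is eager whereas Mono is lazy, and ChainedHandler, callPong and handle are made-up names, with callPong standing in for the remote "pong" route.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class ChainedHandler {
    // Hypothetical stand-in for the remote "pong" route: echoes its input asynchronously.
    static CompletableFuture<String> callPong(String payload) {
        return CompletableFuture.supplyAsync(() -> payload);
    }

    // The outer handler returns the chained result of the inner call, instead of
    // starting the inner call separately and completing a hand-registered future.
    static CompletableFuture<String> handle(Function<String, CompletableFuture<String>> inner) {
        String uuid = UUID.randomUUID().toString();
        return inner.apply(uuid).thenApply(echoed -> "replicated:" + echoed);
    }
}
```

Calling ChainedHandler.handle(ChainedHandler::callPong).join() yields the echoed uuid with a "replicated:" prefix, and a failure of the inner call propagates to the caller through the same returned future, with no separate registry to clean up.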

Yuri Schimke
  • thanks yuri i will try that next. may I ask what does the "!!" do as i am not familiar with kotlin? – simbo1905 Dec 03 '20 at 11:14
  • forces a not null check - throws an NPE or continues. otherwise you need a?.b?.c?.d – Yuri Schimke Dec 03 '20 at 12:35
  • sorry for the delay getting back. if i try your approach here https://github.com/simbo1905/spring-rsocket-demo/blob/6f5a6210fa946803073e529a1d101026e6164c20/rsocket-server/src/main/java/io/pivotal/rsocketserver/RSocketController.java#L118 and set a breakpoint in the flatMap and in the doOnSubscribe, they do not get called. it appears as though the mono flatMap is lazy and is not getting called? – simbo1905 Dec 11 '20 at 17:55
  • Ahhh, yep, I didn't make it eager. To be honest I almost always consider a Mono that has side effects before being subscribed as probably a bug. It breaks Rx semantics IMHO. – Yuri Schimke Dec 11 '20 at 18:57
  • okay I have a fugly solution involving off loading to an application thread to do the replication work. I will post it here as a signpost to others then follow up on some dedicated forums as you suggest. Thanks! – simbo1905 Dec 12 '20 at 13:53