
I am getting started with reactive websockets using Spring Boot 2.1.3. I created a WebSocketHandler implementation like this:

@Override
public Mono<Void> handle(WebSocketSession session) {

    Flux<EfficiencyData> flux = service.subscribeToEfficiencyData(1);
    var publisher = flux.map( o -> {
        try {
            return objectMapper.writeValueAsString(o);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
            return null;
        }
    }).map(session::textMessage)
      .delayElements(Duration.ofSeconds(1));
    return session.send(publisher);
}

This works: when I connect, I get serialized EfficiencyData every second in my websocket client.

However, I want to react to a request coming from the websocket that tells the service which id I want data for. I managed to get the request info like this:

@Override
public Mono<Void> handle(WebSocketSession session) {

    return session.send(session.receive().map(webSocketMessage -> {
        int id = Integer.parseInt(webSocketMessage.getPayloadAsText());

        return session.textMessage("Subscribing with id " + id);
    }));
}

Now I have no clue how to combine these two implementations.

I was hoping to do something like this:

@Override
public Mono<Void> handle(WebSocketSession session) {

    return session.send(session.receive().map(webSocketMessage -> {
        int id = Integer.parseInt(webSocketMessage.getPayloadAsText());

        Flux<EfficiencyData> flux = service.subscribeToEfficiencyData(id);
        var publisher = flux.map( o -> {
            try {
                return objectMapper.writeValueAsString(o);
            } catch (JsonProcessingException e) {
                e.printStackTrace();
                return null;
            }
        }).map(session::textMessage)
          .delayElements(Duration.ofSeconds(1));
        return publisher; // Does not compile
    }));
}

But that does not compile: the lambda returns a Flux<WebSocketMessage>, so map produces a Flux<Flux<WebSocketMessage>>, while session.send expects a Publisher<WebSocketMessage>. How should this be handled?

EDIT:

Following the Javadoc example of WebSocketHandler, I tried this:

@Override
public Mono<Void> handle(WebSocketSession session) {
    Flux<EfficiencyData> flux =
            session.receive()
                   .map(webSocketMessage -> Integer.parseInt(webSocketMessage.getPayloadAsText()))
                   .concatMap(service::subscribeToEfficiencyData);
    Mono<Void> input = flux.then();
    Mono<Void> output = session.send(flux.map(data -> session.textMessage(data.toString()))).then();
    return Mono.zip(input, output).then();
}

But that just disconnects the websocket client immediately without doing anything.

Oleh Dokuka
Wim Deblauwe
  • Possible duplicate of [Webflux websocketclient, How to send multiple requests in same session\[design client library\]](https://stackoverflow.com/questions/53812515/webflux-websocketclient-how-to-send-multiple-requests-in-same-sessiondesign-cl) – Oleh Dokuka Mar 06 '19 at 10:20
  • @OlehDokuka I saw that linked question before, but I fail to see how that helps here. – Wim Deblauwe Mar 06 '19 at 10:30
  • right. I see the problem – Oleh Dokuka Mar 06 '19 at 10:41

1 Answer


Use flatMap or concatMap to flatten the returned publisher

To fix your issue, you have to use an operator that flattens the returned publisher. For example:

@Override
public Mono<Void> handle(WebSocketSession session) {

    return session.send(
       session.receive()
              .flatMap(webSocketMessage -> {
                  int id = Integer.parseInt(webSocketMessage.getPayloadAsText());

                  Flux<EfficiencyData> flux = service.subscribeToEfficiencyData(id);
                  var publisher = flux
                      .<String>handle((o, sink) -> {
                          try {
                              sink.next(objectMapper.writeValueAsString(o));
                          } catch (JsonProcessingException e) {
                              // Emit nothing for this element; null is prohibited in Reactive Streams
                              e.printStackTrace();
                          }
                      })
                      .map(session::textMessage)
                      .delayElements(Duration.ofSeconds(1));

                  return publisher;
              })
    );
}

Key takeaways

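  1. If the mapping function returns a stream, use flatMap or concatMap. flatMap subscribes to the inner publishers eagerly and may interleave their elements, while concatMap subscribes to them one at a time and preserves order.
  2. Never return null. In Reactive Streams, null is a prohibited value (see the Reactive Streams specification).
  3. When a mapping can end up with null, use the handle operator, which lets you skip the element instead of emitting it.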
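
The nesting behind the first takeaway can be reproduced without Reactor or Spring. The sketch below is only an analogy using plain java.util.stream: readingsFor is a hypothetical stand-in for service.subscribeToEfficiencyData(id), and the class and method names are made up for illustration. Note that Stream.flatMap keeps encounter order, so in that respect it behaves more like Reactor's concatMap than its flatMap.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FlattenDemo {

    // Hypothetical stand-in for service.subscribeToEfficiencyData(id):
    // each id yields a small stream of readings.
    static Stream<String> readingsFor(int id) {
        return Stream.of("id " + id + " reading 1", "id " + id + " reading 2");
    }

    // map keeps one inner stream per id, so the element type is itself a stream.
    static List<Stream<String>> nested(List<Integer> ids) {
        return ids.stream()
                  .map(FlattenDemo::readingsFor)
                  .collect(Collectors.toList());
    }

    // flatMap merges the inner streams into a single stream of readings.
    static List<String> flattened(List<Integer> ids) {
        return ids.stream()
                  .flatMap(FlattenDemo::readingsFor)
                  .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(nested(List.of(1, 2)).size()); // number of inner streams
        System.out.println(flattened(List.of(1, 2)));     // all readings in one list
    }
}
```

The same shape applies to the handler: with map, session.send would receive a stream of streams, while flatMap hands it a single stream of messages.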
Oleh Dokuka