I am getting started with reactive websockets using Spring Boot 2.1.3. I created a WebSocketHandler
implementation like this:
@Override
public Mono<Void> handle(WebSocketSession session) {
Flux<EfficiencyData> flux = service.subscribeToEfficiencyData(1);
var publisher = flux.map( o -> {
try {
return objectMapper.writeValueAsString(o);
} catch (JsonProcessingException e) {
e.printStackTrace();
return null;
}
}).map(session::textMessage)
.delayElements(Duration.ofSeconds(1));
return session.send(publisher);
}
This works, if I connect, I get serialized EfficiencyData
every second in my websocket client.
However, I want to react to a request coming from the websocket to tell the service
for what id I want the data. I managed to get the request info like this:
@Override
public Mono<Void> handle(WebSocketSession session) {
return session.send(session.receive().map(webSocketMessage -> {
int id = Integer.parseInt(webSocketMessage.getPayloadAsText());
return session.textMessage("Subscribing with id " + id);
}));
Now I have no clue how to combine these 2 implementations?
I was hoping to do something like this:
@Override
public Mono<Void> handle(WebSocketSession session) {
return session.send(session.receive().map(webSocketMessage -> {
int id = Integer.parseInt(webSocketMessage.getPayloadAsText());
Flux<EfficiencyData> flux = service.subscribeToEfficiencyData(id);
var publisher = flux.map( o -> {
try {
return objectMapper.writeValueAsString(o);
} catch (JsonProcessingException e) {
e.printStackTrace();
return null;
}
}).map(session::textMessage)
.delayElements(Duration.ofSeconds(1));
return publisher; //Does not compile
}));
But that does not compile since publisher
is a Flux<WebSocketMessage>
and it should be a Publisher<WebSocketMessage>
. How should this be handled?
EDIT:
Following the Javadoc example of WebSocketHandler
, I tried this:
@Override
public Mono<Void> handle(WebSocketSession session) {
Flux<EfficiencyData> flux =
session.receive()
.map(webSocketMessage -> Integer.parseInt(webSocketMessage.getPayloadAsText()))
.concatMap(service::subscribeToEfficiencyData);
Mono<Void> input = flux.then();
Mono<Void> output = session.send(flux.map(data -> session.textMessage(data.toString()))).then();
return Mono.zip(input, output).then();
}
But that just disconnects the websocket client immediately without doing anything.