OK, I am new to RSocket. I am trying to create a simple RSocket client and a simple RSocket server. From the research I have done, RSocket supports resumption:
"It is particularly useful as, when sending a RESUME frame containing information about the last received frame, the client is able to resume the connection and only request the data it hasn’t already received, avoiding unnecessary load on the server and wasting time trying to retrieve data that was already retrieved."
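To check my understanding of that description, here is a minimal plain-Java sketch of the idea. It is not the rsocket-java API: the classes `ResumableServer`, `ResumableClient` and `ResumeSketch` are hypothetical names I made up, and positions are counted in whole frames for simplicity, whereas the real protocol may track them differently. The server remembers what it sent, the client reports how much it received, and only the missing frames are replayed after a reconnect.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of a resumable server (not the rsocket-java API). */
final class ResumableServer {
    // Every frame sent so far, kept so that missing frames can be replayed after a reconnect.
    private final List<String> sentFrames = new ArrayList<>();

    /** Records the frame as sent and returns it, standing in for a network write. */
    String send(String frame) {
        sentFrames.add(frame);
        return frame;
    }

    /** Handles a resume request: returns only the frames after the client's last received position. */
    List<String> resume(int lastReceivedPosition) {
        if (lastReceivedPosition < 0 || lastReceivedPosition > sentFrames.size()) {
            throw new IllegalArgumentException("cannot resume from position " + lastReceivedPosition);
        }
        return new ArrayList<>(sentFrames.subList(lastReceivedPosition, sentFrames.size()));
    }
}

/** Hypothetical client that tracks how many frames it has received. */
final class ResumableClient {
    private final List<String> received = new ArrayList<>();

    void onFrame(String frame) {
        received.add(frame);
    }

    /** The value the client would report when resuming (assumption: counted in frames). */
    int lastReceivedPosition() {
        return received.size();
    }

    List<String> received() {
        return received;
    }
}

final class ResumeSketch {
    static List<String> run() {
        ResumableServer server = new ResumableServer();
        ResumableClient client = new ResumableClient();

        // Connection is up: both frames reach the client.
        client.onFrame(server.send("person-1"));
        client.onFrame(server.send("person-2"));

        // Connection is down: the server sends, but the client receives nothing.
        server.send("person-3");
        server.send("person-4");

        // Reconnect: the client reports its position and gets only what it missed.
        for (String frame : server.resume(client.lastReceivedPosition())) {
            client.onFrame(frame);
        }
        return client.received();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [person-1, person-2, person-3, person-4]
    }
}
```

In this sketch the resume call replays only `person-3` and `person-4`, which is the behaviour I expect from the quoted description.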
The same source also says that the client is responsible for enabling resumption. My question is how to enable resumption and how to send that RESUME frame. I have a working client and server, but if I stop the server and start it again, nothing happens, and when the client later tries to communicate with the server again it throws java.nio.channels.ClosedChannelException.
This is my client configuration:
@Configuration
public class ClientConfiguration {

    /**
     * Defines the RSocket client, using TCP transport on port 7000.
     */
    @Bean
    public RSocket rSocket() {
        return RSocketFactory
                .connect()
                .resumeSessionDuration(Duration.ofDays(10))
                .mimeType(MimeTypeUtils.APPLICATION_JSON_VALUE, MimeTypeUtils.APPLICATION_JSON_VALUE)
                .frameDecoder(PayloadDecoder.ZERO_COPY)
                .transport(TcpClientTransport.create(7000))
                .start()
                .block();
    }

    /**
     * RSocketRequester bean, a wrapper around RSocket
     * that is used to communicate with the RSocket server.
     */
    @Bean
    RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
        return RSocketRequester.wrap(rSocket(), MimeTypeUtils.APPLICATION_JSON, MimeTypeUtils.APPLICATION_JSON, rSocketStrategies);
    }
}
And this is the RestController from which I start the communication with the RSocket server:
@RestController
public class UserDataRestController {

    private final RSocketRequester rSocketRequester;

    public UserDataRestController(RSocketRequester.Builder rSocketRequester) {
        this.rSocketRequester = rSocketRequester.connectTcp("localhost", 7000).block();
    }

    @GetMapping(value = "/feed/{firstName}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Publisher<Person> feed(@PathVariable("firstName") String firstName) {
        return rSocketRequester
                .route("feedPersonData")
                .data(new PersonDataRequest(firstName))
                .retrieveFlux(Person.class);
    }
}