Resolution summary:
In most of the RSocket examples currently out there the server side acceptor is simply constructed as a new object (like new MqttMessageService() below) even in SpringBoot related tutorials. Which is fine if you generate example content right in the acceptor class but might lead to the below dependency injection related confusion when the acceptor depends on other beans in the container.
Original question:
I get a NullPointerException when trying to stream database entries using a Spring Data Reactive Mongodb repository via Rsocket's Java server.
The problem is that during debugging all components work separately: I can get the requested data via the same Mongodb repository and I can also stream random generated data between the same server and client using Rsocket.
So I'm either missing something really basic or there might be an issue with using Reactive Mongodb and Rsocket together.
Here is the original server side Rsocket configuration:
@Configuration
public class RsocketConfig {
@PostConstruct
public void startServer() {
RSocketFactory.receive()
.acceptor((setup, sendingSocket) -> Mono.just(new MqttMessageService()))
.transport(TcpServerTransport.create(8802))
.start()
.block()
.onClose()
}
}
And here is the working server side Rsocket configuration with proper DI:
@Configuration
public class RsocketConfig {
@Autowired
MqttMessageService messageService;
@PostConstruct
public void startServer() {
RSocketFactory.receive()
.acceptor((setup, sendingSocket) -> Mono.just(messageService))
.transport(TcpServerTransport.create(8802))
.start()
.block()
.onClose()
}
}
Here is the server side AbstractRSocket implementation where a NullPointerException is thrown at return service.findAll().
@Service
public class MqttMessageService extends AbstractRSocket {
@Autowired
private MqttMessageEntityService service;
@Override
public Flux<Payload> requestStream(Payload payload) {
return service.findAll()
.map(mqttMessageEntity -> DefaultPayload.create(mqttMessageEntity.toString()));
}
}
Here are the reactive repository and the related service. The service returns null when injected to the server's AbstractRSocket implementation, but works fine when injected into other classes:
@Service
public class MqttMessageEntityService {
@Autowired
private MqttMessageEntityRepository repository;
public Flux<MqttMessageEntity> findAll() {
return repository.findAll();
}
}
public interface MqttMessageEntityRepository extends ReactiveMongoRepository<MqttMessageEntity, String> {
}
And here is the client side code that works perfectly with the test contents:
@Configuration
public class RsocketConfig {
@PostConstruct
public void testRsocket() {
RSocket rSocketClient = RSocketFactory
.connect()
.transport(TcpClientTransport.create(8802))
.start()
.block();
rSocketClient
.requestStream(DefaultPayload.create(""))
.blockLast();
}
}
I might be a little over my knowledge level here and resources are very limited on the topic so I appreciate any hints towards the solution :)