3

Resolution summary:

In most of the RSocket examples currently out there the server side acceptor is simply constructed as a new object (like new MqttMessageService() below) even in SpringBoot related tutorials. Which is fine if you generate example content right in the acceptor class but might lead to the below dependency injection related confusion when the acceptor depends on other beans in the container.

Original question:

I get a NullPointerException when trying to stream database entries using a Spring Data Reactive Mongodb repository via Rsocket's Java server.

The problem is that during debugging all components work separately: I can get the requested data via the same Mongodb repository and I can also stream random generated data between the same server and client using Rsocket.

So I'm either missing something really basic or there might be an issue with using Reactive Mongodb and Rsocket together.

Here is the original server side Rsocket configuration:

@Configuration
public class RsocketConfig {

    @PostConstruct
    public void startServer() {
        RSocketFactory.receive()
                .acceptor((setup, sendingSocket) -> Mono.just(new MqttMessageService()))
                .transport(TcpServerTransport.create(8802))
                .start()
                .block()
                .onClose()
    }
}

And here is the working server side Rsocket configuration with proper DI:

@Configuration
public class RsocketConfig {

    @Autowired
    MqttMessageService messageService;

    @PostConstruct
    public void startServer() {
        RSocketFactory.receive()
                .acceptor((setup, sendingSocket) -> Mono.just(messageService))
                .transport(TcpServerTransport.create(8802))
                .start()
                .block()
                .onClose()
    }
}

Here is the server side AbstractRSocket implementation where a NullPointerException is thrown at return service.findAll().

@Service
public class MqttMessageService extends AbstractRSocket {



    @Autowired 
    private MqttMessageEntityService service;

    @Override
    public Flux<Payload> requestStream(Payload payload) {
        return service.findAll()
            .map(mqttMessageEntity -> DefaultPayload.create(mqttMessageEntity.toString()));

    }
}

Here are the reactive repository and the related service. The service returns null when injected to the server's AbstractRSocket implementation, but works fine when injected into other classes:

@Service
public class MqttMessageEntityService {

    @Autowired
    private MqttMessageEntityRepository repository;

    public Flux<MqttMessageEntity> findAll() {
        return repository.findAll();
    }

}

public interface MqttMessageEntityRepository extends ReactiveMongoRepository<MqttMessageEntity, String> {

}

And here is the client side code that works perfectly with the test contents:

@Configuration
public class RsocketConfig {

    @PostConstruct
    public void testRsocket() {

        RSocket rSocketClient = RSocketFactory
                .connect()
                .transport(TcpClientTransport.create(8802))
                .start()
                .block();

        rSocketClient
                .requestStream(DefaultPayload.create(""))
                .blockLast();
    }        
}

I might be a little over my knowledge level here and resources are very limited on the topic so I appreciate any hints towards the solution :)

Péter Veres
  • 955
  • 1
  • 10
  • 25

1 Answers1

2

Regarding

@PostConstruct
public void startServer() {
    RSocketFactory.receive()
            .acceptor((setup, sendingSocket) -> Mono.just(new MqttMessageService()))
            .transport(TcpServerTransport.create(8802))
            .start()
            .block()
            .onClose();
}

Are you using the to keep the server alive? If so add another block after the onClose().

Is messageEntityService null? Because that looks like the only thing that could cause an error if the variables topicStart and module aren't. Especially if the other code works - I can't really see anything that would cause a problem from the RSocket side.

Robert Roeser
  • 221
  • 1
  • 2
  • Oh, indeed you are right! The messageEntityService does not return a null value, the service itself is null Which is unexpected as it works perfectly when injected it to a different class. Do you have any idea while this could be happening? I'm updating my question with more details on the reactive mongo repository and the messageEntityService as it might still be relevant under this question. – Péter Veres Feb 03 '19 at 18:42
  • 1
    I think this thread may help you figure out why Spring is injecting a null in your @Autowired variable: https://stackoverflow.com/questions/19896870/why-is-my-spring-autowired-field-null – Robert Roeser Feb 05 '19 at 22:03
  • Haha, thanks! The first thing that I've checked was if the service is managed by the container, but missed the part where I should have injected it as the server side acceptor, instead of creating a new instance outside of the Spring container. I was a bit confused since all the examples I've seen even the ones involving SpringBoot were using this way of adding the acceptor - but I just realized now that in those cases it didn't really matter if it's maintained by spring since they all used data generated within the acceptor and didn't depend on other services (like the db in my case) – Péter Veres Feb 07 '19 at 14:31