
I am using Spring Data Redis to consume from a Redis stream. Using the reactive StreamReceiver to listen on a consumer group works, but I have observed that the Flux sometimes terminates prematurely and stops receiving new messages.

Code

StreamReceiverOptions<String, MapRecord<String, String, String>> options = StreamReceiverOptions.builder()
            .build();
StreamReceiver.create(reactiveConnFactory, options)
            .receiveAutoAck(Consumer.from("CONSUMER_GRP", "CONSUMER_ID_1"), StreamOffset.create(
                        "CONSUMER_STREAM",
                        ReadOffset.lastConsumed()))
            .doOnNext(msg -> LOG.info("Got [{}] message from stream", msg))
            .flatMap(msg -> Mono.fromRunnable(() -> process("reactive", msg))
                  .subscribeOn(streamConsumerExecutor))
            .onErrorResume(t -> Flux.empty())
            .doOnCancel(() -> LOG.info("Consumer Stream was cancelled"))
            .doOnComplete(() -> {
               LOG.info("Consumer Stream Completed");
            })
            .doOnTerminate(() -> {
               LOG.info("Consumer Stream terminated");
            }) 
            .subscribe();


After reading messages from the stream for some time, I get the log message "Consumer Stream terminated".

Version: 2.2.0.RELEASE

Is this a bug, or am I missing something? Could anyone help?

UPDATE

It looks like Redis commands are timing out, as I get a RedisCommandTimeoutException. Is there a way to retry the streaming process on such errors rather than cancelling it? I also figured out that the timeout happens in the XREADGROUP operation, although issuing the same command through the nodejs redis-cli worked fine.
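
To make the retry idea concrete: in the pipeline above, `onErrorResume(t -> Flux.empty())` turns any upstream error into a normal completion, which would be consistent with the "Consumer Stream terminated" log, whereas an operator such as Reactor's `Flux.retry()` resubscribes to the source instead. The sketch below shows the same "treat the timeout as transient and keep reading" behaviour in plain Java, without Reactor or Redis. Everything in it is hypothetical and for illustration only: `Poller` stands in for one blocking XREADGROUP call, `java.util.concurrent.TimeoutException` stands in for RedisCommandTimeoutException, and `scripted` is a fake source.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.TimeoutException;

public class RetryingConsumer {

    /** Hypothetical stand-in for one blocking read from the stream. */
    public interface Poller {
        // Returns the next message, or null when the simulated stream is drained.
        String poll() throws TimeoutException;
    }

    /**
     * Polls until the source is drained. A timeout is treated as transient:
     * the loop backs off and polls again instead of ending the consumer.
     * It gives up only after more than maxConsecutiveTimeouts timeouts in a row.
     */
    public static List<String> consume(Poller poller, int maxConsecutiveTimeouts, long backoffMillis) {
        List<String> processed = new ArrayList<>();
        int consecutiveTimeouts = 0;
        while (true) {
            try {
                String msg = poller.poll();
                if (msg == null) {
                    return processed;          // nothing left to read
                }
                processed.add(msg);            // "process" the message
                consecutiveTimeouts = 0;       // a successful read resets the counter
            } catch (TimeoutException e) {
                consecutiveTimeouts++;
                if (consecutiveTimeouts > maxConsecutiveTimeouts) {
                    return processed;          // too many failures in a row: give up
                }
                try {
                    // Linear backoff before the next attempt.
                    Thread.sleep(backoffMillis * consecutiveTimeouts);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    return processed;
                }
            }
        }
    }

    /** Fake source for the demo: each "TIMEOUT" entry raises a TimeoutException. */
    public static Poller scripted(String... script) {
        Deque<String> queue = new ArrayDeque<>(List.of(script));
        return () -> {
            String next = queue.poll();
            if ("TIMEOUT".equals(next)) {
                throw new TimeoutException("simulated command timeout");
            }
            return next;
        };
    }

    public static void main(String[] args) {
        List<String> out = consume(
                scripted("m1", "TIMEOUT", "m2", "TIMEOUT", "TIMEOUT", "m3"), 3, 10);
        System.out.println(out); // prints [m1, m2, m3]
    }
}
```

In the reactive pipeline, the analogous change would presumably be to place a retry operator before the terminal error handler, so that a timeout triggers a resubscription rather than a completion. Whether resubscribing is safe for a given consumer group setup is an assumption that needs checking against the actual deployment.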

Gaurav Rawat
