I am using Spring Data Redis to consume from a Redis stream. Using the reactive StreamReceiver to listen as part of a consumer group works, but I have observed that the Flux sometimes terminates prematurely and stops receiving new messages.
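Code:

    import org.springframework.data.redis.connection.stream.Consumer;
    import org.springframework.data.redis.connection.stream.MapRecord;
    import org.springframework.data.redis.connection.stream.ReadOffset;
    import org.springframework.data.redis.connection.stream.StreamOffset;
    import org.springframework.data.redis.stream.StreamReceiver;
    import org.springframework.data.redis.stream.StreamReceiver.StreamReceiverOptions;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    StreamReceiverOptions<String, MapRecord<String, String, String>> options =
            StreamReceiverOptions.builder().build();

    StreamReceiver.create(reactiveConnFactory, options)
            // Consumer.from(group, name) identifies this consumer within the group
            .receiveAutoAck(Consumer.from("CONSUMER_GRP", "CONSUMER_ID_1"),
                    StreamOffset.create("CONSUMER_STREAM", ReadOffset.lastConsumed()))
            .doOnNext(msg -> LOG.info("Got [{}] message from stream", msg))
            // Run the processing of each message on a separate scheduler
            .flatMap(msg -> Mono.fromRunnable(() -> process("reactive", msg))
                    .subscribeOn(streamConsumerExecutor))
            // Swallows any upstream error and completes the stream instead
            .onErrorResume(t -> Flux.empty())
            .doOnCancel(() -> LOG.info("Consumer Stream was cancelled"))
            .doOnComplete(() -> LOG.info("Consumer Stream Completed"))
            .doOnTerminate(() -> LOG.info("Consumer Stream terminated"))
            .subscribe();
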
After reading messages from the stream for some time, I get the log line "Consumer Stream terminated".
Version: 2.2.0.RELEASE
Is this a bug, or am I missing something? Could anyone help?
UPDATE
It looks like Redis commands are timing out, as I get a RedisCommandTimeoutException. Is there a way to retry the streaming process on such errors rather than cancelling it? I also figured out that the timeout happens in the XREADGROUP operation, although issuing the same command through the Node.js redis-cli worked fine.
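
To make the ask concrete, here is a minimal sketch of the behavior I am after, written with plain JDK types so it runs on its own rather than against Reactor or Redis. The names `RetryingReader`, `hasCause`, and `readWithRetry` are hypothetical, the `Supplier` is only a stand-in for the XREADGROUP call, and `java.util.concurrent.TimeoutException` stands in for RedisCommandTimeoutException. The idea is to retry only when a timeout appears somewhere in the cause chain and to let every other error propagate.

```java
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical helper, not part of Spring Data Redis or Reactor.
class RetryingReader {

    /** Returns true if t, or any exception in its cause chain, is an instance of type. */
    static boolean hasCause(Throwable t, Class<? extends Throwable> type) {
        for (Throwable c = t; c != null; c = c.getCause()) {
            if (type.isInstance(c)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Calls read up to maxAttempts times. A failure is retried only if a
     * TimeoutException is found in its cause chain; the wait before the next
     * attempt grows linearly (backoffMillis * attempt number).
     */
    static <T> T readWithRetry(Supplier<T> read, int maxAttempts, long backoffMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return read.get();
            } catch (RuntimeException e) {
                if (!hasCause(e, TimeoutException.class)) {
                    throw e; // not a timeout: do not retry
                }
                last = e;
                if (attempt < maxAttempts) {
                    try {
                        Thread.sleep(backoffMillis * attempt);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt(); // preserve interrupt status
                        throw last;
                    }
                }
            }
        }
        throw last; // every attempt timed out
    }
}
```

In the reactive pipeline, I assume the analogous move would be one of Reactor's `retry` operators placed before `onErrorResume`, since `onErrorResume(t -> Flux.empty())` currently turns the timeout into a normal completion. I have not verified which variant is appropriate for this version.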