
I am trying to listen on a Redis stream and process the messages as and when they arrive. I am using the async commands and I expect the messages to be pushed instead of being pulled, so I don't think a while loop is required. But the following code does not seem to work.

public static void main(String[] args) throws InterruptedException {

    RedisClient redisClient = RedisClient
        .create("redis://localhost:6379/");
    StatefulRedisConnection<String, String> connection
        = redisClient.connect();
    RedisAsyncCommands<String, String> commands = connection.async();
    commands.xgroupCreate(StreamOffset.latest("my-stream"), "G1", new XGroupCreateArgs());
    commands
        .xreadgroup(Consumer.from("G1", "c1"), StreamOffset.lastConsumed("my-stream"))
        .thenAccept(System.out::println);

    Thread.currentThread().join();
}

It just prints whatever the stream has when the program starts and does not print the messages that are added while the program is running. Isn't the callback supposed to be called for every message that is newly added to the stream?

falcon

4 Answers


I know this question is a bit old, but the answer could be helpful for someone else. You can repeatedly subscribe to the same Flux as shown below; it worked for me with xread, and I think the same should work for xreadgroup as well (a sketch of that variant follows the snippet).

RedisReactiveCommands<String, String> commands = connection.reactive();
commands.xread(new XReadArgs().block(Duration.ofSeconds(20)), XReadArgs.StreamOffset.from("some-stream", "$"))
        .doOnNext(msg -> {
            // push each new entry to a downstream handler (sink is a Reactor sink defined elsewhere)
            sink.tryEmitNext(msg.getBody().get("key"));
        })
        .repeat()
        .subscribe();
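
For xreadgroup, a minimal sketch of the same pattern might look like the following (not from the original answer; it assumes the group "G1" and consumer "c1" from the question have already been created):

RedisReactiveCommands<String, String> commands = connection.reactive();
commands
    .xreadgroup(Consumer.from("G1", "c1"),
        new XReadArgs().block(Duration.ofSeconds(20)),       // wait up to 20s for new entries
        XReadArgs.StreamOffset.lastConsumed("my-stream"))
    .doOnNext(msg -> System.out.println(msg.getBody()))      // handle each message here
    .repeat()                                                 // resubscribe once the blocking read completes
    .subscribe();

When reading through a consumer group, processed entries would normally also be acknowledged with xack.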
Lakmal

I think you should use the xgroupCreate method to create the link between the consumer and the group, otherwise you will get this error:

exception in thread "main" java.util.concurrent.ExecutionException: io.lettuce.core.RedisCommandExecutionException: NOGROUP No such key 'my-stream1' or consumer group 'group1' in XREADGROUP with GROUP option
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
    at com.test.TestList.main(TestList.java:57)
Caused by: io.lettuce.core.RedisCommandExecutionException: NOGROUP No such key 'my-stream1' or consumer group 'group1' in XREADGROUP with GROUP option
    at io.lettuce.core.ExceptionFactory.createExecutionException(ExceptionFactory.java:135)
    at io.lettuce.core.ExceptionFactory.createExecutionException(ExceptionFactory.java:108)
    at io.lettuce.core.protocol.AsyncCommand.completeResult(AsyncCommand.java:120)
    at io.lettuce.core.protocol.AsyncCommand.complete(AsyncCommand.java:111)
    at io.lettuce.core.protocol.CommandHandler.complete(CommandHandler.java:654)
    at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:614)
    at io.lettuce.core.protocol.CommandHandler.channelRead(CommandHandler.java:565)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931)
    at io.netty.channel.kqueue.AbstractKQueueStreamChannel$KQueueStreamUnsafe.readReady(AbstractKQueueStreamChannel.java:544)
    at io.netty.channel.kqueue.AbstractKQueueChannel$AbstractKQueueUnsafe.readReady(AbstractKQueueChannel.java:381)
    at io.netty.channel.kqueue.KQueueEventLoop.processReady(KQueueEventLoop.java:211)
    at io.netty.channel.kqueue.KQueueEventLoop.run(KQueueEventLoop.java:289)
    at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1050)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)

The example code is as follows:

package com.test;

import io.lettuce.core.Consumer;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.StreamMessage;
import io.lettuce.core.XGroupCreateArgs;
import io.lettuce.core.XReadArgs.StreamOffset;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;

import java.util.List;
public class TestList {
    public static void main(String[] args) throws Exception {
        RedisClient redisClient = RedisClient.create("redis://localhost:6379/");
        StatefulRedisConnection<String, String> connection = redisClient.connect();
        RedisAsyncCommands<String, String> commands = connection.async();
        RedisFuture<String> redisFuture = commands.xadd("my-stream1", "test", "1234");
        String redisFutureGet = redisFuture.get();
        System.out.println(redisFutureGet);
        commands.xgroupCreate(StreamOffset.latest("my-stream1"), "group1", new XGroupCreateArgs()); // add a group pointing to the stream
        RedisFuture<List<StreamMessage<String, String>>> messages = commands.xreadgroup(Consumer.from("group1", "my-stream1"),
                StreamOffset.lastConsumed("my-stream1"));
        List<StreamMessage<String, String>> res = messages.get();
        System.out.println(res);
    }
}
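
As a side note (not part of the original answer), the NOGROUP error can also be avoided without a prior XADD by letting XGROUP CREATE create the stream when it is missing, assuming a Lettuce version that supports the MKSTREAM option:

// create the group and, if necessary, the stream itself (MKSTREAM)
commands.xgroupCreate(StreamOffset.latest("my-stream1"), "group1",
        new XGroupCreateArgs().mkstream(true));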
sk l
    I have created the group already using redis-cli. But my question is not about that. I do not want to just read whatever is in the stream and immediately quit(like your code here). I want to keep the program running and keep consuming the messages as and when someone adds data into the stream. Kind of like a queue. But without using an infinite loop(that kind of beats the purpose of async push model) – falcon Jul 27 '20 at 14:38
  • I think you can not use RedisFuture to consume the message when someone add data into the stream.You could try the react command. – sk l Jul 28 '20 at 00:30
  • I actually have the same issue and it seems not possible to continuosly consume from redis stream in async fashion. Lettuce docs are not quite clear on this. A bit disappointing – Ciccio Jan 25 '21 at 20:47

I think Lettuce is only responsible for communicating with Redis, whether in a sync, async or streaming way; it is a low-level library. So if you want such a high-level feature, use Spring Data Redis, something like this:

StreamListener<String, MapRecord<String, String, String>> streamListener = new ExampleStreamListener();

StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> containerOptions =
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
                .pollTimeout(Duration.ofMillis(100))
                .build();

StreamMessageListenerContainer<String, MapRecord<String, String, String>> container =
        StreamMessageListenerContainer.create(connectionFactory, containerOptions);

Subscription subscription = container.receive(StreamOffset.fromStart("key2"), streamListener);
container.start();
//----------------------------------------------------------------

public class ExampleStreamListener implements StreamListener<String, MapRecord<String, String, String>> {

    @Override
    public void onMessage(MapRecord<String, String, String> message) {

        System.out.println("MessageId: " + message.getId());
        System.out.println("Stream: " + message.getStream());
        System.out.println("Body: " + message.getValue());
    }
}
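
The connectionFactory used above is a Spring Data Redis RedisConnectionFactory. Outside of a Spring Boot auto-configured context, one way to obtain it (a sketch, assuming a local Redis on the default port) is:

RedisStandaloneConfiguration redisConfig = new RedisStandaloneConfiguration("localhost", 6379);
LettuceConnectionFactory connectionFactory = new LettuceConnectionFactory(redisConfig);
connectionFactory.afterPropertiesSet();   // initialize the factory manually when it is not managed by Spring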

You could use the Redis reactive commands to achieve this:

RedisReactiveCommands<String, String> commands = connection.reactive();
commands.xgroupCreate(StreamOffset.latest("my-stream"), "G1", new XGroupCreateArgs());
commands
    .xreadgroup(Consumer.from("G1", "c1"), StreamOffset.lastConsumed("my-stream"))
    .subscribe(System.out::println, Throwable::printStackTrace);
rdo82