I'm converting a Redis pub/sub system to Redis Streams so that I can add some fault tolerance to my server-sent events.
Subscribing the traditional way is trivial:
import { createClient } from 'redis';

const redisOptions = {
  url: `${process.env.REDIS_URL}/0`
};
const redis = createClient(redisOptions);
redis.setMaxListeners(100000);

// Log every message delivered on a subscribed channel.
redis.on("message", (channel, message) => {
  console.log(channel);
  console.log(message);
});
redis.subscribe('foo');
This keeps the connection open and waits for messages indefinitely. In this case, anything published to the channel gets logged:
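const json = { a: 1, b: 2 };
// A connection in subscriber mode cannot issue PUBLISH, so publish from a second one.
const publisher = redis.duplicate();
publisher.publish('foo', JSON.stringify(json));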
Switching over to streams, you use XREAD instead of subscribe and XADD instead of publish, and the data is dramatically different. The part I'm struggling with is the blocking.
redis.xread('BLOCK', 0, 'STREAMS', 'foo', '$', (err, str) => {
  if (err) return console.error('Error reading from stream:', err);
  str.forEach(message => {
    console.log(message);
  });
});
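To illustrate what I mean by the data being different: as far as I can tell, a raw XREAD reply is a set of nested arrays, one per stream, each holding entries with an ID and a flat field/value list. Here is a minimal sketch of flattening that into objects. `parseXreadReply` is a hypothetical helper of my own, and the `data` field name is just an assumption about how the message would be added (for example `XADD foo * data <json>`).

```javascript
// Assumed shape of a raw XREAD reply (nested arrays):
// [[streamKey, [[entryId, [field1, value1, field2, value2, ...]], ...]], ...]
function parseXreadReply(reply) {
  if (!reply) return []; // XREAD replies with null when a BLOCK timeout expires
  const out = [];
  for (const [stream, entries] of reply) {
    for (const [id, flat] of entries) {
      // Turn the flat [field, value, field, value] list into an object.
      const fields = {};
      for (let i = 0; i < flat.length; i += 2) {
        fields[flat[i]] = flat[i + 1];
      }
      out.push({ stream, id, fields });
    }
  }
  return out;
}

// Sample reply written by hand to match the assumed shape.
const sampleReply = [
  ['foo', [['1518951480106-0', ['data', '{"a":1,"b":2}']]]],
];
console.log(parseXreadReply(sampleReply));
```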
When sending messages, the first one is picked up by my "subscription", but no further messages are logged.
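My current guess is that, unlike subscribe, a blocking XREAD returns once as soon as data arrives and then has to be issued again, and that the next call should pass the last ID received rather than `$`, since `$` would skip anything added between calls. Below is a rough sketch of that idea, not a confirmed fix. `followStream` and `shouldStop` are hypothetical names of mine, and `xread` is assumed to be a promise-returning wrapper around the client call (for example `promisify(redis.xread).bind(redis)`).

```javascript
// Hypothetical helper: keeps re-issuing XREAD, resuming from the last ID seen.
// `xread` is an injected function (...args) => Promise<reply | null>.
// `shouldStop` exists only so the loop can be ended; by default it runs forever.
async function followStream(xread, key, onEntry, shouldStop = () => false) {
  let lastId = '$'; // '$' means "only entries added after this call"
  while (!shouldStop()) {
    const reply = await xread('BLOCK', 0, 'STREAMS', key, lastId);
    if (!reply) continue; // null reply: a finite BLOCK timeout expired
    for (const [, entries] of reply) {
      for (const [id, fields] of entries) {
        onEntry(id, fields);
        lastId = id; // resume after this entry on the next call
      }
    }
  }
}
```

If this is right, the reading connection is tied up while XREAD blocks, so XADD would presumably need to go through a separate connection.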