
I'm converting a Redis pub/sub system to Redis Streams so that I can add some fault tolerance to my server-sent events.

Subscribing the traditional way is trivial:

import { createClient } from 'redis';

const redisOptions = {
  url: `${process.env.REDIS_URL}/0`
}
const redis = createClient(redisOptions);
redis.setMaxListeners(100000);

redis.on("message", (channel, message) => {
  console.log(channel);
  console.log(message);
});

redis.subscribe('foo');

This keeps the connection open and listening indefinitely. Publishing to the channel will then add to your log, in this case:

const json = { a: 1, b: 2 };

redis.publish('foo', JSON.stringify(json));

Switching over to streams, you use XREAD instead of SUBSCRIBE and XADD instead of PUBLISH, and the shape of the data is dramatically different. The part I'm struggling with is the blocking.

redis.xread('BLOCK', 0, 'STREAMS', 'foo', '$', (err, str) => {
  if (err) return console.error('Error reading from stream:', err);

  str.forEach(message => {
    console.log(message);
  });
});

When sending messages, the first one is picked up by my "subscription", but no further messages are logged.
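For reference, here is roughly how the sending side looks with XADD in place of PUBLISH. This is a minimal sketch; the 'payload' field name is just a placeholder for however the entry fields are structured.

const json = { a: 1, b: 2 };

// '*' tells Redis to auto-generate the entry ID; fields are flat key/value pairs
redis.xadd('foo', '*', 'payload', JSON.stringify(json), (err, id) => {
  if (err) return console.error('Error adding to stream:', err);
  console.log('Added entry', id);
});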

Dudo

1 Answer


Truth be told, I only asked this question because Google was no good to me, and I couldn't find anyone else posting about this problem. Hope this helps!

So, XREAD only blocks on the initial call. It sits and waits for a set time period (or indefinitely if you set the timeout to 0), but once it receives data, its duty is considered fulfilled, and it unblocks. To keep the "subscription" alive, you need to call XREAD again with the most recent ID from the stream, in place of the initial $ value we passed it.

Recursion seemed like a perfect solution:

const xread = ({ stream, id }) => {
  redis.xread('BLOCK', 0, 'STREAMS', stream, id, (err, str) => {
    if (err) return console.error('Error reading from stream:', err);

    // str is an array of [streamName, entries] pairs; we only asked for one stream
    str[0][1].forEach(message => {
      // each entry is [id, [field, value, ...]]
      id = message[0];
      console.log(id);
      console.log(message[1]);
    });

    // re-issue XREAD with the last ID we saw; setTimeout lets the call stack clear
    setTimeout(() => xread({ stream, id }), 0);
  });
}

xread({ stream: 'asdf', id: '$' })

Dudo
  • Surprising how there's barely any info about that on the web ... Are you sure recursion is a perfect solution? – W2a Oct 31 '20 at 11:26
  • With that recursion you will hit the maximum call stack depth. Wrap the recursive call with `setTimeout(() => xread({ stream, id }), 0)` to allow the stack to clear. – danthegoodman Feb 07 '21 at 13:59
  • @W2a - I have no clue... if you have a better suggestion, I'd love to hear it! – Dudo Feb 08 '21 at 16:34
  • xread() expects the id of the last event you processed. Actually, always passing '$' as the stream id is not a good idea. If messages come through too quickly there is a chance you will miss some of them between calls to xread(), as '$' means 'the most recent ID'. The manual suggests doing the first xread() with '$' as the stream ID, then note the id of the latest of the read events and pass it to the next xread() and so forth. – Mike Mar 13 '23 at 13:09
  • That’s what this does, @Mike. The xread at the bottom of the script will only be called once, then the recursion kicks in. – Dudo Mar 14 '23 at 02:58
  • Right @Dudo. My bad for glancing over the code too quickly! Let me compensate for it with some useful info: when monitoring more than one stream in the same xread() call, my experience is that using '$' on all of them and replacing them as you get the first message on each stream leads to odd behavior. What worked best for me is to first query the Redis time with redis.time(), and then use that as a base ID for all of them in the first call. – Mike Mar 15 '23 at 08:37
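A minimal sketch of the approach described in that last comment, assuming the same callback-style client as the code above (the stream names 'foo' and 'bar' are placeholders):

redis.time((err, reply) => {
  if (err) return console.error('Error fetching server time:', err);

  // TIME returns [seconds, microseconds]; stream entry IDs are millisecond-based
  const baseId = `${Number(reply[0]) * 1000}-0`;

  // one starting ID per stream, in the same order as the stream names
  redis.xread('BLOCK', 0, 'STREAMS', 'foo', 'bar', baseId, baseId, (err, streams) => {
    if (err) return console.error('Error reading from streams:', err);
    // ...then track the last ID seen per stream and pass those to the next xread call
  });
});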