0

After some testing with both pub/sub and xadd/xread I have came to a situation where I have realized that if my subscriber is not on, the message will not be recieved whenever I start up the subscriber. e.g. situation

  1. You send a message via publish
  2. You turn on your subscriber and listen for the channel 10 seconds after you have send the message via publish
  3. The message will be lost.

There is two different codes that I have tried e.g.

Sub.py

import redis
import time
from config import configuration

client: redis = redis.Redis(
    host=configuration.helheim.redis_host,
    port=configuration.helheim.redis_port,
    db=configuration.helheim.redis_database
)

while True:
    test = client.xread({"sns": '$'}, None, 0)
    print(test)
    time.sleep(1)

Pub.py

import redis

from config import configuration

client: redis = redis.Redis(
    host=configuration.helheim.redis_host,
    port=configuration.helheim.redis_port,
    db=configuration.helheim.redis_database
)


test = client.xadd("sns", {"status": "kill", "link": "https://www.sneakersnstuff.com/sv/product/49769/salomon-xa-alpine-mid-advanced"})
print(test)

Sub.py

EVENT_LISTENER.subscribe("sns")

while True:
    message = EVENT_LISTENER.get_message()

    if message and not message['data'] == 1:
        message = json.loads(message['data'])

Pub.py

import redis

from config import configuration

client: redis = redis.Redis(
    host=configuration.helheim.redis_host,
    port=configuration.helheim.redis_port,
    db=configuration.helheim.redis_database
)

channel = "sns"
client.publish(channel,
               '{"status": "kill", "store": "sns", "link": "https://www.sneakersnstuff.com/sv/product/49769/salomon-xa-alpine-mid-advanced"}')

and it seems like there is no persist historical messages saved in the redis.

My question is, how am I able to read the messages that I have publish and remove after a read when I have turned on my subcriber?

PythonNewbie
  • 1,031
  • 1
  • 15
  • 33

1 Answers1

1

Pub/sub never persists the messages. See What are the main differences between Redis Pub/Sub and Redis Stream?

Streams do persist the message, see https://redis.io/commands/xread

The problem is you are using xread with the special $ id, it only brings messages added after you call.

When blocking sometimes we want to receive just entries that are added to the stream via XADD starting from the moment we block. In such a case we are not interested in the history of already added entries. For this use case, we would have to check the stream top element ID, and use such ID in the XREAD command line. This is not clean and requires to call other commands, so instead it is possible to use the special $ ID to signal the stream that we want only the new things.

You may want to try with 0 on your first call, then use the last message ID.

If you want to avoid starting from zero in case of failure and you cannot persist the last message ID in your client, learn about https://redis.io/topics/streams-intro#consumer-groups

LeoMurillo
  • 6,048
  • 1
  • 19
  • 34
  • I see! make sense I do realized too that using Streams might be too overcomplicated too where I could just use lpush and rpop instead which should do the same thing. But I appreciate your answer! – PythonNewbie Aug 08 '21 at 08:31
  • 1
    That's right! Lists might as well do the trick – LeoMurillo Aug 08 '21 at 17:27