I am using the StackExchange.Redis package in .NET Core 3.1. I can use Pub/Sub like this:

    var channel = connectionMultiplexer.GetSubscriber().Subscribe("channel1");
    channel.OnMessage(msg =>
    {
        var message = msg.Message;
        Console.Out.WriteLine(message);
    });

But Pub/Sub messages are not stored anywhere on the Redis server. I found that Redis Streams persist in the Redis store, so I want to replace Pub/Sub with a stream. However, I don't see a way to subscribe to a stream the way I do with Pub/Sub. How can I consume stream data in a Pub/Sub-like way?

jhon

2 Answers

I also searched and could not find a way to do this directly with the StackExchange.Redis package. However, I came across this question, in which the user refers to this GitHub issue. In that issue, chrisckc proposes a workaround that combines Streams and Pub/Sub to get something close to Pub/Sub for streams. I encourage you to read the GitHub issue for some of the drawbacks. This is my take on how to implement chrisckc's suggestion:

var db = connectionMultiplexer.GetDatabase();
var subscriber = connectionMultiplexer.GetSubscriber();

/////////////The Subscriber
string channelAndStreamName = "channel1";
var channel = connectionMultiplexer.GetSubscriber().Subscribe(channelAndStreamName);
channel.OnMessage(msg =>
{
    bool ifAllSubscriberShouldProcessAMessage = true;

    if (ifAllSubscriberShouldProcessAMessage)
    {
        //You then can retrieve your message normally
        var message = msg.Message;
        Console.Out.WriteLine(message);

        //Or you can even retrieve it from the actual stream. 
        var streamEntryArray = db.StreamRead(channelAndStreamName, "0-0");

        //Then Parse your streamEntryArray objects To what you expect
        //See example https://github.com/redis-developer/redis-streams-with-dotnet/blob/main/RedisStreamsStackExchange/Program.cs
    }
    else
    {

        //In this case you would only want one consumer to process the message,
        //e.g. if you have multiple instances of your app running for load balancing purposes
        //Firstly follow https://developer.redis.com/develop/dotnet/streams/stream-basics/
        //So that you understand the groups and how to set it up
        //Then instead of having a continuous loop as the article suggests, you instead
        //subscribe using the example I have shown here, but your subscribers should only process
        //the message if they are the ones who received it via the StreamRead, just like below.
        //Remember that all subscribers will receive the message via Pub/Sub, but in your
        //processing logic, retrieve the message via streams, if the message is there
        //then that instance is the one meant to process the actual message

        string groupName = "group1";
        var result = db.StreamReadGroup(channelAndStreamName, groupName, "avg-1", ">", 1);

        if (result.Any())
        {
            //If there is a result, then this subscriber instance is the one in the group who
            //received the message, in theory, the others did not

            var id = result.First().Id;
            db.StreamAcknowledge(channelAndStreamName, groupName, id);

            var message = msg.Message;
            Console.Out.WriteLine(message);
        }
    }
});



/////////////The Publisher

//This is the value that we are going to first add to the stream
//then after we add, we also publish the same value so that it is received by subscribers.
//For easy use, the stream name as well as the channel name are the same
string valueToAdd = $"This is the new entry {DateTime.UtcNow}";

//Adding to persistent stream first
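await db.StreamAddAsync(channelAndStreamName,
    new NameValueEntry[]
    {
        //Explicit type: target-typed new() needs C# 9, which .NET Core 3.1 does not use by default
        new NameValueEntry("temp", valueToAdd),
        new NameValueEntry("time", DateTimeOffset.Now.ToUnixTimeSeconds())
    });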

//after adding the new value to the stream, you can then publish to subscribers
await subscriber.PublishAsync(channelAndStreamName, valueToAdd);
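
For comparison, the continuous loop mentioned in the comments above can be sketched roughly as follows. This is only a minimal sketch, not part of chrisckc's suggestion: `StreamPoller`, `PollAsync`, the batch size of 10 and the 500 ms delay are all hypothetical choices made for illustration. It assumes a non-blocking `StreamReadAsync` call, so the loop waits briefly whenever the stream has nothing new.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using StackExchange.Redis;

public static class StreamPoller
{
    // Hypothetical helper: polls a stream and calls handler once per new entry.
    public static async Task PollAsync(
        IDatabase db,
        string streamName,
        Action<StreamEntry> handler,
        CancellationToken token)
    {
        // "0-0" replays the stream from its first entry.
        // Store the last processed ID somewhere durable to resume after a restart.
        RedisValue lastId = "0-0";

        while (!token.IsCancellationRequested)
        {
            // Returns entries with an ID greater than lastId (at most 10 per call).
            StreamEntry[] entries = await db.StreamReadAsync(streamName, lastId, count: 10);

            foreach (StreamEntry entry in entries)
            {
                handler(entry);
                lastId = entry.Id;
            }

            if (entries.Length == 0)
            {
                // Nothing new: wait before polling again.
                // Task.Delay throws TaskCanceledException if token is cancelled here.
                await Task.Delay(TimeSpan.FromMilliseconds(500), token);
            }
        }
    }
}
```

It could be called as `await StreamPoller.PollAsync(db, "channel1", e => Console.WriteLine(e.Id), cts.Token);`. The trade-off against the Pub/Sub notification approach is latency versus simplicity: polling adds up to one delay interval before a message is seen, but it does not depend on a Pub/Sub message being delivered.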
kudzanayi

Try with:

connectionMultiplexer.GetSubscriber().Subscribe("channel", async (channel, message) => {
    await DoSomethingWith(message);
});
thepirat000
  • It doesn't trigger when I execute XADD channel * message "hello channel" – jhon May 20 '22 at 19:50
  • I thought you were talking about Pub/Sub, not Streams. In that case, check [this](https://developer.redis.com/develop/dotnet/streams/stream-basics/) – thepirat000 May 21 '22 at 00:15