33

I am developing real-time client-server application using C# and SignalR. I need to send messages to client as fast as possible. My code on server:

for (int i = 0; i < totalRecords; i++)
{
    hubContext.Clients.Client(clientList[c].Key).addMessage(
    serverId, RecordsList[i].type + RecordsList[i].value);
    Thread.Sleep(50);       
}

If there is delay >=50 ms everything working perfect, but if there is no delay or delay is less then 50 ms some messages are missing. I need to sent messages as fast as possible without delay. I guess I need to check if message received and only after send another one.
How to do it in a right way?

Dmitry Kazakov
  • 1,639
  • 3
  • 27
  • 47

3 Answers3

36

SignalR doesn't guarantee message delivery. Since SignalR doesn't block when you call client methods, you can invoke client methods very quickly as you've discovered. Unfortunately, the client might not always be ready to receive messages immediately once you send them, so SignalR has to buffer messages.

Generally speaking, SignalR will buffer up to 1000 messages per client. Once the client falls behind by over 1000 messages, it will start missing messages. This DefaultMessageBufferSize of 1000 can be increased, but this will increase SignalR's memory usage and it still won't guarantee message delivery.

http://www.asp.net/signalr/overview/signalr-20/performance-and-scaling/signalr-performance#tuning

If you want to guarantee message delivery, you will have to ACK them yourself. You can, as you suggested, only send a message after the previous message has been acknowledged. You can also ACK multiple messages at a time if waiting for an ACK for each message is too slow.

halter73
  • 15,059
  • 3
  • 49
  • 60
  • 1
    I was interested about this topic as well and there are some proposed solutions around stackoverflow... I've posted this user voice to include guaranteed message delivery in the official SignalR docs http://aspnet.uservoice.com/forums/228522-show-me-how-with-code/suggestions/6206997-signalr-guaranteed-message-delivery – CJ Harmath Jul 23 '14 at 15:53
10

You'll want to resend messages until you receive an acknowledgement from the other client.

Instead of immediately sending messages, queue them up and have a background thread/timer send the messages.

Here's a performant queue that would work.

public class MessageQueue : IDisposable
{
    private readonly ConcurrentQueue<Message> _messages = new ConcurrentQueue<Message>();

    public int InQueue => _messages.Count;

    public int SendInterval { get; }

    private readonly Timer _sendTimer;
    private readonly ISendMessage _messageSender;

    public MessageQueue(ISendMessage messageSender, uint sendInterval) {
        _messageSender = messageSender ?? throw new ArgumentNullException(nameof(messageSender));
        SendInterval = (int)sendInterval;
        _sendTimer = new Timer(timerTick, this, Timeout.Infinite, Timeout.Infinite);
    }

    public void Start() {
        _sendTimer.Change(SendInterval, Timeout.Infinite);
    }

    private readonly ConcurrentQueue<Guid> _recentlyReceived = new ConcurrentQueue<Guid>();

    public void ResponseReceived(Guid id) {
        if (_recentlyReceived.Contains(id)) return; // We've already received a reply for this message

        // Store current message locally
        var message = _currentSendingMessage;

        if (message == null || id != message.MessageId)
            throw new InvalidOperationException($"Received response {id}, but that message hasn't been sent.");

        // Unset to signify that the message has been successfully sent
        _currentSendingMessage = null;

        // We keep id's of recently received messages because it's possible to receive a reply
        // more than once, since we're sending the message more than once.
        _recentlyReceived.Enqueue(id);

        if(_recentlyReceived.Count > 100) {
            _recentlyReceived.TryDequeue(out var _);
        }
    }

    public void Enqueue(Message m) {
        _messages.Enqueue(m);
    }

    // We may access this variable from multiple threads, but there's no need to lock.
    // The worst thing that can happen is we send the message again after we've already
    // received a reply.
    private Message _currentSendingMessage;

    private void timerTick(object state) {
        try {
            var message = _currentSendingMessage;

            // Get next message to send
            if (message == null) {
                _messages.TryDequeue(out message);

                // Store so we don't have to peek the queue and conditionally dequeue
                _currentSendingMessage = message;
            }

            if (message == null) return; // Nothing to send

            // Send Message
            _messageSender.Send(message);
        } finally {
            // Only start the timer again if we're done ticking.
            try {
                _sendTimer.Change(SendInterval, Timeout.Infinite);
            } catch (ObjectDisposedException) {

            }
        }
    }

    public void Dispose() {
        _sendTimer.Dispose();
    }
}

public interface ISendMessage
{
    void Send(Message message);
}

public class Message
{
    public Guid MessageId { get; }

    public string MessageData { get; }

    public Message(string messageData) {
        MessageId = Guid.NewGuid();
        MessageData = messageData ?? throw new ArgumentNullException(nameof(messageData));
    }
}

Here's some example code using the MessageQueue

public class Program
{
    static void Main(string[] args) {
        try {
            const int TotalMessageCount = 1000;

            var messageSender = new SimulatedMessageSender();

            using (var messageQueue = new MessageQueue(messageSender, 10)) {
                messageSender.Initialize(messageQueue);

                for (var i = 0; i < TotalMessageCount; i++) {
                    messageQueue.Enqueue(new Message(i.ToString()));
                }

                var startTime = DateTime.Now;

                Console.WriteLine("Starting message queue");

                messageQueue.Start();

                while (messageQueue.InQueue > 0) {
                    Thread.Yield(); // Want to use Thread.Sleep or Task.Delay in the real world.
                }

                var endTime = DateTime.Now;

                var totalTime = endTime - startTime;

                var messagesPerSecond = TotalMessageCount / totalTime.TotalSeconds;

                Console.WriteLine($"Messages Per Second: {messagesPerSecond:#.##}");
            }
        } catch (Exception ex) {
            Console.Error.WriteLine($"Unhandled Exception: {ex}");
        }

        Console.WriteLine();
        Console.WriteLine("==== Done ====");

        Console.ReadLine();
    }
}

public class SimulatedMessageSender : ISendMessage
{
    private MessageQueue _queue;

    public void Initialize(MessageQueue queue) {
        if (_queue != null) throw new InvalidOperationException("Already initialized.");

        _queue = queue ?? throw new ArgumentNullException(nameof(queue));
    }

    private static readonly Random _random = new Random();

    public void Send(Message message) {
        if (_queue == null) throw new InvalidOperationException("Not initialized");

        var chanceOfFailure = _random.Next(0, 20);

        // Drop 1 out of 20 messages
        // Most connections won't even be this bad.
        if (chanceOfFailure != 0) {
            _queue.ResponseReceived(message.MessageId);
        }
    }
}
Kelly Elton
  • 4,373
  • 10
  • 53
  • 97
  • 1
    Consider using a BlockingCollection, see http://msdn.microsoft.com/en-us/library/dd267312.aspx, no need for manual locking then en much more efficient. It won't guarantee a FIFO order though. – Kris Vandermotten Mar 05 '14 at 19:55
  • Yeah that's kind of the problem. Like I mentioned, I didn't spend time addressing locking issues with this solution. There is probably a decent object in `System.Collections.Concurrent` for it, but that's maybe a bit outside the scope of this. Certainly isn't the end solution anyone should use, but it's a good starting point. – Kelly Elton Mar 05 '14 at 20:32
  • 2
    ConcurrentQueue is easy to use and probably exactly what you need : http://www.devx.com/dotnet/working-with-concurrent-queue-in-c.html – Simon_Weaver Feb 17 '17 at 04:21
  • This is almost 3 years later, but System.Threading.Channels is probably your best option for this. – aweyeahdawg Dec 13 '19 at 18:46
  • @KellyElton sorry, i was referring to the comment conversation about queues. – aweyeahdawg Dec 13 '19 at 21:45
1

Extending given answer, I did the following:

I decided to generate UUID for each message on client side, which sends the message, using one of tested UUID generators in JS.

Then, send this UUID alongside with a message. After the other client receives a message along with UUID, he sends confirmation of delivery back to the sender (confirmation contains said UUID).

After sender receives back his generated message UUID, he is sure, that message was successfully processed.

Also, I block sending messages until confirmation is received.

Michał Turczyn
  • 32,028
  • 14
  • 47
  • 69