2

I recently ran into a problem with trying to broadcast to an open WebSocket using the SendAsync method, receiving an InvalidOperationException with the message "A send operation is already in progress"

Digging through the source code for the WebSocket class, a busy state is tracked internally:

internal ChannelState _sendState;

// Represents the state of a single channel; send and receive have their own states
internal enum ChannelState {
    Ready, // this channel is available for transmitting new frames
    Busy, // the channel is already busy transmitting frames
    Closed // this channel has been closed
}

Ideally, if I'm about to broadcast an update to a WebSocket connection, I'd like to know ahead of time that it's busy and perform some other handling (e.g. queue the message).

It seems odd that this state is marked as internal -- were it a public property I could simply check for

context.WebSocket.SendState == ChannelState.Ready

What is the correct pattern to SendAsync on a WebSocket that would prevent throwing this exception?

I'm loath to hack access to that property through reflection.

Edit to clarify:

The WebSocket.State property will not help this situation. The property uses this enumeration:

public enum WebSocketState
{
    None,
    Connecting,
    Open,
    CloseSent,
    CloseReceived,
    Closed,
    Aborted
}

Once the socket connection has been opened, this statement will evaluate to "true" regardless of whether or not it is busy sending:

context.WebSocket.State == WebSocketState.Open
Jake McGraw
  • 135
  • 4
  • 11
  • Are you using an `AspNetWebSocket`? If so, it has a `State` property you can check. https://msdn.microsoft.com/en-us/library/system.web.websockets.aspnetwebsocket.state(v=vs.110).aspx – Dan Harms Apr 26 '16 at 23:06
  • Unfortunately the State property doesn't change when the socket is "Busy" -- it remains "Open". Trying to initiate a "SendAsync" while it is busy will throw an exception. – Jake McGraw Apr 27 '16 at 01:24
  • Perhaps modify your code to have only one thread doing the send operations. That process can work against a queue that your application populates when it wants to send a message. – Dan Harms Apr 27 '16 at 14:58

1 Answers1

1

I've implemented a solution which appears to be working for my needs.

The basic problem is when two threads are trying to send on the same WebSocket.

My solution has a few parts, and is dependent upon the fact that this is running under the AspNetWebSocketContect, so I can use the "Items" dictionary to store properties about the current connection.

  1. A "sending" property is used to track if the WebSocket is busy.
  2. A "queue" property is a list of ArraySegments to send.
  3. Lock the WebSocket object and run the Send method synchronously.
  4. Handle any items in the queue once the send is finished.
  5. Yield at the start of the method to prevent blocking.

This is the code I'm currently using in a dev environment -- I'll monitor to see how well it scales:

/// <summary>
/// Send a message to a specific client.
/// </summary>
/// <param name="context"></param>
/// <param name="buffer"></param>
/// <returns></returns>
private static async Task SendMessage(AspNetWebSocketContext context, ArraySegment<byte> buffer)
{
    // Return control to the calling method immediately.
    await Task.Yield();

    // Make sure we have data.
    if (buffer.Count == 0)
        return;

    // The state of the connection is contained in the context Items dictionary.
    bool sending;
    lock (context)
    {
        // Are we already in the middle of a send?
        bool.TryParse(context.Items["sending"]?.ToString(), out sending);

        // If not, we are now.
        if (!sending)
            context.Items["sending"] = true;
    }

    if (!sending)
    {
        // Lock with a timeout, just in case.
        if (!Monitor.TryEnter(context.WebSocket, 1000))
        {
            // If we couldn't obtain exclusive access to the socket in one second, something is wrong.
            await context.WebSocket.CloseAsync(WebSocketCloseStatus.InternalServerError, string.Empty, CancellationToken.None);
            return;
        }

        try
        {
            // Send the message synchronously.
            var t = context.WebSocket.SendAsync(buffer, WebSocketMessageType.Text, true, CancellationToken.None);
            t.Wait();
        }
        finally
        {
            Monitor.Exit(context.WebSocket);
        }

        // Note that we've finished sending.
        lock (context)
        {
            context.Items["sending"] = false;
        }

        // Handle any queued messages.
        await HandleQueue(context);
    }
    else
    {
        // Add the message to the queue.
        lock (context)
        {
            var queue = context.Items["queue"] as List<ArraySegment<byte>>;
            if (queue == null)
                context.Items["queue"] = queue = new List<ArraySegment<byte>>();
            queue.Add(buffer);
        }
    }
}

/// <summary>
/// If there was a message in the queue for this particular web socket connection, send it.
/// </summary>
/// <param name="context"></param>
/// <returns></returns>
private static async Task HandleQueue(AspNetWebSocketContext context)
{
    var buffer = new ArraySegment<byte>();
    lock (context)
    {
        // Check for an item in the queue.
        var queue = context.Items["queue"] as List<ArraySegment<byte>>;
        if (queue != null && queue.Count > 0)
        {
            // Pull it off the top.
            buffer = queue[0];
            queue.RemoveAt(0);
        }
    }

    // Send that message.
    if (buffer.Count > 0)
        await SendMessage(context, buffer);
}

A few considerations I have on this approach:

  1. I believe it's better to lock on a simple object instead of a more complex object as I'm doing above. I'm not sure what the ramifications are of using the "context" and "context.WebSocket" objects for locking.
  2. Theoretically I shouldn't need to lock the WebSocket since I'm already testing the "sending" property. However, testing resulted in the WebSocket becoming nonresponsive under a few seconds of heavy load. This went away once I implemented the locking pattern.
  3. I did a test with 10 concurrent threads (sending 1000 messages each) through the same WebSocket connection. Each message was numbered so I could log them with the client, and every one made it through. So the queue system seems to work.
Yasser Jarouf
  • 326
  • 5
  • 18
Jake McGraw
  • 135
  • 4
  • 11