After trying many things:
I ended up just setting a small buffer size on the StreamWriter
using the constructor overload (128bytes which is the smallest amount you can set), then after my message and before the Flush
I just wrote some whitespace.
await client.WriteLineAsync( "".PadLeft( 128 ) );
await client.FlushAsync();
This works locally but also on Azure App Service which is where I was experiencing the problem (it always worked fine locally on IISExpress).
Here is my simple service class to manage this; bear in mind that I wanted to send messages to specific users based on the production of externally generated reports which pinged my asp.net app when complete; hence the use of a Dictionary of stream connections for each user (each user could have multiple tabs open so might have many connections).
One thing I have left to sort out is to keep the connections open as Azure App Service closes any connection after 2 minutes.
/// <summary>
/// Simple writer class to store the valid state of a client stream connection
/// </summary>
public class ClientWriter
{
public ClientWriter( StreamWriter writer )
{
Valid = true;
Writer = writer;
}
public bool Valid { get; set; } = true;
public StreamWriter Writer { get; set; } = null;
}
/// <summary>
/// Manage client messages using Server Sent Event (SSE) Messages protocol
/// </summary>
public class ClientMessageService : IClientMessageService
{
public ClientMessageService( ILogger logger )
{
_jsonSerializer = new JsonSerializerSettings
{
ContractResolver = new CamelCasePropertyNamesContractResolver(),
Formatting = Formatting.None,
NullValueHandling = NullValueHandling.Ignore,
};
_logger = logger;
_messageEncoding = new UTF8Encoding( false );
}
/// <summary>
/// Subscribe a user session/tab with the system to receive messages
/// </summary>
public HttpResponseMessage SubscribeUser( int userId, HttpRequestMessage request )
{
_logger.Info( $"Subscribing connection for user {userId}" );
var response = request.CreateResponse();
response.Content = new PushStreamContent( ( stream, content, context ) =>
{
OnStreamAvailable( userId, stream );
}, new MediaTypeHeaderValue( "text/event-stream" ) );
_logger.Info( $"Returning connection subscription response for user {userId}" );
return response;
}
/// <summary>
/// Unsubscribe a user and dispose of all open connections
/// </summary>
public void UnSubscribeUser( int userId )
{
if( Subscribers.TryRemove( userId, out var clientConnections ) )
{
foreach( var client in clientConnections )
{
client.Valid = false;
if( client.Writer != null )
{
client.Writer.Dispose();
client.Writer = null;
}
}
}
}
/// <summary>
/// Sends a message to all open sessions for the given user
/// </summary>
public async Task PostMessageAsync( int userId, ClientMessageBase message )
{
_logger.Info( $"Posting message to user {userId}" );
if( Subscribers.TryGetValue( userId, out var clientConnections ) )
{
// Send notifications to all valid client connections
foreach( var client in clientConnections )
{
if( client.Valid )
{
_logger.Info( $"Posting message to user {userId} client for type {message.Type}" );
await WriteEventDataAsync( client, message.Type, JsonConvert.SerializeObject( message, _jsonSerializer ) );
}
}
// Cleanup any invalid/bad client connections
foreach( var client in clientConnections )
{
// Dispose of any invalid/bad connections
if( client.Valid == false && client.Writer != null )
{
client.Writer.Dispose();
client.Writer = null;
}
}
}
}
/// <summary>
/// Send given data message to client connection
/// </summary>
private async Task WriteEventDataAsync( ClientWriter client, string eventType, string data )
{
try
{
if( !string.IsNullOrEmpty( eventType ) )
{
await client.Writer.WriteLineAsync( $"event:{eventType}" );
}
await client.Writer.WriteLineAsync( $"data:{( data ?? "" )}" );
// Must end the data event with a new line to separate events from each other
await client.Writer.WriteLineAsync();
// Because flush doesn't appear to flush the underlying stream as noted
// a lot on StackOverflow and other sites, the best solution is to pad out
// the buffer to force a flush of the actual message data
await client.Writer.WriteLineAsync( "".PadLeft( 128 ) );
await client.Writer.FlushAsync();
}
catch( Exception )
{
// Connection might not exist; tab or browser might have been closed
// So mark it as bad for cleanup later once all notifications have been
// sent
client.Valid = false;
_logger.Error( $"ERROR: Failed to post message for type {eventType}" );
}
}
/// <summary>
/// Create a stream writer connection and store against the user for future
/// </summary>
private void OnStreamAvailable( int userId, Stream stream )
{
_logger.Info( $"Creating connection stream writer for user {userId}" );
var clientWriter = new StreamWriter( stream, _messageEncoding, 128 )
{
// Force to unix endings
NewLine = "\n",
};
var client = new ClientWriter( clientWriter );
if( Subscribers.TryGetValue( userId, out var clientConnections ) )
{
clientConnections.Add( client );
}
else
{
var clients = new ConcurrentBag<ClientWriter> { client };
Subscribers.TryAdd( userId, clients );
}
_logger.Info( $"Created connection stream writer for user {userId}" );
}
private readonly Encoding _messageEncoding;
private readonly JsonSerializerSettings _jsonSerializer;
private static readonly ConcurrentDictionary<int, ConcurrentBag<ClientWriter>> Subscribers = new ConcurrentDictionary<int, ConcurrentBag<ClientWriter>>();
private readonly ILogger _logger;
}