Use of the `Subject` type is often frowned upon in reactive programming. In the following situation, I use a `Subject` to allow subscribing to notifications before the underlying source of the notifications is created. Is there an alternative that accomplishes this without using `Subject`?
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;
using Windows.Networking.Sockets;
using Windows.Storage.Streams;
/// <summary>
/// Console entry point demonstrating that a caller may subscribe to the message
/// stream first and connect the socket afterwards.
/// </summary>
class Program
{
    /// <summary>
    /// Subscribes <see cref="Print"/> to the socket's messages, connects to the
    /// server, then keeps the process alive until a line is read from the console.
    /// </summary>
    public static void Main()
    {
        var endpoint = new Uri("ws://mydomain.com/messages");
        var client = new ObservableMessageWebSocket();

        // Caller is allowed to subscribe before connect
        client.Messages.Subscribe(message => Print(message));

        // Caller is allowed to connect after subscribe.
        // Wait() blocks the calling thread until the connection attempt finishes.
        client.ConnectAsync(endpoint).Wait();

        Console.ReadLine();
    }

    /// <summary>Writes a received message to the console.</summary>
    /// <param name="message">The text to print.</param>
    public static void Print(string message) => Console.WriteLine(message);
}
/// <summary>
/// Exposes the UTF-8 text messages of a <see cref="MessageWebSocket"/> as an
/// observable sequence. Observers may subscribe to <see cref="Messages"/> before
/// or after <see cref="ConnectAsync"/> is called, because the sequence is backed
/// by a subject that exists for the lifetime of this object.
/// </summary>
class ObservableMessageWebSocket
{
    // Is there a way to get rid of this Subject?
    private readonly Subject<string> subject = new Subject<string>();
    private MessageWebSocket webSocket;

    // Link between the current socket's MessageReceived event and the subject;
    // kept so it can be released when the socket is replaced.
    private IDisposable subscription;

    /// <summary>
    /// Stream of received messages. The subject is wrapped with AsObservable so
    /// that callers cannot cast the returned object back to a subject and push
    /// values into, or complete, the shared stream.
    /// </summary>
    public IObservable<string> Messages => subject.AsObservable();

    /// <summary>
    /// Creates a new socket, forwards its received messages into
    /// <see cref="Messages"/>, and connects it to <paramref name="uri"/>.
    /// </summary>
    /// <param name="uri">Address of the WebSocket server.</param>
    /// <returns>A task that completes once the connection attempt has finished.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="uri"/> is null.</exception>
    public async Task ConnectAsync(Uri uri)
    {
        if (uri == null)
        {
            throw new ArgumentNullException(nameof(uri));
        }

        // A repeated call replaces the socket. Detach and release the previous one
        // first; otherwise its event handler would stay attached and the old
        // connection would keep feeding the subject alongside the new one.
        subscription?.Dispose();
        webSocket?.Dispose();

        webSocket = new MessageWebSocket();
        webSocket.Control.MessageType = SocketMessageType.Utf8;

        // The handler is attached before the connection is opened.
        subscription = Observable
            .FromEventPattern<MessageWebSocketMessageReceivedEventArgs>(webSocket, nameof(webSocket.MessageReceived))
            .Select(ReadString)
            .Subscribe(subject);

        await webSocket.ConnectAsync(uri);
    }

    /// <summary>Reads the whole payload of a received message as a UTF-8 string.</summary>
    /// <param name="pattern">The MessageReceived event data.</param>
    /// <returns>The message text.</returns>
    private static string ReadString(EventPattern<MessageWebSocketMessageReceivedEventArgs> pattern)
    {
        using (var reader = pattern.EventArgs.GetDataReader())
        {
            reader.UnicodeEncoding = UnicodeEncoding.Utf8;
            return reader.ReadString(reader.UnconsumedBufferLength);
        }
    }
}
EDIT: To clarify, I have several software components that subscribe to `ObservableMessageWebSocket.Messages` for push notifications. Some components subscribe before `ObservableMessageWebSocket.ConnectAsync` is called, and some subscribe after.
The code below avoids `Subject`, but does not function correctly: a component that subscribes after connect never receives notifications.
/// <summary>
/// Subject-free variant: <see cref="Messages"/> is built directly on the socket's
/// MessageReceived event.
/// </summary>
/// <remarks>
/// NOTE(review): the sequence below is not shared, so each subscriber presumably
/// gets its own event handler, attached at the moment it subscribes. That would
/// explain why observers subscribing after <see cref="ConnectAsync"/> see nothing,
/// if the socket ignores handlers added once connected - confirm against the
/// MessageWebSocket documentation.
/// </remarks>
class ObservableMessageWebSocket
{
    private readonly MessageWebSocket webSocket;

    /// <summary>Stream of received messages decoded as UTF-8 text.</summary>
    public IObservable<string> Messages { get; }

    /// <summary>Creates the socket in UTF-8 message mode and defines the message stream.</summary>
    public ObservableMessageWebSocket()
    {
        webSocket = new MessageWebSocket();
        webSocket.Control.MessageType = SocketMessageType.Utf8;

        var received = Observable.FromEventPattern<MessageWebSocketMessageReceivedEventArgs>(
            webSocket,
            nameof(MessageWebSocket.MessageReceived));

        Messages = received.Select(pattern => ReadString(pattern));
    }

    /// <summary>Connects the socket to the given server address.</summary>
    /// <param name="uri">Address of the WebSocket server.</param>
    /// <returns>A task that completes once the connection attempt has finished.</returns>
    public async Task ConnectAsync(Uri uri)
    {
        await webSocket.ConnectAsync(uri);
    }

    /// <summary>Reads the whole payload of a received message as a UTF-8 string.</summary>
    /// <param name="pattern">The MessageReceived event data.</param>
    /// <returns>The message text.</returns>
    private static string ReadString(EventPattern<MessageWebSocketMessageReceivedEventArgs> pattern)
    {
        using (var payload = pattern.EventArgs.GetDataReader())
        {
            payload.UnicodeEncoding = UnicodeEncoding.Utf8;
            var remaining = payload.UnconsumedBufferLength;
            return payload.ReadString(remaining);
        }
    }
}
The code below does not work either; it shows the same symptom.
/// <summary>
/// Subject-free variant that wraps the event-based sequence in
/// <c>Observable.Create</c>.
/// </summary>
/// <remarks>
/// NOTE(review): the delegate passed to <c>Observable.Create</c> runs once per
/// subscriber, so the event handler is presumably still attached only when an
/// observer subscribes. That would match the reported symptom of late
/// subscribers receiving nothing - confirm against the MessageWebSocket
/// documentation.
/// </remarks>
class ObservableMessageWebSocket
{
    private readonly MessageWebSocket webSocket;

    /// <summary>Stream of received messages decoded as UTF-8 text.</summary>
    public IObservable<string> Messages { get; }

    /// <summary>Creates the socket in UTF-8 message mode and defines the message stream.</summary>
    public ObservableMessageWebSocket()
    {
        webSocket = new MessageWebSocket();
        webSocket.Control.MessageType = SocketMessageType.Utf8;

        Messages = Observable.Create<string>(observer =>
        {
            var received = Observable.FromEventPattern<MessageWebSocketMessageReceivedEventArgs>(
                webSocket,
                nameof(MessageWebSocket.MessageReceived));

            return received.Select(ReadString).Subscribe(observer);
        });
    }

    /// <summary>Connects the socket to the given server address.</summary>
    /// <param name="uri">Address of the WebSocket server.</param>
    /// <returns>A task that completes once the connection attempt has finished.</returns>
    public async Task ConnectAsync(Uri uri)
    {
        await webSocket.ConnectAsync(uri);
    }

    /// <summary>Reads the whole payload of a received message as a UTF-8 string.</summary>
    /// <param name="pattern">The MessageReceived event data.</param>
    /// <returns>The message text.</returns>
    private static string ReadString(EventPattern<MessageWebSocketMessageReceivedEventArgs> pattern)
    {
        using (var payload = pattern.EventArgs.GetDataReader())
        {
            payload.UnicodeEncoding = UnicodeEncoding.Utf8;
            var remaining = payload.UnconsumedBufferLength;
            return payload.ReadString(remaining);
        }
    }
}
Somehow, the code below works.
/// <summary>
/// Subject-free variant that bridges the socket's MessageReceived event to a
/// private .NET event and builds <see cref="Messages"/> on top of that event.
/// The socket handler is attached once, in the constructor, before any
/// connection is made.
/// </summary>
class ObservableMessageWebSocket
{
    private readonly MessageWebSocket webSocket;

    // Raised with the decoded text of every message that arrives while at least
    // one observer is subscribed to Messages.
    private event EventHandler<string> StringReceived;

    /// <summary>Stream of received messages decoded as UTF-8 text.</summary>
    public IObservable<string> Messages { get; }

    /// <summary>
    /// Creates the socket in UTF-8 message mode, hooks its MessageReceived event,
    /// and defines the message stream over the private bridging event.
    /// </summary>
    public ObservableMessageWebSocket()
    {
        webSocket = new MessageWebSocket();
        webSocket.Control.MessageType = SocketMessageType.Utf8;
        webSocket.MessageReceived += HandleEvent;

        var bridged = Observable.FromEventPattern<string>(this, nameof(StringReceived));
        Messages = bridged.Select(pattern => pattern.EventArgs);
    }

    /// <summary>Connects the socket to the given server address.</summary>
    /// <param name="uri">Address of the WebSocket server.</param>
    /// <returns>A task that completes once the connection attempt has finished.</returns>
    public async Task ConnectAsync(Uri uri)
    {
        await webSocket.ConnectAsync(uri);
    }

    /// <summary>
    /// Socket event handler: decodes the message and re-raises it through
    /// <see cref="StringReceived"/>. Nothing is read when no listener is attached.
    /// </summary>
    private void HandleEvent(MessageWebSocket sender, MessageWebSocketMessageReceivedEventArgs args)
    {
        // Snapshot the delegate so the null check and the invocation see the same value.
        var listeners = StringReceived;
        if (listeners != null)
        {
            listeners(this, Decode(args));
        }
    }

    /// <summary>Reads the whole payload of a received message as a UTF-8 string.</summary>
    private static string Decode(MessageWebSocketMessageReceivedEventArgs args)
    {
        using (var payload = args.GetDataReader())
        {
            payload.UnicodeEncoding = UnicodeEncoding.Utf8;
            return payload.ReadString(payload.UnconsumedBufferLength);
        }
    }
}
To me, all three seem similar. How come only the last one works?