Use of the Subject type is often frowned upon in reactive programming. In the following situation, I use a Subject to allow subscribing to notifications before the underlying source of the notification is created. Is there an alternative to accomplish this without using Subject?

using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;
using Windows.Networking.Sockets;
using Windows.Storage.Streams;

class Program
{
    public static void Main()
    {
        var socket = new ObservableMessageWebSocket();

        socket.Messages.Subscribe(Print); // Caller is allowed to subscribe before connect

        var uri = new Uri("ws://mydomain.com/messages");
        socket.ConnectAsync(uri).Wait(); // Caller is allowed to connect after subscribe

        Console.ReadLine();
    }

    public static void Print(string message)
    {
        Console.WriteLine(message);
    }
}

class ObservableMessageWebSocket
{
    // Is there a way to get rid of this Subject?
    private readonly Subject<string> subject = new Subject<string>();

    private MessageWebSocket webSocket;

    public IObservable<string> Messages => subject;

    public async Task ConnectAsync(Uri uri)
    {
        webSocket = new MessageWebSocket();

        webSocket.Control.MessageType = SocketMessageType.Utf8;

        Observable
            .FromEventPattern<MessageWebSocketMessageReceivedEventArgs>(webSocket, nameof(webSocket.MessageReceived))
            .Select(ReadString)
            .Subscribe(subject);

        await webSocket.ConnectAsync(uri);
    }

    private static string ReadString(EventPattern<MessageWebSocketMessageReceivedEventArgs> pattern)
    {
        using (var reader = pattern.EventArgs.GetDataReader())
        {
            reader.UnicodeEncoding = UnicodeEncoding.Utf8;

            return reader.ReadString(reader.UnconsumedBufferLength);
        }
    }
}

EDIT: To clarify, I have several software components that subscribe to ObservableMessageWebSocket.Messages for push notifications. Some components subscribe before ObservableMessageWebSocket.ConnectAsync is called, and some subscribe after.

The code below avoids Subject, but does not function correctly: a component that subscribes after connect never receives notifications.

class ObservableMessageWebSocket
{
    private MessageWebSocket WebSocket { get; }

    public IObservable<string> Messages { get; }

    public ObservableMessageWebSocket()
    {
        WebSocket = new MessageWebSocket();
        WebSocket.Control.MessageType = SocketMessageType.Utf8;
        Messages = Observable
            .FromEventPattern<MessageWebSocketMessageReceivedEventArgs>(WebSocket, nameof(WebSocket.MessageReceived))
            .Select(ReadString);
    }

    private static string ReadString(EventPattern<MessageWebSocketMessageReceivedEventArgs> pattern)
    {
        using (var reader = pattern.EventArgs.GetDataReader())
        {
            reader.UnicodeEncoding = UnicodeEncoding.Utf8;

            return reader.ReadString(reader.UnconsumedBufferLength);
        }
    }

    public async Task ConnectAsync(Uri uri)
    {
        await WebSocket.ConnectAsync(uri);
    }
}

The code below does not work either. Same symptom.

class ObservableMessageWebSocket
{
    private MessageWebSocket WebSocket { get; }

    public IObservable<string> Messages { get; }

    public ObservableMessageWebSocket()
    {
        WebSocket = new MessageWebSocket();
        WebSocket.Control.MessageType = SocketMessageType.Utf8;
        Messages = Observable.Create<string>(o => Observable
            .FromEventPattern<MessageWebSocketMessageReceivedEventArgs>(WebSocket, nameof(WebSocket.MessageReceived))
            .Select(ReadString)
            .Subscribe(o));
    }

    private static string ReadString(EventPattern<MessageWebSocketMessageReceivedEventArgs> pattern)
    {
        using (var reader = pattern.EventArgs.GetDataReader())
        {
            reader.UnicodeEncoding = UnicodeEncoding.Utf8;

            return reader.ReadString(reader.UnconsumedBufferLength);
        }
    }

    public async Task ConnectAsync(Uri uri)
    {
        await WebSocket.ConnectAsync(uri);
    }
}

Somehow, the code below works.

class ObservableMessageWebSocket
{
    private MessageWebSocket WebSocket { get; }

    private event EventHandler<string> StringReceived;

    public IObservable<string> Messages { get; }

    public ObservableMessageWebSocket()
    {
        WebSocket = new MessageWebSocket();
        WebSocket.Control.MessageType = SocketMessageType.Utf8;
        WebSocket.MessageReceived += HandleEvent;
        Messages = Observable
            .FromEventPattern<string>(this, nameof(StringReceived))
            .Select(p => p.EventArgs);
    }

    private void HandleEvent(MessageWebSocket sender, MessageWebSocketMessageReceivedEventArgs args)
    {
        var handler = StringReceived;
        if (handler == null) return;
        string message;
        using (var reader = args.GetDataReader())
        {
            reader.UnicodeEncoding = UnicodeEncoding.Utf8;
            message = reader.ReadString(reader.UnconsumedBufferLength);
        }
        handler.Invoke(this, message);
    }

    public async Task ConnectAsync(Uri uri)
    {
        await WebSocket.ConnectAsync(uri);
    }
}

To me, all three seem similar. How come only the last one works?

hwaien
  • With the updated code are you sure that you're actually subscribing to the observable in your test code? Also, try the `Observable.FromEventPattern(h => xxx.Event += h, h => xxx.Event -= h)` syntax for observing the event. – Enigmativity Sep 01 '17 at 06:55
  • Great suggestion. After switching to your suggested syntax, I get an exception when `WebSocket.MessageReceived += h` is called. Apparently `Observable.FromEventPattern(Object, string)` swallows exceptions. – hwaien Sep 01 '17 at 17:43
  • I guess the lesson learned is to always avoid `Observable.FromEventPattern(Object, string)` and use something like `FromEventPattern(Action<EventHandler<TEventArgs>>, Action<EventHandler<TEventArgs>>)` instead? The exception-swallowing behavior cost me quite a bit of time figuring out what was wrong. – hwaien Sep 01 '17 at 17:47
  • I don't know if it swallows exceptions - it's just not statically typed and easy to get wrong. I always use `FromEventPattern(Action<EventHandler<TEventArgs>>, Action<EventHandler<TEventArgs>>)` for the reason that it is statically checked. – Enigmativity Sep 02 '17 at 00:06
  • I will be following the same practice going forward. Thank you for your help! – hwaien Sep 07 '17 at 17:49
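
The comments above recommend the add/remove-handler form of `FromEventPattern` because the compiler checks it. A minimal sketch of that form follows, assuming a hypothetical `MessageSource` class with a plain `EventHandler<string>` event (it is not part of the question's code). `MessageWebSocket.MessageReceived` uses a WinRT delegate type, so the exact overload and type arguments would differ there.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical stand-in for a class that raises an event; not from the question.
class MessageSource
{
    public event EventHandler<string> MessageReceived;

    public void Raise(string message) => MessageReceived?.Invoke(this, message);
}

static class TypedEventDemo
{
    public static List<string> Run()
    {
        var received = new List<string>();
        var source = new MessageSource();

        // The add/remove lambdas are checked by the compiler, so a misspelled
        // event name or a mismatched handler type fails at build time instead
        // of at run time.
        IObservable<string> messages = Observable
            .FromEventPattern<string>(
                h => source.MessageReceived += h,
                h => source.MessageReceived -= h)
            .Select(p => p.EventArgs);

        using (messages.Subscribe(m => received.Add(m)))
        {
            source.Raise("first");
            source.Raise("second");
        }

        // The handler was removed when the subscription was disposed,
        // so this message is not observed.
        source.Raise("third");
        return received;
    }

    static void Main() => Console.WriteLine(string.Join(", ", Run())); // prints "first, second"
}
```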

2 Answers

It's usually a good idea to avoid subjects. In your code you're exposing the subject directly to the calling code. Any consumer that calls `((Subject<string>)socket.Messages).OnCompleted();` will stop your code from working.
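
As a side note on the cast problem: if a subject is kept, one possible mitigation is to expose it through `AsObservable()`, which returns a wrapper rather than the subject itself. The sketch below only illustrates that difference; the class and method names are made up for the example.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

static class HideSubjectDemo
{
    public static (bool exposedCanBeCast, bool hiddenCanBeCast) Run()
    {
        var subject = new Subject<string>();

        // Same object as the subject, so callers can cast it back.
        IObservable<string> exposed = subject;

        // Wrapper that hides the subject's identity from callers.
        IObservable<string> hidden = subject.AsObservable();

        return (exposed is Subject<string>, hidden is Subject<string>);
    }

    static void Main()
    {
        var result = Run();
        Console.WriteLine(result.exposedCanBeCast); // True
        Console.WriteLine(result.hiddenCanBeCast);  // False
    }
}
```

This does not remove the subject; it only prevents consumers from calling `OnNext`, `OnError` or `OnCompleted` on it.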

You're also newing up a WebSocket which should be disposed of afterwards.

There is a way to get rid of the subject and make it all behave a lot better.

Try this:

public IObservable<string> Connect(Uri uri)
{
    return
        Observable
            .Using(
                () =>
                {
                    var webSocket = new MessageWebSocket();
                    webSocket.Control.MessageType = SocketMessageType.Utf8;
                    return webSocket;
                },
                webSocket =>
                    Observable
                        // ConnectAsync returns an IAsyncAction; FromAsync expects a Task.
                        .FromAsync(() => webSocket.ConnectAsync(uri).AsTask())
                        .SelectMany(u =>
                            Observable
                                .FromEventPattern<MessageWebSocketMessageReceivedEventArgs>(webSocket, nameof(webSocket.MessageReceived))
                                .SelectMany(pattern =>
                                    Observable
                                        .Using(
                                            () =>
                                            {
                                                var reader = pattern.EventArgs.GetDataReader();
                                                reader.UnicodeEncoding = UnicodeEncoding.Utf8;
                                                return reader;
                                            },
                                            reader => Observable.Return(reader.ReadString(reader.UnconsumedBufferLength))))));

}

Here's how to avoid the subject with your existing code style:

public IObservable<string> ConnectAsync(Uri uri)
{
    return
        Observable
            .Create<string>(async o =>
            {
                var webSocket = new MessageWebSocket();

                webSocket.Control.MessageType = SocketMessageType.Utf8;

                var subscription = Observable
                    .FromEventPattern<MessageWebSocketMessageReceivedEventArgs>(webSocket, nameof(webSocket.MessageReceived))
                    .Select(ReadString)
                    .Subscribe(o);

                await webSocket.ConnectAsync(uri);

                return subscription;
            });
}

Here's a quick test that this works:

void Main()
{
    Connect(new Uri("https://stackoverflow.com/")).Subscribe(x => Console.WriteLine(x.Substring(0, 24)));
}

public IObservable<string> Connect(Uri uri)
{
    return
        Observable
            .Create<string>(async o =>
            {
                var webClient = new WebClient();

                webClient.UseDefaultCredentials = true;

                var subscription =
                    Observable
                        .Using(
                            () => new CompositeDisposable(webClient, Disposable.Create(() => Console.WriteLine("Disposed!"))),
                            _ =>
                                Observable
                                    .FromEventPattern<DownloadStringCompletedEventHandler, DownloadStringCompletedEventArgs>(
                                        h => webClient.DownloadStringCompleted += h, h => webClient.DownloadStringCompleted -= h)
                                    .Take(1))
                    .Select(x => x.EventArgs.Result)
                    .Subscribe(o);

                await webClient.DownloadStringTaskAsync(uri);

                return subscription;
            });
}

Note that "Disposed!" is displayed showing that the WebClient is disposed.
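
One point the code above does not cover: an observable built with `Observable.Create` is cold, so each subscriber would get its own source. If several components need to share one source, with some subscribing before connect and some after, `Publish()` followed by `Connect()` is one option. The sketch below assumes a hypothetical `push` delegate standing in for the socket raising a message. `Publish` does use a subject internally, but it is not exposed to callers.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

static class SharedSourceDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        Action<string> push = null; // hypothetical stand-in for the socket raising a message

        // Cold observable: the "source" is only created when something subscribes.
        IObservable<string> cold = Observable.Create<string>(o =>
        {
            log.Add("source created");
            push = o.OnNext;
            return Disposable.Create(() => log.Add("source disposed"));
        });

        // Publish defers the single underlying subscription until Connect is called.
        IConnectableObservable<string> shared = cold.Publish();

        // Subscribes before the source exists.
        shared.Subscribe(m => log.Add("early: " + m));

        using (shared.Connect()) // creates the source exactly once
        {
            // Subscribes after connect and shares the same source.
            shared.Subscribe(m => log.Add("late: " + m));
            push("hello");
        }

        return log;
    }

    static void Main() => Run().ForEach(Console.WriteLine);
}
```

Running it logs "source created", "early: hello", "late: hello" and "source disposed", in that order.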

Enigmativity
  • Thanks for your answer. Your comment about `IDisposable` is right; in production code I do have disposal implemented, but on StackOverflow I removed those lines to emphasize the main point of my question, which is about how to allow subscribing to notifications before the underlying source of the notification is created. – hwaien Aug 31 '17 at 16:59
  • @hwaien - If you want to "allow subscribing to notifications before the underlying source of the notification is created" then you should use `Observable.Create` - that's what it's there for. It then avoids the `Subject`. I'll try to add that to my answer later. – Enigmativity Aug 31 '17 at 22:28
  • @hwaien - I've added code to my answer that is more aligned with what you want. – Enigmativity Sep 01 '17 at 01:21
  • Thank you for your suggestion. I did not know about `Observable.Create`. However, after putting it to use, I find that it does not give me the results I expect. I've updated my question to include my attempt to use `Observable.Create`. – hwaien Sep 01 '17 at 06:23
  • @hwaien - I think there might be something else going on now. Can you read my comment on your question? – Enigmativity Sep 01 '17 at 07:01
  • Marking this as answer since it gives a solution to the original question of how to get rid of `Subject`. – hwaien Sep 07 '17 at 17:33

Subjects aren't universally bad, and I don't think there's anything terribly wrong with the way you used it. I would refer to "Why are Subjects not recommended in .NET Reactive Extensions?" and "RX Subjects - are they to be avoided?" for some reasonable discussion about them and their use.

Given that, I would suggest the following (while removing all of the fields and exposed properties):

public async Task<IObservable<string>> ConnectAsync(Uri uri)
{
    // Local variable: the fields have been removed from the class.
    var webSocket = new MessageWebSocket();

    webSocket.Control.MessageType = SocketMessageType.Utf8;

    var toReturn = Observable
        .FromEventPattern<MessageWebSocketMessageReceivedEventArgs>(webSocket, nameof(webSocket.MessageReceived))
        .Select(ReadString);

    await webSocket.ConnectAsync(uri);
    return toReturn;
}

This way if someone calls ConnectAsync twice, they can get separate observables.

Shlomo
  • Thanks for your answer. The linked discussions are insightful. And your comment about multiple calls to `ConnectAsync` is right; in production code I do have logic to handle multiple calls, but on StackOverflow I removed those lines to emphasize the main point of my question. – hwaien Aug 31 '17 at 16:57
  • Just a quick one, @Shlomo. The return type on this function is `Task<IObservable<string>>`, but your code appears to be returning a `Task`. Am I reading that wrong? – Enigmativity Sep 01 '17 at 01:19
  • Nope, you read right, I wrote wrong. Edited and corrected. Thank you. – Shlomo Sep 01 '17 at 03:29