
I'm a little confused about the lifecycle of using Observable.Publish for multicast handling. How should one use Connect correctly? Counterintuitively, I've found I do not need to call Connect for the multicast observers to start their subscriptions.

var multicast = source.Publish();
var field0 = multicast.Select(record => record.field0);
var field1 = multicast.Select(record => record.field1);

// Do I need to call Connect here?
var disposable = multicast.Connect();

// Does calling 
disposable.Dispose();
// unsubscribe field0 and field1?

EDIT

My puzzle was why I was successfully subscribing when I was not calling Connect on the IConnectableObservable explicitly. However, I was calling Await on the IConnectableObservable, which implicitly calls Connect.

Public Async Function MonitorMeasurements() As Task


    Dim cts = New CancellationTokenSource

    Try
        Using dialog = New TaskDialog(Of Unit)(cts)

            Dim measurementPoints = 
                MeasurementPointObserver(timeout:=TimeSpan.FromSeconds(2)).
                TakeUntil(dialog.CancelObserved).Publish()

            Dim viewModel = New MeasurementViewModel(measurementPoints)
            dialog.Content = New MeasurementControl(viewModel)
            dialog.Show()

            Await measurementPoints
        End Using
    Catch ex As TimeoutException
        MessageBox.Show(ex.Message)
    Catch ex As Exception
        MessageBox.Show(ex.Message)
    End Try

End Function

Note my TaskDialog exposes an observable called CancelObserved for when the cancel button is pressed.

SOLUTION

The solution is posted in a link by @asti. Here is a quote from the Rx team in that link:

Notice use of await makes an observable sequence hot by causing a subscription to take place. Included in this release is await support for IConnectableObservable, which causes connecting the sequence to its source as well as subscribing to it. Without the Connect call, the await operation would never complete

bradgonesurfing

2 Answers


Publish on a source returns an IConnectableObservable<T>, which is essentially an IObservable<T> with a Connect method. You can use Connect and the IDisposable it returns to control the subscription to the source.

Rx is designed to be a fire and forget system. Subscriptions won't be terminated until you explicitly dispose of them, or they complete/error.

i.e., disp0 = field0.Subscribe(...); disp1 = field1.Subscribe(...) - the subscriptions won't be terminated until disp0, disp1 are explicitly disposed - which is independent of the connection to the multicast source.
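
The independence of the observer subscriptions from the connection can be sketched roughly as follows. This is a minimal sketch, assuming the System.Reactive package is referenced; the Subject used as the source and the `seen` list are hypothetical stand-ins for illustration, not part of the question's code.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

class ConnectLifecycleSketch
{
    static void Main()
    {
        // Hypothetical source: a Subject we can push values into by hand.
        var source = new Subject<int>();
        var multicast = source.Publish();

        var seen = new List<int>();
        IDisposable subscription = multicast.Subscribe(x => seen.Add(x));

        source.OnNext(1);                         // not connected yet, so this is dropped

        IDisposable connection = multicast.Connect();
        source.OnNext(2);                         // delivered to the observer

        connection.Dispose();                     // disconnects from the source only
        source.OnNext(3);                         // dropped, but the observer is still subscribed

        connection = multicast.Connect();         // reconnect without resubscribing
        source.OnNext(4);                         // delivered again

        Console.WriteLine(string.Join(",", seen)); // should print 2,4

        subscription.Dispose();                   // the observer detaches here
        connection.Dispose();
    }
}
```

Disposing the value returned by Connect only cuts the link to the source; the observer stays attached to the multicast sequence until its own subscription is disposed.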

You can connect and disconnect without disturbing the pipeline below. An easier way to avoid managing the connection manually is to use .Publish().RefCount(), which maintains a connection as long as at least one observer is still subscribed to it. This is known as warming up an observable.
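
A rough illustration of the RefCount behaviour described here, assuming the System.Reactive package; the cold source built with Observable.Create and the `connections` counter are hypothetical and exist only to make the connect and disconnect points visible.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

class RefCountSketch
{
    static void Main()
    {
        int connections = 0;

        // Hypothetical cold source: each subscription stands in for opening a connection.
        var cold = Observable.Create<int>(observer =>
        {
            connections++;
            Console.WriteLine("source subscribed");
            return Disposable.Create(() => Console.WriteLine("source unsubscribed"));
        });

        var shared = cold.Publish().RefCount();

        var first = shared.Subscribe(_ => { });   // first observer triggers the connection
        var second = shared.Subscribe(_ => { });  // second observer shares that connection

        first.Dispose();                          // one observer left, still connected
        second.Dispose();                         // last observer gone, source is released

        Console.WriteLine(connections);           // the source was subscribed once
    }
}
```

With RefCount there is no Connect call to make and no connection disposable to track; the trade-off is that the connection is made as soon as the first observer subscribes, so later observers can miss early values.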


UPDATED FOR AN EDIT IN THE QUESTION

OP was calling await on the IConnectableObservable<T>.

From Release notes for Rx:

..the use of await makes an observable sequence hot by causing a subscription to take place. Included in this release is await support for IConnectableObservable, which causes connecting the sequence to its source as well as subscribing to it. Without the Connect call, the await operation would never complete.

Example (taken from the same page)

static async void Foo()
{
    var xs = Observable.Defer(() =>
    {
        Console.WriteLine("Operation started!");
        return Observable.Interval(TimeSpan.FromSeconds(1)).Take(10);
    });

    var ys = xs.Publish();

    // This doesn't trigger a connection with the source yet.
    ys.Subscribe(x => Console.WriteLine("Value = " + x));

    // During the asynchronous sleep, nothing will be printed.
    await Task.Delay(5000);

    // Awaiting causes the connection to be made. Values will be printed now,
    // and the code below will return 9 after 10 seconds.
    var y = await ys;
    Console.WriteLine("Await result = " + y);
}
Asti
  • My puzzle is that I am only calling .Publish() without calling connect on the result and it is still working. – bradgonesurfing Oct 22 '12 at 09:19
  • Aha!! I'm calling await on the IConnectableObservable to wait for the sequence to finish. I guess that is implicitly calling Connect :) – bradgonesurfing Oct 22 '12 at 09:20
  • @bradgonesurfing Wow. I did not see that coming. Here's the announcement: http://social.msdn.microsoft.com/Forums/en-US/rx/thread/23062737-e154-41af-99f6-45d819992254/ See the second post. – Asti Oct 22 '12 at 09:30
  • :) """Notice use of await makes an observable sequence hot by causing a subscription to take place. Included in this release is await support for IConnectableObservable, which causes connecting the sequence to its source as well as subscribing to it. Without the Connect call, the await operation would never complete.""" – bradgonesurfing Oct 22 '12 at 09:43
  • After your link and reading about "Await anywhere" I had a kind of epiphany. See http://stackoverflow.com/questions/13012867/has-this-usage-of-async-await-in-c-sharp-been-discovered-before – bradgonesurfing Oct 22 '12 at 13:51
  • @bradgonesurfing To answer your question - F#'s computation expressions are expression rewriting for monadic structures, while async rewrites user code as state machines - it's not quite as powerful. Trying to do it in C# will leave you fighting against the language. – Asti Oct 23 '12 at 09:03

Publish allows you to share a subscription. This is most useful for making a cold observable sequence hot, i.e. taking a sequence that causes some subscription side effect (a connection to a network, perhaps) and ensuring that the side effect is performed once and the results of the sequence are shared among the consumers.

In practice, you call Publish on your cold sequence, subscribe your consumers, and then Connect the published sequence after the subscriptions to mitigate any race conditions.
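
The publish, subscribe, then connect order can be sketched as below. This is only a sketch, assuming the System.Reactive package; the in-memory `records` array and the `sourceSubscriptions` counter are hypothetical stand-ins for a real cold source with a subscription side effect.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

class PublishThenConnectSketch
{
    static void Main()
    {
        int sourceSubscriptions = 0;
        var records = new[]
        {
            new { field0 = 1, field1 = "a" },
            new { field0 = 2, field1 = "b" },
        };

        // Hypothetical cold source: the counter stands in for the side effect.
        var source = Observable.Defer(() =>
        {
            sourceSubscriptions++;
            return records.ToObservable();
        });

        var multicast = source.Publish();
        var field0 = new List<int>();
        var field1 = new List<string>();

        // Subscribe every consumer first; nothing flows yet.
        multicast.Select(r => r.field0).Subscribe(x => field0.Add(x));
        multicast.Select(r => r.field1).Subscribe(x => field1.Add(x));
        Console.WriteLine(sourceSubscriptions);        // 0, no side effect before Connect

        // Connect last, so no consumer misses the first values.
        using (multicast.Connect())
        {
            Console.WriteLine(sourceSubscriptions);    // 1, the side effect ran once
        }

        Console.WriteLine(string.Join(",", field0));   // values seen by the first consumer
        Console.WriteLine(string.Join(",", field1));   // values seen by the second consumer
    }
}
```

Subscribing both projections directly to `source` instead of `multicast` would run the Defer factory once per consumer, which is the duplicated side effect Publish is meant to avoid.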

So basically, what you have done above.

It is largely pointless for sequences that are already hot, e.g. Subjects, FromEventPattern, or something that is already published and connected.

Disposing of the value from the Connect() method will 'disconnect' the sequence preventing the consumers from getting any more values. You can also dispose of the consumer subscriptions if any of these want to detach earlier.

Having said all of this, you appear to be doing the correct thing. What is the problem you are seeing? I am assuming that you are connecting to an already HOT sequence.

casperOne
Lee Campbell
  • The connection is not "hot". Upon subscription a UDP connection will be made, and upon unsubscribe the UDP connection should be closed. However, parsing the UDP frame results in several fields per frame. Each field needs to be set up as an observable as they have different client observers. If I don't call Publish then multiple connections to the same socket get created, which is an error of course. Calling Publish gives me the correct result, but I don't seem to have to call Connect, which is a puzzle. – bradgonesurfing Oct 22 '12 at 09:17
  • Ahh... potentially you have a naughty method? It may be the case that you have a method that returns an IObservable that is eager (not lazy). This is most often the case when connecting to a messaging layer and you are not wrapping your code in Observable.Create(...) – Lee Campbell Oct 22 '12 at 09:47
  • No, see the update to the question where I post the solution via @asti. Calling await on an IConnectableObservable implicitly calls Connect – bradgonesurfing Oct 22 '12 at 09:50
  • Ignore that.. I see you have your answer now. Yes, you would hope that Await connects the observable, else how would it get connected? – Lee Campbell Oct 22 '12 at 09:51