I'm a little confused about the lifecycle of Observable.Publish for multicasting. How should Connect be used correctly? Counterintuitively, I've found that I do not need to call Connect for the multicast observers to start their subscriptions.
var multicast = source.Publish();
var field0 = multicast.Select(record => record.field0);
var field1 = multicast.Select(record => record.field1);
// Do I need to call Connect here?
var disposable = multicast.Connect();
// Does calling
disposable.Dispose();
// unsubscribe field0 and field1?
EDIT
My puzzle was why I was subscribing successfully even though I was not calling Connect on the IConnectableObservable explicitly. It turns out I was awaiting the IConnectableObservable, which implicitly calls Connect.
Public Async Function MonitorMeasurements() As Task
    Dim cts = New CancellationTokenSource
    Try
        Using dialog = New TaskDialog(Of Unit)(cts)
            Dim measurementPoints =
                MeasurementPointObserver(timeout:=TimeSpan.FromSeconds(2)).
                TakeUntil(dialog.CancelObserved).Publish()
            Dim viewModel = New MeasurementViewModel(measurementPoints)
            dialog.Content = New MeasurementControl(viewModel)
            dialog.Show()
            Await measurementPoints
        End Using
    Catch ex As TimeoutException
        MessageBox.Show(ex.Message)
    Catch ex As Exception
        MessageBox.Show(ex.Message)
    End Try
End Function
Note that my TaskDialog exposes an observable called CancelObserved, which signals when the cancel button is pressed.
SOLUTION
The solution is in a link posted by @asti. Here is a quote from the Rx team from that link:
Notice use of await makes an observable sequence hot by causing a subscription to take place. Included in this release is await support for IConnectableObservable, which causes connecting the sequence to its source as well as subscribing to it. Without the Connect call, the await operation would never complete.
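For anyone who wants to see the behaviour in isolation, here is a minimal C# sketch of what I understand to be happening. It assumes the System.Reactive package and a compiler that allows an async Main. The class name, the Subject standing in for a real source, and the Range sequence are all made up for illustration and have nothing to do with my measurement code.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

class PublishConnectSketch
{
    static async Task Main()
    {
        // Part 1: explicit Connect.
        // A Subject stands in for the real source so values can be pushed by hand.
        var source = new Subject<int>();

        // Publish() only wraps the source; nothing is subscribed to it yet.
        IConnectableObservable<int> multicast = source.Publish();

        // Observers attach to the multicast, not to the source directly.
        using var evens = multicast.Where(x => x % 2 == 0)
                                   .Subscribe(x => Console.WriteLine($"even: {x}"));
        using var all = multicast.Subscribe(x => Console.WriteLine($"all: {x}"));

        source.OnNext(1);            // dropped: the multicast is not connected yet

        IDisposable connection = multicast.Connect();
        source.OnNext(2);            // reaches both observers through one source subscription

        connection.Dispose();        // disconnects the multicast from the source
        source.OnNext(3);            // dropped again: the observers no longer see source values

        // Part 2: await on an IConnectableObservable.
        // Awaiting subscribes to the sequence and also connects it, so no
        // explicit Connect call is needed here.
        var published = Observable.Range(1, 3).Publish();
        using var watcher = published.Subscribe(x => Console.WriteLine($"watched: {x}"));

        // await yields the last element once the sequence completes.
        int last = await published;
        Console.WriteLine($"last: {last}");
    }
}
```

In the first part, disposing the value returned by Connect cuts the link to the source, so field0 and field1 style observers stop receiving values, although their own subscriptions to the multicast are not disposed and they are not sent a completion. The second part mirrors my VB code: the view model subscribes, and the Await is what actually connects.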