I'm writing a WebSocket client which connects to a server and exposes a stream of values as an IObservable<>.
If consumers unsubscribe from the observable before it terminates naturally, I'd like to clean up properly and close the socket, which is an asynchronous operation.
So far, I'm doing something like:
Observable.Using(
    () => CreateClientWebSocket(),
    client => Observable.Using(
        // Fire-and-forget: the close is started on the thread pool when the subscription is disposed.
        () => Disposable.Create(() => Task.Run(() => client.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None))),
        _ => Observable.Create(/* Do the subscription here */)))
This sort of works, but doesn't feel very clean. In particular, the asynchronous disposal is just fired off to the thread pool and ignores whatever scheduler might otherwise have been specified with .SubscribeOn() or similar.
Is there a better way to handle resources which are effectively IAsyncDisposable?
Given that IObservable.Subscribe returns a (non-async) IDisposable, I couldn't think of any direct way. Maybe there's some way of capturing the scheduler and at least running the disposal task on it?
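
To make the scheduler idea concrete, here is a rough sketch of what I have in mind. UsingAsync is a hypothetical helper of my own (it is not part of Rx.NET), and it assumes the scheduler is passed in explicitly, since I don't know of a way to discover the one given to .SubscribeOn() from inside the pipeline. The cleanup is still fire-and-forget: Dispose() returns before the close has completed.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class AsyncCleanupSketch
{
    // Hypothetical helper (not an Rx.NET API): similar in spirit to Observable.Using,
    // but the cleanup is asynchronous and is queued on the scheduler passed in.
    public static IObservable<T> UsingAsync<TResource, T>(
        Func<TResource> resourceFactory,
        Func<TResource, IObservable<T>> observableFactory,
        Func<TResource, Task> cleanupAsync,
        IScheduler scheduler)
    {
        // Defer so that each subscription gets its own resource.
        return Observable.Defer(() =>
        {
            var resource = resourceFactory();

            // Finally runs on completion, on error, and on early unsubscription.
            // The returned IDisposable from ScheduleAsync is ignored, so nothing
            // here waits for (or observes failures of) the cleanup task.
            return observableFactory(resource).Finally(() =>
                scheduler.ScheduleAsync((_, cancel) => cleanupAsync(resource)));
        });
    }
}
```

With the socket, the cleanupAsync argument would be something like client => client.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None). This only moves the close onto a chosen scheduler, though; it doesn't let the subscriber await the close.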