So I created an observable wrapper for Stephen Cleary's AsyncProducerConsumerQueue<T>
with the following code.
I'm wondering if anyone here knows how I could have done this in a much simpler way?
- Could it have been written without a wrapper class?
- Would it be possible to prevent errors from multiple wrappers being applied to one queue?
- Could I make it connect on the first subscription instead of via a direct call to
Connect
? If so, what are the implications of that? - Finally, how would you have done it?
using Nito.AsyncEx;
using System.Reactive;
static async Task ExampleUsage() {
var queue = new AsyncProducerConsumerQueue<int>();
var observable = queue.AsConnectableObservable();
await queue.EnqueueAsync(1);
observable.Subscribe(Console.WriteLine);
observable.Connect();
await queue.EnqueueAsync(2);
}
public static class AsyncExExtensions {
public static IConnectableObservable<T> AsConnectableObservable<T>(this AsyncProducerConsumerQueue<T> queue) {
return new ConnectableObservableForAsyncProducerConsumerQueue<T>(queue);
}
}
class ConnectableObservableForAsyncProducerConsumerQueue<T> : IConnectableObservable<T> {
readonly AsyncProducerConsumerQueue<T> Queue;
long _isConnected = 0;
ImmutableList<IObserver<T>> Observers = ImmutableList<IObserver<T>>.Empty;
public ConnectableObservableForAsyncProducerConsumerQueue(AsyncProducerConsumerQueue<T> queue) {
Queue = queue;
}
public IDisposable Connect() {
if (Interlocked.Exchange(ref _isConnected, 1) == 1) throw new InvalidOperationException("Observable cannot be connected more than once.");
var cts = new CancellationTokenSource();
var token = cts.Token;
Task.Run(async () => {
try {
while (true) {
token.ThrowIfCancellationRequested();
var @event = await Queue.DequeueAsync(token).ConfigureAwait(false);
foreach (var observer in Observers)
observer.OnNext(@event);
}
} catch (Exception x) when (x is OperationCanceledException || x is InvalidOperationException) {
foreach (var observer in Observers)
observer.OnCompleted();
}
});
return Disposable.Create(() => {
cts.Cancel();
cts.Dispose();
});
}
readonly object subscriberListMutex = new object();
public IDisposable Subscribe(IObserver<T> observer) {
lock (subscriberListMutex) {
Observers = Observers.Add(observer);
}
return Disposable.Create(() => {
lock (subscriberListMutex) {
Observers = Observers.Remove(observer);
}
});
}
}