I am working with a third-party library which acts as an interface to a pub-sub message broker. The broker is Solace PubSub+.
For subscribers, the vendor library employs a "push messages via callback" pattern.
I am writing my own wrapper library around the vendor library to make it easier for other devs to work with (hiding all of the internals of how the library communicates with the network and so on).
In that same vein, I think it might be helpful to expose a subscriber feed as an IAsyncEnumerable, and I think this might be a good use case for System.Threading.Channels. I have two concerns:
- Are channels appropriate here, or am I overengineering this? I.e., is there a more "C# idiomatic" way to wrap callbacks?
- Is my EnumerableBroker wrapper implementation safe, or have I fallen into an async trap somewhere?
I realise the first question might be a better fit for CodeReview than SO, but since the answer to that also ties in with the second concern, it seems appropriate to put them together. Worth noting: I am avoiding IObservable / Rx, since my goal is to make my interface more basic than the vendor's, not to require that the other devs and I learn Rx! Understanding how the producer and consumer processes are independent is also trivial with the channel in the middle, whereas with an observable my first thought is "Ok, so are the producer and the consumer still independent? At first glance it looks like I have to learn about schedulers now... gosh, how about I just use an await foreach?"
Here's a minimal mockup of consuming messages without the EnumerableBroker:
    // mockup of third party class
    private class Broker
    {
        // mockup of how the third party library pushes messages via callback
        public void Subscribe(EventHandler<int> handler) => this.handler = handler;

        // simulate the broker pushing messages. Not "real" code
        public void Start()
        {
            Task.Run
            (
                () =>
                {
                    for (int i = 0; !cts.Token.IsCancellationRequested; i++)
                    {
                        // simulate internal latency
                        Thread.Sleep(10);
                        handler?.Invoke(this, i);
                    }
                }, cts.Token
            );
        }

        public void Stop() => cts.Cancel();

        private CancellationTokenSource cts = new();
        private EventHandler<int> handler;
    }

    private static async Task Main()
    {
        var broker = new Broker();
        broker.Subscribe((_, msg) => Console.WriteLine(msg));
        broker.Start();
        await Task.Delay(1000);
        broker.Stop();
    }
And now with a minimal reproduction of the EnumerableBroker (still using the same mock Broker class listed above). At least one benefit here seems to be that if the subscriber needs to do a lot of work to process a message, it doesn't tie up the broker's thread, at least until the buffer fills up. This seems to work without error, but I've learned to be wary of my limited grasp of async.
    private class EnumerableBroker
    {
        public EnumerableBroker(int bufferSize = 8)
        {
            buffer = Channel.CreateBounded<int>
            (
                new BoundedChannelOptions(bufferSize) { SingleReader = true, SingleWriter = true }
            );
        }

        public IAsyncEnumerable<int> ReadAsync(CancellationToken ct)
        {
            broker.Subscribe
            (
                // switched to sync per Theodor's comments
                (_, args) => buffer.Writer.WriteAsync(args, ct).AsTask().Wait()
            );
            ct.Register(broker.Stop);
            broker.Start();
            return buffer.Reader.ReadAllAsync(ct);
        }

        private readonly Channel<int> buffer;
        private readonly Broker broker = new();
    }

    private static async Task Main()
    {
        var cts = new CancellationTokenSource();
        var broker = new EnumerableBroker();
        cts.CancelAfter(1000);
        try
        {
            await foreach (var msg in broker.ReadAsync(cts.Token))
            {
                Console.WriteLine(msg);
            }
        }
        catch (OperationCanceledException) { }
    }
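To make the "until the buffer fills up" caveat concrete, here is a minimal sketch of how a bounded channel behaves once it is full. It assumes the default full mode (BoundedChannelFullMode.Wait), which is what the EnumerableBroker above gets since it does not set FullMode. BackPressureDemo and FillBeyondCapacity are hypothetical names used only for illustration; they are not part of the wrapper or the vendor library.

```csharp
using System;
using System.Threading.Channels;

// Hypothetical helper, for illustration only.
public static class BackPressureDemo
{
    // Attempts capacity + 1 synchronous writes with no reader draining the
    // channel, and returns whether each TryWrite succeeded.
    public static bool[] FillBeyondCapacity(int capacity)
    {
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(capacity)
        {
            SingleReader = true,
            SingleWriter = true
            // FullMode is left at its default, BoundedChannelFullMode.Wait
        });

        var results = new bool[capacity + 1];
        for (int i = 0; i <= capacity; i++)
        {
            // TryWrite never blocks: it returns false when the buffer is full.
            results[i] = channel.Writer.TryWrite(i);
        }
        return results;
    }
}
```

With a capacity of 2, the first two writes succeed and the third is rejected. In the same situation a WriteAsync call would not complete until the reader removes an item, so blocking on it with Wait() inside the callback holds the broker's thread for as long as the consumer is behind.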