I am working with a third party library which acts as an interface to a pub-sub message broker. The broker is Solace PubSub+.

For subscribers, the vendor library employs a "push messages via callback" pattern.

I am writing my own wrapper library around the vendor library to make it easier for other devs to work with (hiding all of the internals of how the library communicates with the network and so on).

In that same vein, I think it might be helpful to expose a subscriber feed as an IAsyncEnumerable, and I think this might be a good use case for System.Threading.Channels. I have two concerns:

  1. Are channels appropriate here, or am I overengineering this? I.e., is there a more "C# idiomatic" way to wrap callbacks?
  2. Is my EnumerableBroker wrapper implementation safe, or have I fallen into an async trap somewhere?

I realise the first question might be a better fit for CodeReview than SO, but since the answer to that also ties in with the second concern, it seems appropriate to put them together. Worth noting: I am avoiding IObservable / Rx, since my goal is to make my interface more basic than the vendor's, not to require that the other devs and myself learn Rx! Understanding how the producer and consumer processes are independent is also trivial with the channel in the middle, whereas with an observable my first mental process is "Ok, so are the producer and the consumer still independent? At first glance it looks like I have to learn about schedulers now... gosh, how about I just use an await foreach?"

Here's a minimal mockup of consuming messages without the EnumerableBroker:

// mockup of third party class
private class Broker
{
    // mockup of how the third party library pushes messages via callback
    public void Subscribe(EventHandler<int> handler) => this.handler = handler;

    // simulate the broker pushing messages. Not "real" code
    public void Start()
    {
        Task.Run
        (
            () =>
            {
                for (int i = 0; !cts.Token.IsCancellationRequested; i++)
                {
                    // simulate internal latency
                    Thread.Sleep(10);
                    handler?.Invoke(this, i);
                }
            }, cts.Token
        );
    }

    public void Stop() => cts.Cancel();

    private CancellationTokenSource cts = new();
    private EventHandler<int> handler;
}

private static async Task Main()
{
    var broker = new Broker();
    broker.Subscribe((_, msg) => Console.WriteLine(msg));
    broker.Start();
    await Task.Delay(1000);
    broker.Stop();
}

And now with a minimal reproduction of the EnumerableBroker (still using the same mock Broker class listed above). At least one benefit here seems to be that if the subscriber needs to do a lot of work to process a message, it doesn't tie up the broker's thread - at least until the buffer fills up. This seems to work without error, but I've learned to be wary of my limited grasp of async.

private class EnumerableBroker
{
    public EnumerableBroker(int bufferSize = 8)
    {
    buffer = Channel.CreateBounded<int>
    (
        new BoundedChannelOptions(bufferSize)
        {
            SingleReader = true,
            SingleWriter = true
        }
    );
    }

    public IAsyncEnumerable<int> ReadAsync(CancellationToken ct)
    {
        broker.Subscribe
        (
            // switched to sync per Theodor's comments
            (_, args) => buffer.Writer.WriteAsync(args, ct).AsTask().Wait()
        );
        ct.Register(broker.Stop);
        broker.Start();
        return buffer.Reader.ReadAllAsync(ct);
    }

    private readonly Channel<int> buffer;
    private readonly Broker broker = new();
}

private static async Task Main()
{
    var cts = new CancellationTokenSource();
    var broker = new EnumerableBroker();
    cts.CancelAfter(1000);
    try
    {
        await foreach (var msg in broker.ReadAsync(cts.Token))
        {
            Console.WriteLine(msg);
        }
    }
    catch (OperationCanceledException) { }
}
allmhuran
    Just adding a comment here regarding the high-level concepts... The natural abstraction for push-based data *is* an observable. `IAsyncEnumerable` is for asynchronous pull-based data. This is why the channel is necessary: you need a buffer to translate push-based to pull-based. That is also why you need to decide (or allow your end users to configure) the buffering strategy: number of items and how to behave when the buffer is full. – Stephen Cleary Sep 04 '21 at 18:23
  • @StephenCleary Understood and agreed, the observable does seem like an "obvious" conceptual mapping, but I was put off by the "apparent" complexity. I could be wrong about the actual complexity, but using `IObservable` seems to force consumers to work in terms of `IObserver`s, which is a whole new abstraction to learn, whereas `IAsyncEnumerable` only requires that they can write an `await foreach`. – allmhuran Sep 04 '21 at 18:35
  • @allmhuran this is overengineered only in the sense that using Channels doesn't need so much code. A typical pattern is to use just *methods* that return a `ChannelReader`, with the method itself creating and owning the channel. – Panagiotis Kanavos Sep 22 '21 at 15:50

2 Answers

Am I overengineering this?

No. A Channel is exactly the kind of component you need in order to implement this functionality. It's quite a simple mechanism: basically an async version of the BlockingCollection<T> class, with some extra features (like the Completion property) and a fancy API (the Reader and Writer facades).
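A minimal sketch (not from the question) of how a Channel mirrors the BlockingCollection<T> pattern, including the Completion property mentioned above:

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

class ChannelBasics
{
    static async Task Main()
    {
        var channel = Channel.CreateUnbounded<int>();

        var producer = Task.Run(async () =>
        {
            for (int i = 0; i < 3; i++)
                await channel.Writer.WriteAsync(i);
            // Analogous to BlockingCollection.CompleteAdding:
            channel.Writer.Complete();
        });

        // The loop ends when the channel is marked complete and drained.
        await foreach (var item in channel.Reader.ReadAllAsync())
            Console.WriteLine(item); // prints 0, 1, 2 in order

        await channel.Reader.Completion; // observes the completion
        await producer;
    }
}
```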

Is my EnumerableBroker wrapper implementation safe, or have I fallen into an async trap somewhere?

Yes, there is a trap, and you have fallen into it. The SingleWriter = true configuration means that at most one WriteAsync operation is allowed to be concurrently in-flight. Before issuing the next WriteAsync, the previous one must be completed. By subscribing to the broker with an async void delegate, you are essentially creating a separate writer (producer) for each message pushed by the broker. Most probably the component will complain about this misuse by throwing InvalidOperationExceptions or similar. The solution is not to switch to SingleWriter = false, though. That would just circumvent the bounded capacity of the Channel, by creating an external (and highly inefficient) queue of messages that don't fit in the internal queue of the Channel. The solution is to rethink your buffering strategy. If you can't afford to buffer an unlimited number of messages, you must either drop messages, or throw an exception and kill the consumer. Instead of await buffer.Writer.WriteAsync, it's better to feed the channel synchronously with bool accepted = buffer.Writer.TryWrite, and take an appropriate action in case accepted is false.
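A sketch of that TryWrite strategy, with both suggested reactions to a full buffer (the class and handler names here are illustrative, not from the question):

```csharp
using System;
using System.Threading.Channels;

// Hypothetical subscriber illustrating the strategy described above:
// accept messages synchronously, and decide what happens when the
// bounded buffer is full.
class DroppingSubscriber
{
    private readonly Channel<int> buffer =
        Channel.CreateBounded<int>(new BoundedChannelOptions(8)
        {
            SingleReader = true,
            SingleWriter = true
        });

    public void OnMessage(object sender, int msg)
    {
        bool accepted = buffer.Writer.TryWrite(msg);
        if (!accepted)
        {
            // Option A: drop the message (optionally counting/logging drops).
            // Option B: fault the channel, which surfaces as an exception
            // in the consumer's ReadAllAsync loop:
            buffer.Writer.Complete(
                new InvalidOperationException("Subscriber buffer overflow."));
        }
    }
}
```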

Another consideration to keep in mind is that the ChannelReader.ReadAllAsync method is consuming. This means that if you have multiple readers/consumers of the same channel, each message will be delivered to only one of the consumers. In other words, each consumer will receive a partial subset of the channel's messages. You should communicate this to your coworkers, because it's quite easy to enumerate the same IAsyncEnumerable<T> more than once. After all, an IAsyncEnumerable<T> is nothing more than a factory of IAsyncEnumerator<T>s.
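A small demonstration of that consuming behavior (assumed setup, not from the question): two concurrent ReadAllAsync loops on the same channel split the messages between them.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

class ConsumingDemo
{
    static async Task Main()
    {
        var channel = Channel.CreateUnbounded<int>();
        for (int i = 0; i < 10; i++) channel.Writer.TryWrite(i);
        channel.Writer.Complete();

        async Task<int> CountAsync()
        {
            int count = 0;
            await foreach (var _ in channel.Reader.ReadAllAsync()) count++;
            return count;
        }

        // Two concurrent consumers of the same channel. The counts always
        // sum to 10, but how the messages split between them is
        // nondeterministic - neither is guaranteed to see the full stream.
        var counts = await Task.WhenAll(CountAsync(), CountAsync());
        Console.WriteLine($"{counts[0]} + {counts[1]} = {counts[0] + counts[1]}");
    }
}
```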

Finally, instead of controlling the lifetime of each subscription by a CancellationToken, you can make your coworkers' lives easier by just terminating a subscription automatically when the enumeration of an IAsyncEnumerator<T> terminates. When an await foreach loop ends in any way (like by break or by an exception), the associated IAsyncEnumerator<T> is automatically disposed. The C# language has cleverly hooked the DisposeAsync invocation with the finally block of the iterator, if a try/finally block wraps the yielding loop. You could take advantage of this great feature like this:

public async IAsyncEnumerable<int> ReadAsync(CancellationToken ct)
{
    broker.Subscribe
    (
        //...
    );
    broker.Start();
    try
    {
        await foreach (var msg in buffer.Reader.ReadAllAsync(ct))
        {
            yield return msg;
        }
    }
    finally
    {
        broker.Stop();
    }
}
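A usage sketch for the iterator above, assuming the question's EnumerableBroker has been updated with this ReadAsync:

```csharp
// Hypothetical consumer. No CancellationTokenSource is needed for normal
// termination: break disposes the enumerator, the finally block runs,
// and broker.Stop() is called automatically.
var enumerableBroker = new EnumerableBroker();
await foreach (var msg in enumerableBroker.ReadAsync(CancellationToken.None))
{
    Console.WriteLine(msg);
    if (msg >= 100) break; // DisposeAsync -> finally -> broker.Stop()
}
```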
Theodor Zoulias
  • What a fantastic answer. Very much appreciated. I do have one concern though: I do in fact need back-pressure on the broker, because this is a "guaranteed" message pipeline. In other words, I actually want the broker library to stop getting messages from the network... I just want to limit that to "bursty" periods, and not make it typical for every message. So, dropping messages is not an option. Is there a better way to let the channel apply backpressure? `WaitToWriteAsync` perhaps? Or just add a `.Wait()`? – allmhuran Sep 04 '21 at 15:31
  • Thanks @allmhuran. I don't think that either the `WaitToWriteAsync` or the `Wait` can solve this problem. Is it possible to unsubscribe from the broker when the number of buffered messages exceeds some limit, and resubscribe later? – Theodor Zoulias Sep 04 '21 at 15:39
  • It is possible, but annoyingly inefficient. The actual broker platform (on the network) has a large storage buffer to enable it to guarantee delivery and smooth out burstiness. The use case is a continuous service though, there's no expected "end" of the message stream, as it represents a real-time integration component between enterprise apps. This also means the rate of message production is totally unpredictable, and I would rather let the vendor library's callback thread "hang" while the local buffer is drained than disconnect and reconnect, since that may happen frequently. – allmhuran Sep 04 '21 at 15:43
  • @allmhuran in this case you could experiment with the synchronous `Wait`: `buffer.Writer.WriteAsync(msg).AsTask().Wait();` It may work if the broker produces the messages from a single thread. It won't work if the broker offloads the invocation of the `Subscribe` delegate to the `ThreadPool`, or if it maintains its own unlimited buffer internally. – Theodor Zoulias Sep 04 '21 at 15:49
  • Roger that, I was about to comment on my edit where I added the `AsTask().Wait()`. I can configure the library's own internal buffer size, so that part is covered. I don't know how they handle their threads, but I will follow up on that. Thank you again, you wonderful person. – allmhuran Sep 04 '21 at 15:54

This is overengineered only in the sense that using Channels doesn't need so much code. A typical pattern is to use just methods that accept a ChannelReader as input and return a ChannelReader as output, with the method itself creating and owning the output channel. This makes composing stages into a pipeline very easy, especially if those methods are extension methods.
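A sketch of that composition pattern (the `Transform` stage and its name are illustrative, not part of any library): each stage takes a ChannelReader, owns its output channel, and returns the output reader.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

static class PipelineExtensions
{
    // A hypothetical pipeline stage: reads from the source, applies a
    // projection, and writes to an output channel it creates and owns.
    public static ChannelReader<TOut> Transform<TIn, TOut>(
        this ChannelReader<TIn> source, Func<TIn, TOut> projection)
    {
        var output = Channel.CreateUnbounded<TOut>();
        Task.Run(async () =>
        {
            await foreach (var item in source.ReadAllAsync())
                await output.Writer.WriteAsync(projection(item));
            output.Writer.Complete();
        });
        return output.Reader;
    }
}

// Stages then compose fluently, e.g.:
// var results = broker.ToChannel(10, token)
//                     .Transform(x => x * 2)
//                     .Transform(x => x.ToString());
```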

In this case, your code could be rewritten as:

static ChannelReader<int> ToChannel(this Broker broker,
    int limit, CancellationToken token = default)
{
    var channel = Channel.CreateBounded<int>(limit);
    var writer = channel.Writer;

    broker.Subscribe((_, args) =>
    {
        // TryWrite takes no token; it returns false if the buffer is full
        writer.TryWrite(args);
    });
    token.Register(() => writer.Complete());

    return channel.Reader;
}

This will lose any messages past the limit. If your Broker understands Tasks, you could use:

broker.Subscribe(async (_, args) =>
{
    await writer.WriteAsync(args, token);
});

If it doesn't understand tasks, and you can't afford to lose anything, perhaps a better solution would be to use an unbounded channel and handle pause/resume in a later stage. You've already asked a similar question.
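One way that pause/resume handling might be sketched, assuming the vendor API exposes some form of flow control (the `Pause`/`Resume` members and all names below are assumptions, not part of the question or the Solace API):

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;

// Hypothetical vendor surface; the real library must actually provide
// flow control for this sketch to apply.
interface IPausableBroker
{
    void Pause();
    void Resume();
}

// Sketch: an unbounded channel plus watermark-based pause/resume handled
// in the wrapper rather than by dropping or blocking.
class PausingFeed
{
    private readonly Channel<int> buffer = Channel.CreateUnbounded<int>();
    private readonly IPausableBroker broker;
    private int inFlight;
    private const int HighWatermark = 1000, LowWatermark = 100;

    public PausingFeed(IPausableBroker broker) => this.broker = broker;

    // Wire this up as the vendor callback.
    public void OnMessage(object sender, int msg)
    {
        buffer.Writer.TryWrite(msg); // always succeeds on an unbounded channel
        if (Interlocked.Increment(ref inFlight) == HighWatermark)
            broker.Pause(); // stop pulling from the network during bursts
    }

    public async IAsyncEnumerable<int> ReadAsync()
    {
        await foreach (var msg in buffer.Reader.ReadAllAsync())
        {
            if (Interlocked.Decrement(ref inFlight) == LowWatermark)
                broker.Resume(); // the buffer has drained enough
            yield return msg;
        }
    }
}
```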

Otherwise, you'll have to block the callback:

broker.Subscribe((_, args) =>
{
    writer.WriteAsync(args, token).AsTask().Wait();
});

That's not an ideal solution though.

In both cases, you can consume the data produced by the reader:

var token = cts.Token;
var reader = broker.ToChannel(10, token);

await foreach (var item in reader.ReadAllAsync(token))
{
    ...
}
Panagiotis Kanavos
  • Indeed, the followup question came about after I got confirmation from the vendor that this approach would not be safe to use with their library, hence the need to actually make efficient use of their pause and resume functionality. The shared DNA in your answers here and there is clear. – allmhuran Sep 22 '21 at 16:23