I have read a number of articles and questions here on StackOverflow about wrapping a callback-based API with a Task-based one using a TaskCompletionSource, and I'm trying to use that sort of technique when communicating with a Solace PubSub+ message broker.
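For readers who have not seen the pattern, here is a minimal sketch of it. `CallbackApi` and `CallbackApiWrapper` are hypothetical names made up for illustration; they are not part of the Solace library or of my real code.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical callback-based API, standing in for a vendor library.
internal static class CallbackApi
{
    // Invokes the callback on a thread-pool thread once the "work" completes.
    public static void Begin(int input, Action<int> onDone) =>
        Task.Run(() => onDone(input * 2));
}

// Hypothetical Task-based wrapper around the API above.
internal static class CallbackApiWrapper
{
    public static Task<int> RunAsync(int input)
    {
        // One TaskCompletionSource is allocated per call; the callback completes it.
        var tcs = new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);
        CallbackApi.Begin(input, result => tcs.SetResult(result));
        return tcs.Task;
    }
}
```

In this sketch, `await CallbackApiWrapper.RunAsync(21)` yields 42. The point is only that every call creates a fresh TaskCompletionSource, which is the allocation I ask about further down.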
My initial observation was that this technique seems to shift responsibility for concurrency. For example, the Solace broker library has a Send() method which can possibly block, and then we get a callback after the network communication is complete to indicate "real" success or failure. So this Send() method can be called very quickly, and the vendor library limits concurrency internally.
When you put a Task around that, it seems you either serialize the operations (foreach message await SendWrapperAsync(message)), or take over responsibility for concurrency yourself by deciding how many tasks to start (e.g., using TPL Dataflow).
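To make the two options concrete, here is a rough sketch. `SendStrategies` and the `sendAsync` delegate are hypothetical stand-ins for a wrapper like SendWrapperAsync, and I have used a SemaphoreSlim in place of TPL Dataflow purely to keep the example short.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

internal static class SendStrategies
{
    // Option 1: serialize. Each send completes before the next one starts.
    public static async Task SendSequentiallyAsync(IEnumerable<int> messages, Func<int, Task> sendAsync)
    {
        foreach (int message in messages)
            await sendAsync(message);
    }

    // Option 2: the caller limits concurrency itself (a SemaphoreSlim here, as an assumption).
    public static async Task SendBoundedAsync(IEnumerable<int> messages, Func<int, Task> sendAsync, int maxConcurrency)
    {
        using var gate = new SemaphoreSlim(maxConcurrency);
        var inFlight = new List<Task>();

        foreach (int message in messages)
        {
            // Waits here once maxConcurrency sends are already in flight.
            await gate.WaitAsync();
            inFlight.Add(SendAndReleaseAsync(message));
        }

        await Task.WhenAll(inFlight);

        async Task SendAndReleaseAsync(int message)
        {
            try { await sendAsync(message); }
            finally { gate.Release(); }
        }
    }
}
```

Either way, the limit on in-flight sends now lives in my code rather than inside the vendor library.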
In any case, I decided to wrap the Send call with a guarantor that will retry forever until the callback indicates success, as well as take responsibility for concurrency. This is a "guaranteed" messaging system. Failure is not an option. This requires that the guarantor can apply backpressure, but that's not really in the scope of this question. I have a couple of comments about it in my example code below.
What it does mean is that my hot path, which wraps the send + callback, is "extra hot" because of the retry logic. And so there's a lot of TaskCompletionSource creation here.
The vendor's own documentation makes recommendations about reusing their Message objects where possible rather than recreating them for every Send. I have decided to use a Channel as a ring buffer for this. But that made me wonder: is there some alternative to the TaskCompletionSource approach, maybe some other object that can also be cached in the ring buffer and reused, achieving the same outcome?
I realise this is probably an overzealous attempt at micro-optimisation, and to be honest I am exploring several aspects of C# which are above my pay grade (I'm a SQL guy, really), so I could be missing something obvious. If the answer is "you don't actually need this optimisation", that's not going to put my mind at ease. If the answer is "that's really the only sensible way", my curiosity would be satisfied.
Here is a fully functioning console application which simulates the behaviour of the Solace library in the MockBroker object, and my attempt to wrap it. My hot path is the SendOneAsync method in the Guarantor class. The code is probably a bit too long for SO, but it is as minimal a demo as I could create that captures all of the important elements.
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

internal class Message { public bool sent; public int payload; public object correlator; }

// simulate third party library behaviour
internal class MockBroker
{
    public bool TrySend(Message m, Action<Message> callback)
    {
        if (r.NextDouble() < 0.5) return false; // simulate chance of immediate failure / "would block" response
        Task.Run(() => { Thread.Sleep(100); m.sent = r.NextDouble() < 0.5; callback(m); }); // simulate network call
        return true;
    }

    private Random r = new();
}

// Turns MockBroker into a "guaranteed" sender with an async concurrency limit
internal class Guarantor
{
    public Guarantor(int maxConcurrency)
    {
        _broker = new MockBroker();
        // avoid message allocations in SendOneAsync
        _ringBuffer = Channel.CreateBounded<Message>(maxConcurrency);
        for (int i = 0; i < maxConcurrency; i++) _ringBuffer.Writer.TryWrite(new Message());
    }

    // real code pushes into a TPL Dataflow block with bounded capacity and parallelism
    // execution options both equal to maxConcurrency here, providing concurrency and backpressure
    public async Task Post(int payload) => await SendOneAsync(payload);

    private async Task SendOneAsync(int payload)
    {
        Message msg = await _ringBuffer.Reader.ReadAsync();
        msg.payload = payload;
        // send must eventually succeed
        while (true)
        {
            // *** can this allocation be avoided? ***
            var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
            msg.correlator = tcs;
            // class method in real code, inlined here to make the logic more apparent
            Action<Message> callback = (msg) => (msg.correlator as TaskCompletionSource<bool>).SetResult(msg.sent);
            if (_broker.TrySend(msg, callback) && await tcs.Task) break;
            else
            {
                // simple demo retry logic
                Console.WriteLine($"retrying {msg.payload}");
                await Task.Delay(500);
            }
        }
        // real code raises an event here to indicate successful delivery
        await _ringBuffer.Writer.WriteAsync(msg);
        Console.WriteLine(payload);
    }

    private Channel<Message> _ringBuffer;
    private MockBroker _broker;
}

internal class Program
{
    private static async Task Main(string[] args)
    {
        // at most 10 concurrent sends
        Guarantor g = new(10);
        // hacky simulation since in this demo there's nothing generating continuous events,
        // no DataFlowBlock providing concurrency (it will be limited by the Channel instead),
        // and nobody to notify when messages are successfully sent
        List<Task> sends = new(100);
        for (int i = 0; i < 100; i++) sends.Add(g.Post(i));
        await Task.WhenAll(sends);
    }
}