
I am working on a protocol and trying to use as much async/await as I can to make it scale well. The protocol will have to support hundreds to thousands of simultaneous connections. Below is a little bit of pseudo code to illustrate my problem.

private static async void DoSomeWork()
{
    var protocol = new FooProtocol();
    await protocol.Connect("127.0.0.1", 1234);
    var i = 0;
    while(i != int.MaxValue)
    {
        i++;
        var request = new FooRequest();
        request.Payload = "Request Nr " + i;
        var task = protocol.Send(request);
        _ = task.ContinueWith(async tmp =>
        {
            var resp = await task;
            Console.WriteLine($"Request {resp.SequenceNr} Successful: {(resp.Status == 0)}");
        });
    }
}

And below is a little pseudo code for the protocol.

public class FooProtocol
{
    private int sequenceNr = 0;
    
    private SemaphoreSlim ss = new SemaphoreSlim(20, 20);
    
    public Task<FooResponse> Send(FooRequest fooRequest)
    {
        var tcs = new TaskCompletionSource<FooResponse>();
        ss.Wait();
        var tmp = Interlocked.Increment(ref sequenceNr);
        fooRequest.SequenceNr = tmp;
        // Faking some arbitrary delay. This work is done over sockets. 
        Task.Run(async () =>
        {
            await Task.Delay(1000);
            tcs.SetResult(new FooResponse() {SequenceNr = tmp});
            ss.Release();
        });
        return tcs.Task;
    }
}

I have a protocol with request and response pairs, and I have used asynchronous socket programming. The FooProtocol takes care of matching up requests with responses (by sequence number) and also enforces the maximum number of pending requests. (That is done with a SemaphoreSlim in both the pseudo code and my real code, so I am not worried about runaway requests.) The DoSomeWork method calls Protocol.Send, but I don't want to await the response; I want to spin around and send the next request until I am blocked by the maximum number of pending requests. When a task does complete, I want to check the response and maybe do some work.
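
For context, this is roughly how the matching works in my real code (a simplified sketch; the field and method names here are illustrative, and the actual response handling happens asynchronously on the socket):

// Simplified sketch of the request/response matching; names are illustrative.
private readonly ConcurrentDictionary<int, TaskCompletionSource<FooResponse>> pending =
    new ConcurrentDictionary<int, TaskCompletionSource<FooResponse>>();

public Task<FooResponse> Send(FooRequest fooRequest)
{
    ss.Wait(); // enforce the maximum number of pending requests
    var tmp = Interlocked.Increment(ref sequenceNr);
    fooRequest.SequenceNr = tmp;
    var tcs = new TaskCompletionSource<FooResponse>();
    pending[tmp] = tcs;
    // ... write the request to the socket asynchronously ...
    return tcs.Task;
}

// Called from the asynchronous socket receive loop when a response arrives.
private void OnResponseReceived(FooResponse response)
{
    if (pending.TryRemove(response.SequenceNr, out var tcs))
    {
        tcs.SetResult(response);
        ss.Release();
    }
}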

I would like to fix two things:

  1. I would like to avoid using Task.ContinueWith(), because it doesn't fit cleanly with the async/await pattern.
  2. Because I have awaited the connection, I have had to use the async modifier. Now I get warnings from the IDE: "Because this call is not awaited, execution of the current method continues before the call is completed. Consider applying the 'await' operator to the result of the call." I don't want to do that, because as soon as I await the Send call it ruins the protocol's ability to have many requests in flight. The only way I can get rid of the warning is to use a discard, which isn't the worst thing, but I can't help feeling that I am missing a trick and fighting this too hard.
uriDium
  • You could delegate the job to the thread pool and forget about it -> `_ = Task.Run(async () => { var resp = await protocol.Send(request); Console.WriteLine($"Request {resp.SequenceNr} Successful: {(resp.Status == 0)}"); });` – Camilo Terevinto Dec 03 '20 at 19:23
  • @CamiloTerevinto Sounds like with hundreds to thousands of simultaneous requests, you will starve the thread pool. Besides, it kind of defeats the purpose of async. – GSerg Dec 03 '20 at 19:24
  • @GSerg Yeah, I was afraid that'd be the case. Perhaps just push sending the request and getting the response to an `async void` method? – Camilo Terevinto Dec 03 '20 at 19:25
  • Factor out `await protocol.Send(request); Console.WriteLine` in a separate `Func`, keep creating them and adding them to a list, and then in the end do a `Task.WhenAll` on them? Then again, you will probably want to implement [throttling](https://stackoverflow.com/q/22492383/11683) of some sort. – GSerg Dec 03 '20 at 19:26
  • @GSerg Throttling is built into the protocol; I left it out of the pseudo code. I can't keep a list of all the tasks because these connections are long lived, potentially for the lifetime of the service. – uriDium Dec 03 '20 at 19:56

2 Answers


Side note: I hope your actual code is using SemaphoreSlim.WaitAsync rather than SemaphoreSlim.Wait.

In most socket code, you do end up with a list of connections, and along with each connection is a "processor" of some kind. In the async world, this is naturally represented as a Task.

So you will need to keep a list of Tasks; at the very least, your consuming application will need to know when it is safe to shut down (i.e., all responses have been received).
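
For example, something along these lines (a rough sketch; connections and ProcessConnectionAsync are placeholders, not part of the pseudo code above):

// Rough sketch: one long-running "processor" task per connection.
var processors = new List<Task>();
foreach (var connection in connections)
{
    processors.Add(ProcessConnectionAsync(connection)); // reads responses, completes pending requests
}

// At shutdown, this is how the application knows all work has finished.
await Task.WhenAll(processors);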

Don't preemptively worry about using Task.Run; as long as you aren't blocking (e.g., SemaphoreSlim.Wait), you probably will not starve the thread pool. Remember that during the awaits, no thread pool thread is used.
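
For illustration, a non-blocking Send along those lines could look roughly like this (a sketch that keeps the pseudo code's fake delay, not your real socket code):

public async Task<FooResponse> Send(FooRequest fooRequest)
{
    await ss.WaitAsync(); // throttle asynchronously; no thread is blocked while waiting
    var tmp = Interlocked.Increment(ref sequenceNr);
    fooRequest.SequenceNr = tmp;
    var tcs = new TaskCompletionSource<FooResponse>();
    // In real code the socket receive loop would look up this TCS by sequence number
    // and complete it when the matching response arrives.
    _ = Task.Run(async () =>
    {
        await Task.Delay(1000); // stand-in for the network round trip
        tcs.SetResult(new FooResponse() { SequenceNr = tmp });
        ss.Release();
    });
    // Awaiting the TCS-controlled task keeps the return type Task<FooResponse>.
    return await tcs.Task;
}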

Stephen Cleary
  • Yes it is. I was writing the code down from memory. I do have a dictionary of connections so that I can look up the connection a request is destined for. WRT the list of tasks, this is a long-running service, and if I keep a list it will get massive. I did think of some sort of data structure that I could routinely clean completed tasks out of. What would you recommend? – uriDium Dec 04 '20 at 07:32
  • I did notice that with my SemaphoreSlim WaitAsync, trying to do an await ss.WaitAsync() then says that I have to make the Send method return Task<Task<FooResponse>>. The only way I can get rid of this is to wrap it in another class or method and then use a public async void method, which is very taboo. In the protocol class itself I am using a dictionary of TaskCompletionSource. Handing back the task and then listening on the socket asynchronously in another method, I look up the TCS and set the result. Is there any other way to do this? – uriDium Dec 04 '20 at 09:58
  • @uriDium: The tasks should notify the "connection manager" to remove them as soon as the connection is no longer viable, so you shouldn't have any tasks for inactive connections. Regarding async `Send`, you should be able to do this just with `async`/`await`/`Task`. There's a `Task` that you control from the TCS, which you can `await` after the `WaitAsync`. – Stephen Cleary Dec 04 '20 at 14:07

I am not sure that it's a good idea to enforce the maximum concurrency at the protocol level. It seems to me that this responsibility belongs to the caller of the protocol. So I would remove the SemaphoreSlim and let the protocol do the one thing it knows how to do well:

public class FooProtocol
{
    private int sequenceNr = 0;

    public async Task<FooResponse> Send(FooRequest fooRequest)
    {
        var tmp = Interlocked.Increment(ref sequenceNr);
        fooRequest.SequenceNr = tmp;
        await Task.Delay(1000); // Faking some arbitrary delay
        return new FooResponse() { SequenceNr = tmp };
    }
}

Then I would use an ActionBlock from the TPL Dataflow library in order to coordinate the process of sending a massive number of requests through the protocol, by handling the concurrency, the backpressure (BoundedCapacity), the cancellation (if needed), the error handling, and the status of the whole operation (running, completed, failed, etc.). Example:

private static async Task DoSomeWorkAsync()
{
    var protocol = new FooProtocol();

    var actionBlock = new ActionBlock<FooRequest>(async request =>
    {
        var resp = await protocol.Send(request);
        Console.WriteLine($"Request {resp.SequenceNr} Status: {resp.Status}");
    }, new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = 20,
        BoundedCapacity = 100
    });

    await protocol.Connect("127.0.0.1", 1234);

    foreach (var i in Enumerable.Range(0, Int32.MaxValue))
    {
        var request = new FooRequest();
        request.Payload = "Request Nr " + i;
        var accepted = await actionBlock.SendAsync(request);
        if (!accepted) break; // The block has failed irrecoverably
    }
    actionBlock.Complete();
    await actionBlock.Completion; // Propagate any exceptions
}

The BoundedCapacity = 100 configuration means that the ActionBlock will store at most 100 requests in its internal buffer. When this threshold is reached, anyone who wants to send more requests to it will have to wait. The waiting happens on the await actionBlock.SendAsync line.
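
If cancellation is needed, it can be wired into the same options object (a minimal sketch, assuming a CancellationTokenSource named cts that the caller controls):

var cts = new CancellationTokenSource();

var actionBlock = new ActionBlock<FooRequest>(async request =>
{
    var resp = await protocol.Send(request);
    Console.WriteLine($"Request {resp.SequenceNr} Status: {resp.Status}");
}, new ExecutionDataflowBlockOptions()
{
    MaxDegreeOfParallelism = 20,
    BoundedCapacity = 100,
    CancellationToken = cts.Token // calling cts.Cancel() cancels the block; its Completion then ends as canceled
});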

Theodor Zoulias