
I am running a public-facing API that does not scale well at the moment. The API is built on ASP.NET Web API 2 (.NET 4.7.2).

The problem is that from this entry point the service has to make multiple calls to other internal services exposing their REST interfaces. We have already made some optimisations:

  • making all of those calls asynchronous (async/await);
  • load-balancing all the services (the public-facing one as well as the internal ones): we have 4 servers with high RAM/CPU (64 GB, 8 CPUs each).

But when we get a sudden burst of load, or when we run stress tests, we struggle to scale up: response times start to increase, and we cannot achieve more than 150 req/s with an average response time of 2.5 s, while most of the time seems to be spent in network latency, waiting for each internal service to respond. So I was wondering whether it would be possible to buffer a bunch of requests, make a single batch call to the internal APIs to get the details, combine them, and then answer the callers.

My idea would be to have a special kind of static HttpClient with an async method that buffers the calls and makes a request when either a certain number of calls has been buffered or a limit of a few milliseconds has elapsed. That way, under load, our API would make fewer network calls and be more responsive. I know that some people also use a MOM/bus such as Kafka for this, but it seems to me that going that way would only give us even more parallel calls to handle, with no real gain in speed (I may be wrong on that).

To illustrate what I have in mind: [diagram of the intended buffered/batched call flow omitted]

Do you think this could be done using Reactive Extensions (to observe either the elapsed delay or the count of buffered messages) or TPL Dataflow (to fill a block and then make a batch call)? I have this idea in mind, but I do not know whether it is a good one, or how to make it work...
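For what it's worth, the count-or-timeout buffering described above can also be sketched with plain BCL primitives, without Rx or Dataflow. This is only an illustrative sketch, not an existing API: `TimeBatcher` and all of its members are hypothetical names. Each batch is flushed either when `batchSize` requests have accumulated or when `flushDelay` has elapsed since the first request of the batch, whichever comes first:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical sketch: buffers individual requests and invokes one
// batch operation per group, completing each caller's task with its
// own slice of the batch result.
public class TimeBatcher<TInput, TOutput>
{
    private readonly Func<TInput[], Task<TOutput[]>> _batchAction;
    private readonly int _batchSize;
    private readonly TimeSpan _flushDelay;
    private readonly object _gate = new object();
    private List<(TInput Input, TaskCompletionSource<TOutput> Tcs)> _pending = new();

    public TimeBatcher(Func<TInput[], Task<TOutput[]>> batchAction,
        int batchSize, TimeSpan flushDelay)
    {
        _batchAction = batchAction;
        _batchSize = batchSize;
        _flushDelay = flushDelay;
    }

    public Task<TOutput> InvokeAsync(TInput input)
    {
        var tcs = new TaskCompletionSource<TOutput>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        List<(TInput Input, TaskCompletionSource<TOutput> Tcs)> toFlush = null;
        lock (_gate)
        {
            _pending.Add((input, tcs));
            if (_pending.Count == 1)
                _ = FlushAfterDelayAsync(_pending); // arm the timer for this batch
            if (_pending.Count >= _batchSize)
            {
                toFlush = _pending; // flush by count
                _pending = new();
            }
        }
        if (toFlush != null) _ = FlushAsync(toFlush);
        return tcs.Task;
    }

    private async Task FlushAfterDelayAsync(
        List<(TInput Input, TaskCompletionSource<TOutput> Tcs)> batch)
    {
        await Task.Delay(_flushDelay);
        lock (_gate)
        {
            // If another list is now pending, this batch was already
            // flushed by the count condition; do nothing.
            if (!ReferenceEquals(_pending, batch)) return;
            _pending = new();
        }
        await FlushAsync(batch);
    }

    private async Task FlushAsync(
        List<(TInput Input, TaskCompletionSource<TOutput> Tcs)> batch)
    {
        try
        {
            TOutput[] results =
                await _batchAction(batch.Select(x => x.Input).ToArray());
            for (int i = 0; i < batch.Count; i++)
                batch[i].Tcs.SetResult(results[i]);
        }
        catch (Exception ex)
        {
            foreach (var item in batch) item.Tcs.TrySetException(ex);
        }
    }
}
```

Each caller awaits its own `TaskCompletionSource`, so the surface stays a plain awaitable method, just as with a pure HttpClient.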

UPDATE: here is the useful sample code provided by Theodor Zoulias:

using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class Program
{
    public static async Task Main()
    {
        var execution = new BatchExecution<int, int>(async (int[] inputs) =>
        {
            Print($"Processing [{String.Join(", ", inputs)}]");
            await Task.Yield();
            return inputs.Select(x => x * 10).ToArray();
        }, batchSize: 3);

        Task[] workers = Enumerable.Range(1, 10).Select(id => Task.Run(async () =>
        {
            //await Task.Delay(id * 50);
            Print($"Before InvokeAsync({id})");
            var result = await execution.InvokeAsync(id);
            Print($"After await InvokeAsync({id}), result is {result}");
        })).ToArray();

        await Task.WhenAll(workers);
    }

    static void Print(string line)
    {
        Console.WriteLine($@"{DateTime.Now:HH:mm:ss.fff} [{Thread.CurrentThread
            .ManagedThreadId}] > {line}");
    }

    public class BatchExecution<TInput, TOutput>
    {
        private class AsyncOperation : TaskCompletionSource<TOutput>
        {
            public AsyncOperation() :
                base(TaskCreationOptions.RunContinuationsAsynchronously)
            { }
            public TInput Input { get; init; }
        }

        private readonly BatchBlock<AsyncOperation> _batchBlock;
        private readonly ActionBlock<AsyncOperation[]> _actionBlock;

        public BatchExecution(
            Func<TInput[], Task<TOutput[]>> batchAction,
            int batchSize,
            int maximumConcurrency = DataflowBlockOptions.Unbounded)
        {
            _batchBlock = new BatchBlock<AsyncOperation>(batchSize);
            _actionBlock = new ActionBlock<AsyncOperation[]>(async operations =>
            {
                try
                {
                    TInput[] inputs = operations.Select(x => x.Input).ToArray();
                    TOutput[] results = await batchAction(inputs);
                    if (results.Length != inputs.Length)
                        throw new InvalidOperationException("Results count mismatch.");
                    for (int i = 0; i < operations.Length; i++)
                        operations[i].SetResult(results[i]);
                }
                catch (OperationCanceledException oce)
                {
                    Array.ForEach(operations, x => x.TrySetCanceled(oce.CancellationToken));
                }
                catch (Exception ex)
                {
                    Array.ForEach(operations, x => x.TrySetException(ex));
                }
            }, new() { MaxDegreeOfParallelism = maximumConcurrency });
            _batchBlock.LinkTo(_actionBlock, new() { PropagateCompletion = true });
        }

        public Task<TOutput> InvokeAsync(TInput input)
        {
            var operation = new AsyncOperation() { Input = input };
            bool accepted = _batchBlock.Post(operation);
            if (!accepted) throw new InvalidOperationException(
                "The component has been marked as complete.");
            return operation.Task;
        }

        public void Complete() => _batchBlock.Complete();
        public Task Completion => _actionBlock.Completion;
    }
}

I need some feedback/advice on this approach: is it possible to do what I am after with Reactive/TPL Dataflow and an HttpClient, or is there a better way to do it?

Dypso
  • I can't comment on whether what you are trying to do is sane, but for a technical TPL Dataflow manifestation you can look here: [Batching on duration or threshold using TPL Dataflow](https://stackoverflow.com/questions/52633346/batching-on-duration-or-threshold-using-tpl-dataflow) – Theodor Zoulias Nov 16 '21 at 16:33
  • That is where I got the idea for the IPropagatorBlock... But I do not know how to wait for, let's say, 10 URLs to be enqueued before making a request, then get the responses back one by one and respond to each caller asynchronously... – Dypso Nov 16 '21 at 22:34
  • You could pass cold `Task`s as messages to the `BatchBlock`, and link it to an `ActionBlock` that will run the tasks by invoking their `RunSynchronously` method. You can see an example [here](https://stackoverflow.com/questions/21424084/task-sequencing-and-re-entracy/62882637#62882637). If the actions that you want to invoke are asynchronous, you will need nested tasks (`Task<Task>`). – Theodor Zoulias Nov 16 '21 at 22:47
  • Actually the above idea and the mentioned example are probably quite far from what you are trying to achieve. It might help if you could provide a toy/minimal example of the desirable functionality, so that we have something more tangible to comment on. – Theodor Zoulias Nov 16 '21 at 22:52
  • Thank you for your feedback: I am trying to build another, more advanced toy sample... And yes, I want the caller to use it asynchronously, just as with a pure HttpClient. In the meantime I have found an example quite similar to what I want to do: https://stackoverflow.com/questions/50120304/request-response-pattern-with-tpl-dataflow?rq=1 . The difference in my case is that I want to group all the ids and make a single call in batch mode, whereas the example above is for throttling calls to the same service... – Dypso Nov 17 '21 at 09:52

1 Answer


Here is a BatchExecution class that accepts individual requests and invokes a batch operation when the number of buffered requests reaches a specified number (batchSize). The results of the batch operation are propagated to the associated individual requests:

public class BatchExecution<TInput, TOutput>
{
    private class AsyncOperation : TaskCompletionSource<TOutput>
    {
        public AsyncOperation() :
            base(TaskCreationOptions.RunContinuationsAsynchronously) { }
        public TInput Input { get; init; }
    }

    private readonly BatchBlock<AsyncOperation> _batchBlock;
    private readonly ActionBlock<AsyncOperation[]> _actionBlock;

    public BatchExecution(
        Func<TInput[], Task<TOutput[]>> batchAction,
        int batchSize,
        int maximumConcurrency = DataflowBlockOptions.Unbounded)
    {
        _batchBlock = new BatchBlock<AsyncOperation>(batchSize);
        _actionBlock = new ActionBlock<AsyncOperation[]>(async operations =>
        {
            try
            {
                TInput[] inputs = operations.Select(x => x.Input).ToArray();
                TOutput[] results = await batchAction(inputs);
                if (results.Length != inputs.Length)
                    throw new InvalidOperationException("Results count mismatch.");
                for (int i = 0; i < operations.Length; i++)
                    operations[i].SetResult(results[i]);
            }
            catch (OperationCanceledException oce)
            {
                Array.ForEach(operations, x => x.TrySetCanceled(oce.CancellationToken));
            }
            catch (Exception ex)
            {
                Array.ForEach(operations, x => x.TrySetException(ex));
            }
        }, new() { MaxDegreeOfParallelism = maximumConcurrency });
        _batchBlock.LinkTo(_actionBlock, new() { PropagateCompletion = true });
    }

    public Task<TOutput> InvokeAsync(TInput input)
    {
        var operation = new AsyncOperation() { Input = input };
        bool accepted = _batchBlock.Post(operation);
        if (!accepted) throw new InvalidOperationException(
            "The component has been marked as complete.");
        return operation.Task;
    }

    public void Complete() => _batchBlock.Complete();
    public Task Completion => _actionBlock.Completion;
}

Usage example. Let's assume the existence of this internal service API:

Task<string[]> GetCityNamesAsync(int[] ids);

The BatchExecution could then be initialized and used like this:

var batchExecution = new BatchExecution<int, string>(async (int[] ids) =>
{
    return await GetCityNamesAsync(ids);
}, batchSize: 10);

//...
string cityName = await batchExecution.InvokeAsync(13);

You could consider customizing the class by replacing the standard BatchBlock<AsyncOperation> with a custom time-aware BatchBlock, like the one found in this question.
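As a rough illustration of what such a time-aware variant could look like (a sketch under the assumption that periodic flushing is acceptable; `TimedBatchBlock` is a hypothetical helper name, not a library type): a `Timer` that periodically calls the standard `BatchBlock<T>.TriggerBatch()` method forces out partially filled batches, so a batch is emitted at most one timer period after its first item arrives:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

public static class TimedBatchBlock
{
    // Returns a BatchBlock that, in addition to flushing when batchSize
    // items have arrived, is flushed every flushInterval by a timer.
    // Dispose the returned timer after completing the block.
    public static (BatchBlock<T> Block, IDisposable FlushTimer) Create<T>(
        int batchSize, TimeSpan flushInterval)
    {
        var block = new BatchBlock<T>(batchSize);
        // TriggerBatch emits the currently buffered items as a (possibly
        // short) batch; it is a no-op when the block is empty, so an
        // idle timer tick costs nothing.
        var timer = new Timer(_ => block.TriggerBatch(),
            null, flushInterval, flushInterval);
        return (block, timer);
    }
}
```

Wiring such a block in place of the plain `BatchBlock<AsyncOperation>` inside BatchExecution would let a lone `InvokeAsync` complete within roughly one `flushInterval`, instead of waiting forever for the batch to fill.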

Theodor Zoulias
  • Thanks a lot. But your version works only with a batchSize of 1; as soon as I test with a batchSize of more than one item I get something like a deadlock: the script keeps awaiting at the first call (`await batchExecution.InvokeAsync(xx)`). I tried the sample in a console application, so maybe it's a kind of deadlock because the BatchBlock is waiting for the next element before completing while the main program is waiting for the batch block... I am not sure, but still trying to make it work. – Dypso Nov 18 '21 at 19:43
  • @Dypso the implementation above is based on the standard `BatchBlock`, which is not time-aware, so an individual isolated `InvokeAsync` will never complete. That's because a batch containing `batchSize` operations will never be created. You need to test it with multiple workers (more workers than `batchSize`), or you must replace the standard `BatchBlock` with a custom time-aware one, in the implementation of the `BatchExecution` class. – Theodor Zoulias Nov 18 '21 at 21:03
  • Sorry, what do you mean by "workers"? I have made tests where I have at least more InvokeAsync calls than the batchSize, but it doesn't work and seems to lock on the first InvokeAsync call. I have also replaced the standard BatchBlock with the time-aware one: in that case I at least get an execution every time the timer triggers... I will update the code in my question with the latest source code so you can see what I have done. – Dypso Nov 18 '21 at 22:28
  • @Dypso check out [this](https://dotnetfiddle.net/21ACfV) fiddle demo. By workers I mean asynchronous methods that are running concurrently with each other. Sometimes they are also called "asynchronous workflows". – Theodor Zoulias Nov 18 '21 at 23:00