
I'm getting many timeout exceptions while using the Polly Bulkhead policy. This policy helps me limit the number of concurrent calls that I'm sending to a specific host. However, it seems that the HttpClient timeout applies to the whole delegate chain, including the time a request spends queued in the bulkhead.

I'm using IHttpClientFactory to configure that with the following code:

services.AddHttpClient(string.Empty)
.AddPolicyHandler(GetBulkheadPolicy(100));


private static IAsyncPolicy<HttpResponseMessage> GetBulkheadPolicy(int maxConcurrentRequests)
{
    return Policy.BulkheadAsync(maxConcurrentRequests, int.MaxValue)
        .AsAsyncPolicy<HttpResponseMessage>();
}

My problem is that I want the timeout to affect only the request itself and not the bulkhead policy, because the behaviour I want to achieve is the following:

  • Limit the number of concurrent requests to a specific host
  • Wait indefinitely until there is capacity to send the request (when the queue is full, Polly raises an exception, hence the effectively unbounded int.MaxValue queue length)
  • Send the request to the host and apply a timeout, for example the default one.

I have achieved that behaviour using a Semaphore instead of a Polly Bulkhead policy, but I'd like to encapsulate that code using a Policy.
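
For reference, the Semaphore-based version looks roughly like this (the handler name and wiring are illustrative, not my production code; it assumes HttpClient.Timeout is set to Timeout.InfiniteTimeSpan so the built-in timeout does not also cover the time spent queueing):

internal sealed class ThrottlingHandler : DelegatingHandler
{
    private readonly SemaphoreSlim semaphore;
    private readonly TimeSpan perRequestTimeout;

    public ThrottlingHandler(int maxConcurrentRequests, TimeSpan perRequestTimeout)
    {
        semaphore = new SemaphoreSlim(maxConcurrentRequests, maxConcurrentRequests);
        this.perRequestTimeout = perRequestTimeout;
    }

    protected override async Task<HttpResponseMessage> SendAsync(
        HttpRequestMessage request, CancellationToken cancellationToken)
    {
        // Wait for a free slot with no time limit.
        await semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
        try
        {
            // The timeout clock starts only after a slot has been acquired.
            using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
            cts.CancelAfter(perRequestTimeout);
            return await base.SendAsync(request, cts.Token).ConfigureAwait(false);
        }
        finally
        {
            semaphore.Release();
        }
    }
}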

Thanks.

  • Even though you can use [Bulkhead for throttling](https://github.com/App-vNext/Polly/wiki/Bulkhead#configuring-maxparallelization---a-technical-and-software-perspective), it was not particularly designed for that. The bulkhead policy shines whenever it has to deal with CPU-bound operations. You should think of this policy as a way to declare a small thread pool for dedicated work. – Peter Csala Jul 08 '21 at 07:01
  • Do you still want to use bulkhead? – Peter Csala Jul 09 '21 at 15:06
  • @PeterCsala thanks for your answer, do you think there is another Polly policy to achieve request throttling, or would it be better to create a custom one with a Semaphore? I don't know if creating a *middleware* in the HttpRequest pipeline is the best option, tbh. – ganchito55 Jul 12 '21 at 07:08
  • You have a lot of options, just to name a few: 1) Dataflow's ActionBlock + BatchBlock 2) Dataflow's BufferBlock 3) System.Threading.Channels' BoundedChannel 4) HashSet + WhenAny 5) SemaphoreSlim + async foreach 6) etc. If you want, I can create a post to demonstrate these options. – Peter Csala Jul 12 '21 at 07:22
  • I know a few of them, but it'd be awesome if you could do that. My idea is consuming the IHttpClientFactory with a named HttpClient with the throttling behaviour inside. – ganchito55 Jul 12 '21 at 11:04
  • I've posted some sample code. I hope you will find it useful. – Peter Csala Jul 12 '21 at 15:50
  • @PeterCsala thanks, let me check them in depth – ganchito55 Jul 12 '21 at 16:05
  • Thank you for the bounty, I highly appreciate your kindness. Just out of curiosity, which solution did you pick? – Peter Csala Jul 16 '21 at 13:45
  • We are still checking it, but thanks for all your effort – ganchito55 Jul 16 '21 at 14:12

1 Answer


I have put these samples together to demonstrate the different options for throttling HttpClient requests. I have to emphasize that these are just examples, far from production code, so please view them through that lens.

The following samples show how to issue requests in a fire-and-forget manner (that is, they do not care about the responses). The solutions assume that there are more requests than the available throughput; in other words, the producer is faster than the consumer(s), which is why each sample uses some sort of queueing mechanism to handle the imbalance.

With Batch and Action Blocks

public class ThrottlingWithBatchBlock
{
    static readonly HttpClient client = new();
    // Groups incoming requests into batches of 100 before they reach the consumer.
    private readonly BatchBlock<HttpRequestMessage> requests = new(100);
    private readonly ActionBlock<HttpRequestMessage[]> consumer;

    public ThrottlingWithBatchBlock()
    {
        // Process up to 100 batches in parallel.
        consumer = new(
            reqs => ConsumerAsync(reqs),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 100 });
        requests.LinkTo(consumer);
    }

    public async Task IssueNewRequest(HttpRequestMessage request)
    {
        await requests.SendAsync(request);
    }

    // Requests within a batch are sent one after the other.
    private async Task ConsumerAsync(HttpRequestMessage[] requests)
    {
        foreach (var request in requests)
            await client.SendAsync(request).ConfigureAwait(false);
    }
}
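
All of these sample classes expose the same IssueNewRequest entry point, so a producer sketch like the following can drive any of them (the URL and the request count are made up for illustration):

var throttler = new ThrottlingWithBatchBlock();

// Produce requests faster than the consumer can drain them;
// the internal queue absorbs the imbalance.
for (var i = 0; i < 1_000; i++)
{
    await throttler.IssueNewRequest(
        new HttpRequestMessage(HttpMethod.Get, $"https://example.com/items/{i}"));
}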

With Buffer Block

public class ThrottlingWithBufferBlock
{
    static readonly HttpClient client = new();
    // With a bounded capacity of 100, SendAsync waits while the queue is full.
    private readonly BufferBlock<HttpRequestMessage> requests = new(
            new DataflowBlockOptions { BoundedCapacity = 100 });

    public ThrottlingWithBufferBlock()
    {
        // Fire-and-forget consumer loop.
        _ = ConsumerAsync();
    }

    public async Task IssueNewRequest(HttpRequestMessage request)
    {
        await requests.SendAsync(request);
    }

    async Task ConsumerAsync()
    {
        while (await requests.OutputAvailableAsync())
        {
            var request = await requests.ReceiveAsync();
            await client.SendAsync(request).ConfigureAwait(false);
        }
    }
}

With Channels

public class ThrottlingWithChannels
{
    static readonly HttpClient client = new();
    // Bounded channel: writers wait for free capacity once 100 requests are queued.
    private Channel<HttpRequestMessage> requests = Channel.CreateBounded<HttpRequestMessage>(
            new BoundedChannelOptions(100) { SingleWriter = true, SingleReader = false });

    public ThrottlingWithChannels()
    {
        // Fire-and-forget consumer loop.
        _ = ConsumerAsync();
    }

    public async Task IssueNewRequest(HttpRequestMessage request)
    {
        // WriteAsync already waits for free capacity on a bounded channel,
        // so a separate WaitToWriteAsync call is not needed.
        await requests.Writer.WriteAsync(request);
    }

    async Task ConsumerAsync()
    {
        while (await requests.Reader.WaitToReadAsync())
        {
            var request = await requests.Reader.ReadAsync();
            await client.SendAsync(request).ConfigureAwait(false);
        }
    }
}
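
The consumer loop above exits once the channel has been completed and drained, so a graceful shutdown only needs the writer side to be completed. A hypothetical Shutdown method on this class (not part of the original sample) could expose that:

public void Shutdown()
{
    // Complete() makes WaitToReadAsync return false once the remaining
    // queued requests have been drained, which ends ConsumerAsync.
    requests.Writer.Complete();
}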

With Blocking Collection

public class ThrottlingWithBlockingCollection
{
    static readonly HttpClient client = new();
    private BlockingCollection<HttpRequestMessage> requests = new();

    public ThrottlingWithBlockingCollection()
    {
        // Start 100 consumer loops; Task.Run keeps the blocking Take()
        // calls off the constructor's thread.
        _ = Enumerable.Range(1, 100)
            .Select(_ => Task.Run(ConsumerAsync)).ToArray();
    }

    public Task IssueNewRequest(HttpRequestMessage request)
    {
        requests.Add(request);
        return Task.CompletedTask;
    }

    async Task ConsumerAsync()
    {
        while (true)
        {
            // Take() blocks until an item becomes available.
            var request = requests.Take();
            await client.SendAsync(request).ConfigureAwait(false);
        }
    }
}

With Parallel Foreach

public class ThrottlingWithParallelForEach
{
    static readonly HttpClient client = new();
    private BlockingCollection<HttpRequestMessage> requests = new();

    public ThrottlingWithParallelForEach()
    {
        // GetConsumingEnumerable() blocks while waiting for new items (enumerating
        // the collection directly would only snapshot it), so the loop runs via
        // Task.Run to avoid blocking the constructor.
        _ = Task.Run(() => requests.GetConsumingEnumerable()
            .ParallelAsyncForEach(async request => await client.SendAsync(request).ConfigureAwait(false), 100));
    }

    public Task IssueNewRequest(HttpRequestMessage request)
    {
        requests.Add(request);
        return Task.CompletedTask;
    }
}
//Based on https://codereview.stackexchange.com/a/203487
public static partial class ParallelForEach
{
    public static async Task ParallelAsyncForEach<T>(this IEnumerable<T> source, Func<T, Task> body, int degreeOfParallelism)
    {
        var toBeProcessedJobs = new HashSet<Task>();
        var remainingJobsEnumerator = source.GetEnumerator();

        // Starts the next job if there is one; returns false when the source is exhausted.
        bool AddNewJob()
        {
            if (remainingJobsEnumerator.MoveNext())
            {
                var readyToProcessJob = body(remainingJobsEnumerator.Current);
                toBeProcessedJobs.Add(readyToProcessJob);
                return true;
            }
            return false;
        }

        // Fill up to the requested degree of parallelism,
        // bailing out early if the source runs dry.
        while (toBeProcessedJobs.Count < degreeOfParallelism && AddNewJob())
        {
        }

        // Whenever a job finishes, replace it with a new one.
        while (toBeProcessedJobs.Count > 0)
        {
            Task processed = await Task.WhenAny(toBeProcessedJobs).ConfigureAwait(false);
            toBeProcessedJobs.Remove(processed);
            AddNewJob();
        }
    }
}
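
As a standalone sanity check, the extension can also be exercised without HttpClient at all (a made-up example with simulated work):

var urls = Enumerable.Range(1, 10).Select(i => $"https://example.com/{i}");
await urls.ParallelAsyncForEach(async url =>
{
    // Simulated work instead of a real HTTP call.
    await Task.Delay(100);
    Console.WriteLine(url);
}, degreeOfParallelism: 3);
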
Peter Csala