I have put these samples together to demonstrate the different options for throttling HttpClient
requests. I have to emphasize that these are just examples and far from production code, so please view them through that lens.
The following samples show how to issue requests in a fire-and-forget
manner (so they do not care about the responses). The solutions assume that there are more requests than the available throughput; in other words, the producer is faster than the consumer(s), which is why each sample uses some sort of queueing mechanism to handle the imbalance.
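For reference, the producer side could look something like the minimal sketch below. The loop, the example URL, and the choice of ThrottlingWithChannels are just placeholders; every class that follows exposes the same IssueNewRequest method.
using System.Net.Http;
using System.Threading.Tasks;

public static class Producer
{
    // Hypothetical producer: enqueues requests as fast as it can and lets
    // the bounded queue inside the throttler slow it down when it is full.
    public static async Task ProduceAsync()
    {
        var throttler = new ThrottlingWithChannels();
        for (var i = 0; i < 1_000; i++)
        {
            var request = new HttpRequestMessage(HttpMethod.Get, $"https://example.com/items/{i}");
            await throttler.IssueNewRequest(request);
        }
    }
}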
With Batch and Action Blocks
using System.Net.Http;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class ThrottlingWithBatchBlock
{
    static readonly HttpClient client = new();
    // A batch is emitted only once 100 requests have accumulated (or TriggerBatch is called).
    private readonly BatchBlock<HttpRequestMessage> requests = new(100);
    private readonly ActionBlock<HttpRequestMessage[]> consumer;

    public ThrottlingWithBatchBlock()
    {
        // Up to 100 batches may be processed concurrently.
        consumer = new(
            reqs => ConsumerAsync(reqs),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 100 });
        requests.LinkTo(consumer);
    }

    public async Task IssueNewRequest(HttpRequestMessage request)
    {
        await requests.SendAsync(request);
    }

    private async Task ConsumerAsync(HttpRequestMessage[] requests)
    {
        // Requests inside a single batch are sent one after the other.
        foreach (var request in requests)
            await client.SendAsync(request).ConfigureAwait(false);
    }
}
With Buffer Block
using System.Net.Http;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class ThrottlingWithBufferBlock
{
    static readonly HttpClient client = new();
    // BoundedCapacity = 100: SendAsync waits when the buffer is full, which throttles the producer.
    private readonly BufferBlock<HttpRequestMessage> requests = new(
        new DataflowBlockOptions { BoundedCapacity = 100 });

    public ThrottlingWithBufferBlock()
    {
        _ = ConsumerAsync();
    }

    public async Task IssueNewRequest(HttpRequestMessage request)
    {
        await requests.SendAsync(request);
    }

    async Task ConsumerAsync()
    {
        // A single consumer drains the buffer and sends the requests one by one.
        while (await requests.OutputAvailableAsync())
        {
            var request = await requests.ReceiveAsync();
            await client.SendAsync(request).ConfigureAwait(false);
        }
    }
}
With Channels
using System.Net.Http;
using System.Threading.Channels;
using System.Threading.Tasks;

public class ThrottlingWithChannels
{
    static readonly HttpClient client = new();
    // Bounded to 100 items: writes wait for free capacity, which throttles the producer.
    private Channel<HttpRequestMessage> requests = Channel.CreateBounded<HttpRequestMessage>(
        new BoundedChannelOptions(100) { SingleWriter = true, SingleReader = false });

    public ThrottlingWithChannels()
    {
        _ = ConsumerAsync();
    }

    public async Task IssueNewRequest(HttpRequestMessage request)
    {
        await requests.Writer.WaitToWriteAsync();
        await requests.Writer.WriteAsync(request);
    }

    async Task ConsumerAsync()
    {
        // A single consumer reads from the channel and sends the requests one by one.
        while (await requests.Reader.WaitToReadAsync())
        {
            var request = await requests.Reader.ReadAsync();
            await client.SendAsync(request).ConfigureAwait(false);
        }
    }
}
With Blocking Collection
using System.Collections.Concurrent;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks;

public class ThrottlingWithBlockingCollection
{
    static readonly HttpClient client = new();
    private BlockingCollection<HttpRequestMessage> requests = new();

    public ThrottlingWithBlockingCollection()
    {
        // Start 100 long-running consumers; ToArray forces the lazy Select to execute.
        _ = Enumerable.Range(1, 100)
            .Select(_ => ConsumerAsync()).ToArray();
    }

    public Task IssueNewRequest(HttpRequestMessage request)
    {
        requests.Add(request);
        return Task.CompletedTask;
    }

    async Task ConsumerAsync()
    {
        while (true)
        {
            // Take blocks the consumer until a request becomes available.
            var request = requests.Take();
            await client.SendAsync(request).ConfigureAwait(false);
        }
    }
}
With Parallel ForEach
using System.Collections.Concurrent;
using System.Net.Http;
using System.Threading.Tasks;

public class ThrottlingWithParallelForEach
{
    static readonly HttpClient client = new();
    private BlockingCollection<HttpRequestMessage> requests = new();

    public ThrottlingWithParallelForEach()
    {
        // GetConsumingEnumerable is required here: enumerating the BlockingCollection directly
        // only iterates a snapshot and would never see later additions. Task.Run is used
        // because the consuming enumerator blocks while the collection is empty.
        _ = Task.Run(() => requests.GetConsumingEnumerable()
            .ParallelAsyncForEach(async request => await client.SendAsync(request).ConfigureAwait(false), 100));
    }

    public Task IssueNewRequest(HttpRequestMessage request)
    {
        requests.Add(request);
        return Task.CompletedTask;
    }
}
//Based on https://codereview.stackexchange.com/a/203487
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static partial class ParallelForEach
{
    public static async Task ParallelAsyncForEach<T>(this IEnumerable<T> source, Func<T, Task> body, int degreeOfParallelism)
    {
        var toBeProcessedJobs = new HashSet<Task>();
        using var remainingJobsEnumerator = source.GetEnumerator();

        // Starts the next job if the source has one; returns false when it is exhausted.
        bool AddNewJob()
        {
            if (!remainingJobsEnumerator.MoveNext())
                return false;
            toBeProcessedJobs.Add(body(remainingJobsEnumerator.Current));
            return true;
        }

        // Fill up to the requested degree of parallelism. Stopping when AddNewJob
        // returns false avoids an endless loop for sources shorter than the limit.
        while (toBeProcessedJobs.Count < degreeOfParallelism && AddNewJob())
        {
        }

        // Whenever a job completes, start the next one to keep the parallelism constant.
        while (toBeProcessedJobs.Count > 0)
        {
            Task processed = await Task.WhenAny(toBeProcessedJobs).ConfigureAwait(false);
            toBeProcessedJobs.Remove(processed);
            AddNewJob();
        }
    }
}