Managing concurrency with the async and await pattern is fairly simple and there are many ways to do it. I give you the worlds most contrived examples using SemaphorSlim
, ActionBlock
and Reactive Extensions
Given some random async IO work load
public static async Task<PingReply> SomethingAsync(IPAddress address)
{
try
{
Interlocked.Increment(ref _counter);
var ping = new Ping();
var result = await ping.SendPingAsync(address, 1000);
Console.WriteLine($"{Interlocked.Read(ref _counter)} : {address} - {result.Status} {result.RoundtripTime}ms");
return result;
}
catch (PingException ex)
{
Console.WriteLine($"{Interlocked.Read(ref _counter)} : {address} - {ex.Message}");
}
finally
{
Interlocked.Decrement(ref _counter);
}
return null;
}
SemaphorSlim
private static readonly SemaphoreSlim _sem = new SemaphoreSlim(3, 3);
public static async Task Test1(IEnumerable<IPAddress> list)
{
static async Task<PingReply> Process(IPAddress address)
{
try
{
await _sem.WaitAsync();
return await SomethingAsync(address);
}
finally
{
_sem.Release();
}
}
await Task.WhenAll(list.Select(Process));
}
ActionBlock from TPL dataflow
public static async Task Test2(IEnumerable<IPAddress> list)
{
var block = new ActionBlock<IPAddress>(SomethingAsync,
new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = 3
});
await Task.WhenAll(list.Select(x => block.SendAsync(x)));
block.Complete();
await block.Completion;
}
Reactive Extenions
public static async Task Test3(IEnumerable<IPAddress> list)
{
await list
.ToObservable()
.Select(x => Observable.FromAsync(() => SomethingAsync(x)))
.Merge(3)
.ToList();
}
Usage
// a list of something
var list = Enumerable
.Range(0, 20)
.Select(x => RandomIpAddress())
.ToList();
await Test1(list);
Console.WriteLine("---------");
await Test2(list);
// not included in the Online demo,
// as I was having trouble loading the nuget in .net fiddle
Console.WriteLine("---------");
await Test3(list);
Output
3 : 66.65.213.229 - TimedOut 0ms
3 : 26.99.52.159 - TimedOut 0ms
3 : 143.134.242.155 - TimedOut 0ms
3 : 110.29.95.203 - DestinationHostUnreachable 0ms
3 : 66.249.81.37 - Success 309ms
3 : 174.59.246.66 - TimedOut 0ms
3 : 163.176.88.190 - TimedOut 0ms
3 : 83.83.27.11 - TimedOut 0ms
3 : 19.156.14.248 - TimedOut 0ms
3 : 228.198.82.154 - TimedOut 0ms
3 : 89.90.185.217 - TimedOut 0ms
1 : 197.101.34.54 - TimedOut 0ms
---------
3 : 143.134.242.155 - TimedOut 0ms
3 : 26.99.52.159 - TimedOut 0ms
3 : 66.65.213.229 - TimedOut 0ms
3 : 110.29.95.203 - DestinationHostUnreachable 0ms
3 : 66.249.81.37 - Success 309ms
3 : 174.59.246.66 - TimedOut 0ms
3 : 83.83.27.11 - TimedOut 0ms
3 : 163.176.88.190 - TimedOut 0ms
3 : 89.90.185.217 - DestinationHostUnreachable 0ms
3 : 228.198.82.154 - TimedOut 0ms
3 : 19.156.14.248 - TimedOut 0ms
3 : 197.101.34.54 - TimedOut 0ms
---------
3 : 66.65.213.229 - TimedOut 0ms
3 : 26.99.52.159 - TimedOut 0ms
1 : 143.134.242.155 - TimedOut 0ms
3 : 110.29.95.203 - DestinationHostUnreachable 0ms
3 : 66.249.81.37 - Success 309ms
3 : 163.176.88.190 - TimedOut 0ms
3 : 174.59.246.66 - TimedOut 0ms
3 : 83.83.27.11 - TimedOut 0ms
3 : 89.90.185.217 - DestinationHostUnreachable 0ms
3 : 228.198.82.154 - TimedOut 0ms
3 : 19.156.14.248 - TimedOut 0ms
3 : 197.101.34.54 - TimedOut 0ms
Full Demo here