-1

I've got this collection of custom object in my app, each one being able to update itself through a call to some async method, which basically scans a web page to intercept updated data on the web page itself and stores it in a sqlite db.

Or at least they will be able to auto-update in the near future (things are already working in 'manual' mode), because the objects needing an update at the same time could be hundreds.

I was thinking about some "control center" class queeing the call exceding a certain number of allowed concurrent call (let's say 3), but i don't have the fanciest idea on where to start with. So can anyone point me to any working example (or even a template, it will do!) to start studying?

TheGeneral
  • 79,002
  • 9
  • 103
  • 141
Rokuros1080
  • 227
  • 1
  • 3
  • 11

2 Answers2

2

Managing concurrency with the async and await pattern is fairly simple and there are many ways to do it. I give you the worlds most contrived examples using SemaphorSlim, ActionBlock and Reactive Extensions

Given some random async IO work load

public static async Task<PingReply> SomethingAsync(IPAddress address)
{ 
   try
   {
      Interlocked.Increment(ref _counter);
      var ping = new Ping();
      var result = await ping.SendPingAsync(address, 1000);
      Console.WriteLine($"{Interlocked.Read(ref _counter)} : {address} - {result.Status} {result.RoundtripTime}ms");
      return result;
   }
   catch (PingException ex)
   {
      Console.WriteLine($"{Interlocked.Read(ref _counter)} : {address} - {ex.Message}");
   }
   finally
   {
      Interlocked.Decrement(ref _counter);
   }
   return null;
}

SemaphorSlim

private static readonly SemaphoreSlim _sem = new SemaphoreSlim(3, 3);

public static async Task Test1(IEnumerable<IPAddress> list)
{
   static async Task<PingReply> Process(IPAddress address)
   {
      try
      {
         await _sem.WaitAsync();
         return await SomethingAsync(address);
      }
      finally
      {
         _sem.Release();
      }
   }
   
   await Task.WhenAll(list.Select(Process));
}

ActionBlock from TPL dataflow

public static async Task Test2(IEnumerable<IPAddress> list)
{
   var block = new ActionBlock<IPAddress>(SomethingAsync,
      new ExecutionDataflowBlockOptions()
      {
         MaxDegreeOfParallelism = 3
      });

   await Task.WhenAll(list.Select(x => block.SendAsync(x)));

   block.Complete();
   await block.Completion;
}

Reactive Extenions

public static async Task Test3(IEnumerable<IPAddress> list)
{
   await list
      .ToObservable()
      .Select(x => Observable.FromAsync(() => SomethingAsync(x)))
      .Merge(3)
      .ToList();
}

Usage

// a list of something
var list = Enumerable
   .Range(0, 20)
   .Select(x => RandomIpAddress())
   .ToList();

await Test1(list);

Console.WriteLine("---------");

await Test2(list);

// not included in the Online demo, 
// as I was having trouble loading the nuget in .net fiddle

Console.WriteLine("---------");

await Test3(list);

Output

3 : 66.65.213.229 - TimedOut 0ms
3 : 26.99.52.159 - TimedOut 0ms
3 : 143.134.242.155 - TimedOut 0ms
3 : 110.29.95.203 - DestinationHostUnreachable 0ms
3 : 66.249.81.37 - Success 309ms
3 : 174.59.246.66 - TimedOut 0ms
3 : 163.176.88.190 - TimedOut 0ms
3 : 83.83.27.11 - TimedOut 0ms
3 : 19.156.14.248 - TimedOut 0ms
3 : 228.198.82.154 - TimedOut 0ms
3 : 89.90.185.217 - TimedOut 0ms
1 : 197.101.34.54 - TimedOut 0ms
---------
3 : 143.134.242.155 - TimedOut 0ms
3 : 26.99.52.159 - TimedOut 0ms
3 : 66.65.213.229 - TimedOut 0ms
3 : 110.29.95.203 - DestinationHostUnreachable 0ms
3 : 66.249.81.37 - Success 309ms
3 : 174.59.246.66 - TimedOut 0ms
3 : 83.83.27.11 - TimedOut 0ms
3 : 163.176.88.190 - TimedOut 0ms
3 : 89.90.185.217 - DestinationHostUnreachable 0ms
3 : 228.198.82.154 - TimedOut 0ms
3 : 19.156.14.248 - TimedOut 0ms
3 : 197.101.34.54 - TimedOut 0ms
---------
3 : 66.65.213.229 - TimedOut 0ms
3 : 26.99.52.159 - TimedOut 0ms
1 : 143.134.242.155 - TimedOut 0ms
3 : 110.29.95.203 - DestinationHostUnreachable 0ms
3 : 66.249.81.37 - Success 309ms
3 : 163.176.88.190 - TimedOut 0ms
3 : 174.59.246.66 - TimedOut 0ms
3 : 83.83.27.11 - TimedOut 0ms
3 : 89.90.185.217 - DestinationHostUnreachable 0ms
3 : 228.198.82.154 - TimedOut 0ms
3 : 19.156.14.248 - TimedOut 0ms
3 : 197.101.34.54 - TimedOut 0ms

Full Demo here

TheGeneral
  • 79,002
  • 9
  • 103
  • 141
0

I would start with googling for "concurrent async queue c#". Among others, there is a SO question awaitable Task based queue. Although it is not concurrent, you can either have a three queues or modify the queue from the question. Also you can follow the recommendation from this question's answer and try to achieve it using DataFlow library.

Alex Netkachov
  • 13,172
  • 6
  • 53
  • 85