
I have a classic producer/consumer problem where multiple users can simultaneously POST data to a Web API method (api/test), which triggers IO-intensive, long-running operations asynchronously. I've limited the number of concurrent requests to 5 using an ActionBlock chained to a BufferBlock.

The Producer class is registered as a singleton and the goal is to allow all calls to api/test to feed into this one queue. This means that things like completing the block aren't an option.

What is the most effective way to wait for the completion of my initiated work from the controller?

Web API controller:

[Route("api/test")]
[ApiController]
public class TestController : ControllerBase
{
    private Producer producer;

    public TestController(Producer producer)
    {
        this.producer = producer;
    }
    [HttpGet]
    public async Task<string[]> Values()
    {
        for (int i = 1; i <= 10; i++)
        {
            await this.producer.AddAsync(i);
        }

        // I've added my work to the queue; elegant completion required
        return new string[] { "value1", "value2" };
    }

}

Producer / Consumer implementation:

public class Producer
{
    private BufferBlock<int> queue;
    private ActionBlock<int> consumer;
    public List<int> results = new List<int>();

    private void InitializeChain()
    {
        queue = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 5, });
        var consumerOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 5, MaxDegreeOfParallelism = 5 };
        consumer = new ActionBlock<int>(x =>
        {
            Thread.Sleep(5000);
            Debug.WriteLine(x + " " + Thread.CurrentThread.ManagedThreadId);
            results.Add(x);
        }, consumerOptions);
        queue.LinkTo(consumer, new DataflowLinkOptions { PropagateCompletion = true });
    }
    public async Task AddAsync(int data)
    {
        await queue.SendAsync(data);
    }
    public Producer()
    {
        this.InitializeChain();
    }
}
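
For completeness, the singleton registration mentioned above is assumed to look something like this in Startup.ConfigureServices (not shown in the question):

public void ConfigureServices(IServiceCollection services)
{
    services.AddControllers();

    // one Producer instance is shared by every request,
    // so all controllers feed the same queue
    services.AddSingleton<Producer>();
}
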
H Bains
  • Would you like to change the return type of `AddAsync(int data)` from `Task` to `Task<int>`, and the returned task to represent the asynchronous completion of processing the `data`, containing eventually the `int` result? If yes, there are some decent implementations in [this](https://stackoverflow.com/questions/50120304/request-response-pattern-with-tpl-dataflow) and [this](https://stackoverflow.com/questions/21424084/task-sequencing-and-re-entracy) questions. Btw the intermediate `BufferBlock` is not needed. The `ActionBlock` has an internal input buffer itself. – Theodor Zoulias Aug 19 '20 at 02:50
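
To illustrate that suggestion, here is a minimal sketch (not code from the question or the answer) of a Producer whose AddAsync returns a Task<int> that completes when the posted item has been processed, posting directly to the ActionBlock with no intermediate BufferBlock:

using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class Producer
{
    private readonly ActionBlock<(int Data, TaskCompletionSource<int> Tcs)> _consumer;

    public Producer()
    {
        _consumer = new ActionBlock<(int Data, TaskCompletionSource<int> Tcs)>(async item =>
        {
            await Task.Delay(5000);            // placeholder for the real IO-bound work
            item.Tcs.TrySetResult(item.Data);  // hand the result back to the awaiting caller
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 5, MaxDegreeOfParallelism = 5 });
    }

    public async Task<int> AddAsync(int data)
    {
        var tcs = new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);
        await _consumer.SendAsync((data, tcs)); // waits here when the block is full (back pressure)
        return await tcs.Task;                  // completes once this particular item is processed
    }
}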

1 Answer


So there are many approaches and synchronisation primitives you can use to solve this, each with their own benefits, fault tolerance, and issues depending on your needs. Here is an awaitable example with TaskCompletionSource.

Given

public class Producer
{
   private BufferBlock<int> _queue;
   private ActionBlock<int> _consumer;
   public Action<int,string> OnResult;
   public Producer()
   {
      InitializeChain();
   }
   private void InitializeChain()
   {
      _queue = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 5 });
      var consumerOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 5, MaxDegreeOfParallelism = 5 };    
      _consumer = new ActionBlock<int>(SomeIoWorkAsync, consumerOptions);   
      _queue.LinkTo(_consumer, new DataflowLinkOptions { PropagateCompletion = true });
   }

   private async Task SomeIoWorkAsync(int x)
   {
      Console.WriteLine($"{DateTime.Now.TimeOfDay:g} : Processing {x}");
      await Task.Delay(5000);
      OnResult?.Invoke(x,$"SomeResult {x}");
   }

   public Task AddAsync(int data) => _queue.SendAsync(data);
}

Awaitable

You could easily refactor this to do the send and the await in one call.

public static Task<string> WaitForConsumerAsync(Producer producer,int myId)
{
   var tcs = new TaskCompletionSource<string>();

   producer.OnResult += (id,result) =>
   {
      if(id == myId)
         tcs.TrySetResult(result);
   };

   return tcs.Task;
}

Usage

var producer = new Producer();

// to simulate something you are waiting for, an id or whatever
var myId = 7;

// you could send and await in the same method if needed; this is just an example
var task = WaitForConsumerAsync(producer,myId);

// create random work for the bounded capacity to fill up
// run this as a task so we don't hit the back pressure before we await (just for this test)
Task.Run(async () =>
{
   for (int i = 1; i <= 20; i++)
      await producer.AddAsync(i);
});

// wait for your results to pop out
var result = await task;

Console.WriteLine($"{DateTime.Now.TimeOfDay:g} : Got my result {result}, now i can finish happily");

// you can happily end here, the pipeline will keep going
Console.ReadKey();

Output

12:04:41.62464 : Processing 3
12:04:41.6246489 : Processing 1
12:04:41.6246682 : Processing 2
12:04:41.624641 : Processing 4
12:04:41.624661 : Processing 5
12:04:41.8530723 : Processing 7
12:04:41.8530791 : Processing 8
12:04:41.8531427 : Processing 10
12:04:41.8530716 : Processing 6
12:04:41.8530967 : Processing 9
12:04:42.0531947 : Got my result SomeResult 7, now i can finish happily
12:04:42.0532178 : Processing 11
12:04:42.0532453 : Processing 12
12:04:42.0532721 : Processing 14
12:04:42.0532533 : Processing 13
12:04:42.2674406 : Processing 15
12:04:42.2709914 : Processing 16
12:04:42.2713017 : Processing 18
12:04:42.2710417 : Processing 17
12:04:42.4689852 : Processing 19
12:04:42.4721405 : Processing 20

Full Demo Here

Note: you may need to play with the example so it doesn't time out

Example of doing it all at once

public async Task<string> AddAsync(int data)
{
   await _queue.SendAsync(data);
   return await WaitForConsumerAsync(data);
}

public Task<string> WaitForConsumerAsync(int data)
{
   var tcs = new TaskCompletionSource<string>();

   OnResult += (id, result) =>
   {
      if (id == data)
         tcs.TrySetResult(result);
   };

   return tcs.Task;
}
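
From the controller, this combined AddAsync can then be awaited per request, or fanned out and awaited together. Here is a sketch based on the controller in the question (the endpoint shape is assumed):

[HttpGet]
public async Task<string[]> Values()
{
    // queue several items and wait for all of them to be consumed;
    // AddAsync here is the combined send-and-wait version shown above
    var tasks = new List<Task<string>>();
    for (int i = 1; i <= 10; i++)
        tasks.Add(this.producer.AddAsync(i));

    // completes only when every queued item has produced a result
    return await Task.WhenAll(tasks);
}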

Additional notes

This is really only an academic example of an awaitable event. I assume that your pipeline is more complicated than the example given, that you are doing a combination of CPU- and IO-bound workloads, and that you actually need a BufferBlock; in this example it's redundant.

  1. If you were waiting for pure IO workloads, you would probably be better off just awaiting them; no pipeline needed.
  2. In the information you have given there is no real need to create back pressure with a BoundedCapacity, unless you have some sort of memory constraint.
  3. You need to be careful with BoundedCapacity and the default EnsureOrdered = true. The pipeline will be more efficient with EnsureOrdered = false: jobs will pop out when they are finished, and back pressure will not be affected by the ordering of different results, which means items will likely progress through the pipeline faster (see the options sketch after this list).
  4. You could also use another framework like Rx, which would likely make all this more elegant and fluent.
  5. You could also get a small efficiency by setting SingleProducerConstrained = true, as your blocks are linear.
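
A sketch of the option tweaks from notes 3 and 5, applied to the consumer options used earlier (EnsureOrdered mostly matters once the pipeline contains blocks that produce output, such as a TransformBlock):

var consumerOptions = new ExecutionDataflowBlockOptions
{
    BoundedCapacity = 5,
    MaxDegreeOfParallelism = 5,
    EnsureOrdered = false,             // don't force results to respect input order
    SingleProducerConstrained = true   // only the single linked BufferBlock feeds this block
};
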
TheGeneral
  • @DavidCarter back-pressure would only help if there are a lot of allocations in the pipeline, or in a more complicated design where multiple blocks need it – TheGeneral Aug 19 '20 at 02:45
  • @DavidCarter as for `WaitForConsumerAsync`, this whole structure relies on the fact that you can identify the result from the data you sent in. You could use an ID if you have one, or the reference would likely be fine if the identity is maintained – TheGeneral Aug 19 '20 at 02:49
  • I think that the `WaitForConsumerAsync` is leaky, because it attaches a handler to the `OnResult` action, but the handler remains attached forever. There should be a `-=` somewhere, to keep things tidy. – Theodor Zoulias Aug 19 '20 at 03:01
  • @TheodorZoulias the action handler will live as long as the scope does; after the task is awaited, it's gone. You can test it for yourself. There was an Eric Lippert post about this at one stage, just trying to dig it up – TheGeneral Aug 19 '20 at 03:12
  • @DavidCarter bounded capacity can't hurt, especially if you want to limit the callbacks on the action (which is one thing I didn't consider). As to the best option for your situation, it kind of depends; since this is behind a controller I doubt there is a problem either way, it's all async and no threads are harmed in the making of this movie – TheGeneral Aug 19 '20 at 03:17
  • @DavidCarter if you want to prevent abuse don't feed the block with `SendAsync`, feed it with [`Post`](https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.dataflowblock.post), and interpret a return value of `false` as indicator of abuse (throw a `TooManyConcurrentOperationsException`). – Theodor Zoulias Aug 19 '20 at 09:11
  • @MichaelRandall Would it be appropriate to pass a function into AddAsync from my controller, so that SomeIoWorkAsync runs my specific method instead of a fixed routine? I'd have to make a few adjustments to the Producer class, but design-wise I'm not sure if that's a correct approach. – H Bains Aug 21 '20 at 03:41
  • @HBains yeah this would probably capture the concern better – TheGeneral Aug 21 '20 at 03:43
  • @MichaelRandall I tried your "doing it all at once" example and it seems to process items sequentially. – H Bains Aug 21 '20 at 03:56
  • @HBains I can't really take a look at this right now, but I'll try to get back to it later. I am sure something can be figured out – TheGeneral Aug 21 '20 at 04:00
  • @MichaelRandall Rookie mistake on my part. I had to edit my TestController as well: `List<Task> t = new List<Task>(); for (int i = 1; i <= 5; i++) t.Add(producer.AddAsync(i)); await Task.WhenAll(t);` – H Bains Aug 21 '20 at 04:09
  • @MichaelRandall If you do find some time later, I've hit a wall after realizing that if I'm passing in a function that might have a variable number of arguments, there's no way for me to declare the BufferBlock and ActionBlock of any concrete type. For instance, the TestController might have one endpoint that passes in a function with 1 argument, while another endpoint in another controller can pass in a function with 2 arguments. Furthermore, since the Producer class is a singleton, I can't make it generic either. – H Bains Aug 21 '20 at 04:30
  • @HBains indeed this is a problem and changes this completely. Maybe you can start a new question with the exact problem you are trying to solve. It seems you are trying to create a sort of message pump with limited concurrency and completely dynamic data. Dataflow is really only good for structured data. My gut feeling is it's the wrong tool for the job; however, I'd have to see the total domain of the problem – TheGeneral Aug 21 '20 at 04:35
  • @MichaelRandall I've added specific details here: https://stackoverflow.com/questions/63516672/creating-an-asychronous-parallel-and-limited-queue-in-asp-net-core. I hope this clarifies what I'm trying to achieve. Sorry for the ambiguity, you've been a great help with my attempts. – H Bains Aug 21 '20 at 04:47