I have a BlockingCollection of downloads, and I would like to cap the number of concurrent downloads, i.e. the number of concurrent `await downloadService.download(release)` calls. The number of items in the BlockingCollection itself should be effectively unlimited. For example, there could be 49945 downloads in the BlockingCollection, but at most 5 should be downloading at the same time; whenever a download finishes, the next one should start, in order if possible. Here's my current code:

BlockingCollection<Download> sendQueue = new BlockingCollection<Download>(new ConcurrentQueue<Download>());
while (true)
{
    var release = sendQueue.Take();
    ThreadPool.QueueUserWorkItem(async rlZ =>
    {
        //do whatever you have to do
        await downloadService.download(release);
    }, release);
}
TePi
  • You need to keep a counter capped at 5 items; once any thread finishes, reduce the count and add new items into the queue. https://learn.microsoft.com/en-us/dotnet/api/system.collections.concurrent.blockingcollection-1?view=net-5.0 – coder_b Dec 25 '20 at 20:57
  • @mjwills Isn't that going to limit the amount of elements the BlockingCollection can have instead of the number of concurrent actions I want to do with the elements of the BlockingCollection? – TePi Dec 25 '20 at 23:18
  • The `ThreadPool.QueueUserWorkItem` and the `BlockingCollection` are not the most suitable tools to use when you have I/O-bound work to do, like downloading stuff from the web. You can see better ways to do it here: [How to limit the amount of concurrent async I/O operations?](https://stackoverflow.com/questions/10806951/how-to-limit-the-amount-of-concurrent-async-i-o-operations/) – Theodor Zoulias Dec 26 '20 at 07:25

1 Answer

Set a maximum processing of items in a BlockingCollection

You can't set this in a BlockingCollection because a BlockingCollection doesn't do any processing - it's just a collection.

If you want to restrict the number of simultaneous processors, then you'll need to put that in the processing code itself. The standard way of throttling asynchronous code is with SemaphoreSlim.
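For reference, here is a minimal sketch of what SemaphoreSlim throttling could look like. The names `ThrottledDownloads`, `ProcessAsync`, and `maxConcurrency` are hypothetical and not part of the question's code; the `download` delegate stands in for `downloadService.download`.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledDownloads
{
    // Hypothetical helper: runs `download` for every queued item while
    // keeping at most `maxConcurrency` downloads in flight.
    public static async Task ProcessAsync<T>(
        BlockingCollection<T> queue,
        Func<T, Task> download,
        int maxConcurrency)
    {
        using var throttle = new SemaphoreSlim(maxConcurrency);
        var running = new List<Task>();

        // GetConsumingEnumerable blocks the calling thread while the queue is
        // empty and ends once CompleteAdding() was called and the queue is drained.
        foreach (var item in queue.GetConsumingEnumerable())
        {
            // Wait asynchronously for a free slot before starting the next download.
            await throttle.WaitAsync();
            running.Add(RunOneAsync(item));
        }

        // Wait for the downloads that are still in flight.
        await Task.WhenAll(running);

        async Task RunOneAsync(T item)
        {
            try
            {
                await download(item);
            }
            finally
            {
                // Free the slot even if the download failed.
                throttle.Release();
            }
        }
    }
}
```

Because the `foreach` blocks its thread while the queue is empty, this sketch would typically be started with `Task.Run`. It also keeps every started task in a list, so for a queue that never completes you would want to drop finished tasks instead.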

However, in your case, I'd recommend having n consumers. This makes the code cleaner.

var consumers = new Task[]
{
  Task.Run(ConsumerAsync),
  Task.Run(ConsumerAsync),
  Task.Run(ConsumerAsync),
  Task.Run(ConsumerAsync),
  Task.Run(ConsumerAsync),
};
await Task.WhenAll(consumers);

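// Each consumer handles one download at a time, so five consumers
// give at most five concurrent downloads.
// Assumes sendQueue and downloadService are static members visible here.
static async Task ConsumerAsync()
{
  // The loop ends once sendQueue.CompleteAdding() has been called and the queue is empty.
  foreach (var release in sendQueue.GetConsumingEnumerable())
    await downloadService.download(release);
}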

Tip: If you use an async-compatible queue like a Channel instead of a BlockingCollection, then you can use an async enumerable instead of a consuming enumerable and remove the Task.Run calls.
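A minimal sketch of that Channel-based variant, assuming .NET Core 3.0 or later (where `ChannelReader<T>.ReadAllAsync` is available). The names `ChannelConsumers`, `RunAsync`, and `consumerCount` are hypothetical, and the `download` delegate stands in for `downloadService.download`.

```csharp
using System;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelConsumers
{
    // Hypothetical helper: starts `consumerCount` asynchronous consumers that
    // all read from the same channel. Each consumer awaits one download at a
    // time, so at most `consumerCount` downloads run concurrently.
    public static Task RunAsync<T>(
        ChannelReader<T> reader,
        Func<T, Task> download,
        int consumerCount)
    {
        var consumers = Enumerable.Range(0, consumerCount)
            .Select(_ => ConsumeAsync(reader, download))
            .ToArray();
        return Task.WhenAll(consumers);
    }

    private static async Task ConsumeAsync<T>(
        ChannelReader<T> reader,
        Func<T, Task> download)
    {
        // ReadAllAsync waits asynchronously for items, so no thread is blocked
        // while the channel is empty and no Task.Run is needed. The loop ends
        // once the writer has been completed and the channel is drained.
        await foreach (var item in reader.ReadAllAsync())
            await download(item);
    }
}
```

With this shape, the producer could create the queue with `Channel.CreateUnbounded<Download>()`, add items with `channel.Writer.TryWrite(download)`, and call `channel.Writer.Complete()` when no more items will arrive. Items are handed out in the order they were written, although downloads may finish out of order.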

Stephen Cleary
  • I have read about Channel and it's pretty awesome! I have a single producer adding an unlimited number of Download items to a queue (a Channel right now) and n consumers (at most 5 running concurrently), and I'm really confused about how to structure it. Do I create a single Task (consumer) with a while (true) loop where I TryRead each Download item? Or do I create n Tasks, using SemaphoreSlim to make sure there are never more than 5, and call WaitToReadAsync on the Channel? As simple as it sounds, I just want to process an unlimited number of web downloads, but at most 5 at the same time. – TePi Dec 26 '20 at 17:42
  • @TePi: For Channels, use a similar structure to the code I posted. Just replace `GetConsumingEnumerable` with `ReadAllAsync` and change the `foreach` to `await foreach`. Then remove the `Task.Run` calls. You'll end up with five asynchronous consumers. – Stephen Cleary Dec 28 '20 at 15:48