67

I have a collection of 1000 input messages to process. I'm looping over the collection and starting a new task for each message to get it processed.

// Assume this collection contains 1000 messages.
var messages = new List<string>();

foreach (var msg in messages)
{
    Task.Factory.StartNew(() =>
    {
        Process(msg);
    });
}

Can we guess how many messages will be processed simultaneously at most (assuming a normal quad-core processor)? And can we limit the maximum number of messages being processed at a time?

How can I ensure that the messages get processed in the same sequence/order as the collection?

Mathiyazhagan
  • How about splitting the messages into batches and running each batch in parallel? – bit Apr 12 '16 at 05:52
  • Related question that targets asynchronous workloads: [How to limit the amount of concurrent async I/O operations?](https://stackoverflow.com/questions/10806951/how-to-limit-the-amount-of-concurrent-async-i-o-operations). – Theodor Zoulias Oct 20 '20 at 21:41
  • 1
    Check out this article by Mark Heath for several approaches with their pros and cons: https://markheath.net/post/constraining-concurrent-threads-csharp. The WaitAsync() method should be used on SemaphoreSlim to make the implementation truly asynchronous and return the waiting thread to the pool. – Igorium Jul 29 '21 at 15:37
  • You can also read these articles by Joseph Albahari: https://www.albahari.com/threading/part2.aspx#_WaitHandle_Producer_Consumer_Queue https://www.albahari.com/threading/part4.aspx#_Wait_Pulse_Producer_Consumer_Queue https://www.albahari.com/threading/part5.aspx#_BlockingCollectionT – Ramil Shavaleev Jul 27 '22 at 12:43

11 Answers

78

You could use Parallel.ForEach and rely on MaxDegreeOfParallelism instead.

Parallel.ForEach(messages, new ParallelOptions { MaxDegreeOfParallelism = 10 },
    msg =>
    {
        // process each message, with at most 10 running concurrently
        Process(msg);
    });
Hari Prasad
  • 5
    This is exactly the kind of processing that `Parallel.ForEach` was made for. – Lasse V. Karlsen Apr 12 '16 at 06:15
  • And since the Task Parallel Library is built on the `ThreadPool`, we can assume it will only run as many tasks as the system has cores if we do not specify it explicitly. – Toxantron Apr 12 '16 at 06:19
  • Would this ensure that the messages would be processed in the same order as they occur in the List? – bit Apr 12 '16 at 06:19
  • Yes, each message is picked up in order. – Hari Prasad Apr 12 '16 at 06:23
  • Are you looking for tasks/threads to complete in sequence? Parallel programming does not guarantee order of completion; it depends on a lot of OS/hardware factors. In simple words, you can't predict when a thread gets its turn to execute. – Hari Prasad Apr 12 '16 at 06:39
  • 1
    I don't think bit was talking about completion but rather about processing order. In my programs, it seems that when using Parallel.ForEach, the list is cut into n sets (MaxDegreeOfParallelism). All sets are processed in parallel, and within each set the order is enforced. – Benjamin Baumann Aug 31 '17 at 08:29
  • I'm noticing that Parallel.ForEach doesn't correctly wait until all iterations have completed if async is involved in the Action delegate... Is anyone else seeing this issue? – Dscoduc Jan 22 '20 at 22:22
  • 14
    I just wanted to caution everyone from using Parallel.ForEach for async I/O bound tasks. It was not really created for async operations. It will just start X thread-pool threads and block them during I/O waits. Use `SemaphoreSlim` instead – Alex from Jitbit May 18 '20 at 19:11
69

SemaphoreSlim is a very good solution in this case and I highly recommend the OP to try it, but @Manoj's answer has a flaw, as mentioned in the comments: the semaphore should be waited on before spawning the task, like this.

Updated Answer: As @Vasyl pointed out, the semaphore may be disposed before the tasks complete and will then raise an exception when its Release() method is called, so before exiting the using block you must wait for the completion of all created tasks.

int maxConcurrency = 10;
var messages = new List<string>();
using (SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency))
{
    List<Task> tasks = new List<Task>();
    foreach (var msg in messages)
    {
        concurrencySemaphore.Wait();

        var t = Task.Factory.StartNew(() =>
        {
            try
            {
                 Process(msg);
            }
            finally
            {
                concurrencySemaphore.Release();
            }
        });

        tasks.Add(t);
    }

    Task.WaitAll(tasks.ToArray());
}

Answer to the comments, for those who want to see how the semaphore can be disposed without Task.WaitAll: run the code below in a console app and this exception will be raised.

System.ObjectDisposedException: 'The semaphore has been disposed.'

static void Main(string[] args)
{
    int maxConcurrency = 5;
    List<string> messages =  Enumerable.Range(1, 15).Select(e => e.ToString()).ToList();

    using (SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency))
    {
        List<Task> tasks = new List<Task>();
        foreach (var msg in messages)
        {
            concurrencySemaphore.Wait();

            var t = Task.Factory.StartNew(() =>
            {
                try
                {
                    Process(msg);
                }
                finally
                {
                    concurrencySemaphore.Release();
                }
            });

            tasks.Add(t);
        }

       // Task.WaitAll(tasks.ToArray());
    }
    Console.WriteLine("Exited using block");
    Console.ReadKey();
}

private static void Process(string msg)
{            
    Thread.Sleep(2000);
    Console.WriteLine(msg);
}
ClearLogic
  • 2
    What happens if the `Process` method runs for a long time? `concurrencySemaphore.Release()` may be called when `concurrencySemaphore` is already disposed, and as a result: `ObjectDisposedException`. – Vasyl Zvarydchuk Jul 18 '17 at 23:23
  • @VasylZvarydchuk you are right. I have updated the answer – ClearLogic Jul 19 '17 at 07:00
  • 2
    How can it be that semaphore is disposed before all tasks have finished? –  Jun 18 '18 at 16:13
  • @VasylZvarydchuk - how will the semaphore be disposed before being released, even if Process runs a long time? – grunt Aug 13 '18 at 13:45
  • It won't be disposed in the current answer. Comment out `Task.WaitAll` and see for yourself – ClearLogic Sep 09 '18 at 03:13
  • This doesn't work well if `Process` is an async method. The "Exited using block" prints before the last `maxConcurrency` tasks finish. Any idea how to handle this? – empz Sep 17 '20 at 10:46
  • I've added an answer which is based on this one but for an async Process method. In this case, Task.Factory.StartNew cannot be used. Task.Run is the right method here. – empz Sep 17 '20 at 11:58
11

I think it would be better to use Parallel LINQ

Parallel.ForEach(messages,
    new ParallelOptions { MaxDegreeOfParallelism = 4 },
    x => Process(x));

where x is the current message and MaxDegreeOfParallelism = 4 is the maximum number of messages processed in parallel.
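
For an actual Parallel LINQ version, a minimal sketch (assuming the same `messages` list and synchronous `Process` method from the question, plus a `using System.Linq;` directive) could be:

messages
    .AsParallel()
    .WithDegreeOfParallelism(4) // at most 4 messages processed concurrently
    .ForAll(Process);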

Mr.Hunt
7

Channels were introduced with .NET Core 3.0 and are also part of .NET 5.0 and later.
The main benefit of this producer/consumer concurrency pattern is that you can also limit the input data processing to reduce resource impact.
This is especially helpful when processing millions of data records.
Instead of reading the whole dataset at once into memory, you can now consecutively query only chunks of the data and wait for the workers to process it before querying more.

Code sample with a queue capacity of 10 messages and 3 consumer tasks:

/// <exception cref="System.AggregateException">Thrown on Consumer Task exceptions.</exception>
public static async Task ProcessMessages(List<string> messages)
{
    const int producerCapacity = 10, consumerTaskLimit = 3;
    var channel = Channel.CreateBounded<string>(producerCapacity);

    _ = Task.Run(async () =>
    {
        foreach (var msg in messages)
        {
            await channel.Writer.WriteAsync(msg);
            // blocking when channel is full
            // waiting for the consumer tasks to pop messages from the queue
        }

        channel.Writer.Complete();
        // signaling the end of queue so that 
        // WaitToReadAsync will return false to stop the consumer tasks
    });

    var tokenSource = new CancellationTokenSource();
    CancellationToken ct = tokenSource.Token;

    var consumerTasks = Enumerable
    .Range(1, consumerTaskLimit)
    .Select(_ => Task.Run(async () =>
    {
        try
        {
            while (await channel.Reader.WaitToReadAsync(ct))
            {
                ct.ThrowIfCancellationRequested();
                while (channel.Reader.TryRead(out var message))
                {
                    await Task.Delay(500);
                    Console.WriteLine(message);
                }
            }
        }
        catch (OperationCanceledException) { }
        catch
        {
            tokenSource.Cancel();
            throw;
        }
    }))
    .ToArray();

    Task waitForConsumers = Task.WhenAll(consumerTasks);
    try { await waitForConsumers; }
    catch
    {
        foreach (var e in waitForConsumers.Exception.Flatten().InnerExceptions)
            Console.WriteLine(e.ToString());

        throw waitForConsumers.Exception.Flatten();
    }
}

As pointed out by Theodor Zoulias: when some consumers fail with exceptions, the remaining tasks would continue to run and would have to take over the load of the failed ones. To avoid this, I implemented a CancellationToken to stop all remaining tasks and handle the exceptions, combined in the AggregateException of waitForConsumers.Exception.

Side note:
The Task Parallel Library (TPL) might be good at automatically limiting the tasks based on your local resources. But when you are processing data remotely via RPC, it's necessary to manually limit your RPC calls to avoid filling the network/processing stack!

5andr0
  • 1
    This is an attempt to reinvent the [`ActionBlock`](https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.actionblock-1). It looks nice, but it has a problem. In case a consumer fails, the rest of the consumers will keep working, and the `processMessages` will continue running with a reduced degree of parallelism. If all consumers except one fail, the last standing consumer will slowly process all remaining messages alone, until the exceptions are finally surfaced. – Theodor Zoulias Sep 14 '21 at 15:30
  • Thanks for pointing that out! I added a try catch block and a comment – 5andr0 Sep 14 '21 at 20:24
  • Nah, I don't think that swallowing exceptions is the correct way to solve this problem. The correct way is to fail fast as soon as an exception is observed. When a consumer fails, all other consumers should stop consuming the channel, and should `break` immediately after processing their current message. That's how the `ActionBlock`, the `Parallel` class and the PLINQ library behave. You need to put a lot more work on this, in order to make it a featureless substitute of an `ActionBlock`, that has at least correct behavior in what little is able to do. – Theodor Zoulias Sep 14 '21 at 20:42
  • Oh now I get it, been in a hurry, sorry! Any idea how to solve this? I tried Parallels _foreach_ on the **IAsyncEnumerable** from *channel.Reader.ReadAllAsync()* but that seems incompatible :/ I'm not a big fan of _CancellationTokens_ since they don't forcekill the other tasks and the user has to implement cancellation checks – 5andr0 Sep 14 '21 at 22:27
  • 1
    I think that you've solved it. A `CancellationToken` is exactly what's needed in this case IMHO. – Theodor Zoulias Sep 14 '21 at 22:30
  • Something else to have in mind is that the `await Task.WhenAll(consumerTasks);` is going to propagate just one exception, which is the exception of the first failed task in the `consumerTasks` array, and not necessarily the first exception that happened in chronological order. In contrast the `ActionBlock`, the `Parallel` class and the PLINQ library propagate all errors in chronological order, bundled in an `AggregateException`. – Theodor Zoulias Sep 15 '21 at 09:49
  • Thanks for your valuable input! Just found out, that I can use the AggregateException of the Task.Exception field! That way I can use the async WhenAll with AggregateExceptions without having to use WaitAll! – 5andr0 Sep 15 '21 at 12:17
  • 2
    Yeap, now it's better. At this point you've probably realized that implementing this kind of functionality using `Channel`s and `Task`s is both challenging and laborious. And actually doing it doesn't make much sense, except from being a learning experience, when this functionality is already available natively in .NET 5, in the form of the `ActionBlock` class (not to mention the `TransformBlock` and the other powerful blocks of the TPL Dataflow library). You can see an `ActionBlock` in action [here](https://stackoverflow.com/a/65251949/11178549). – Theodor Zoulias Sep 15 '21 at 18:39
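
For reference, a minimal sketch of the `ActionBlock` approach recommended in these comments (it needs the System.Threading.Tasks.Dataflow package; the `Task.Delay` stands in for real per-message work):

using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var messages = Enumerable.Range(1, 15).Select(e => e.ToString());

var block = new ActionBlock<string>(async message =>
{
    await Task.Delay(500); // simulated asynchronous work
    Console.WriteLine(message);
}, new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 3, // at most 3 messages in flight
    BoundedCapacity = 10        // back-pressure: SendAsync waits while the queue is full
});

foreach (var msg in messages)
    await block.SendAsync(msg); // awaits whenever the block is at capacity

block.Complete();       // signal that no more messages are coming
await block.Completion; // wait until every queued message is processed
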
3

If your Process method is async, you can't use Task.Factory.StartNew, as it doesn't play well with an async delegate (it returns a Task<Task> rather than a task representing the actual work). There are also some other nuances when using it (see this for example).
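
A small self-contained illustration of that pitfall:

using System;
using System.Threading.Tasks;

class StartNewPitfall
{
    static async Task Main()
    {
        // With an async lambda, StartNew returns Task<Task>: the outer task
        // completes as soon as the lambda hits its first await.
        Task<Task> outer = Task.Factory.StartNew(async () =>
        {
            await Task.Delay(2000);
            Console.WriteLine("done");
        });

        await outer;          // returns almost immediately; "done" not printed yet
        await outer.Unwrap(); // waits for the actual work to finish
    }
}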

The proper way to do it in this case is to use Task.Run. Here's @ClearLogic's answer, modified for an async Process method.

static void Main(string[] args)
{
    int maxConcurrency = 5;
    List<string> messages =  Enumerable.Range(1, 15).Select(e => e.ToString()).ToList();

    using (SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency))
    {
        List<Task> tasks = new List<Task>();
        foreach (var msg in messages)
        {
            concurrencySemaphore.Wait();

            var t = Task.Run(async () =>
            {
                try
                {
                    await Process(msg);
                }
                finally
                {
                    concurrencySemaphore.Release();
                }
            });

            tasks.Add(t);
        }

       Task.WaitAll(tasks.ToArray());
    }
    Console.WriteLine("Exited using block");
    Console.ReadKey();
}

private static async Task Process(string msg)
{            
    await Task.Delay(2000);
    Console.WriteLine(msg);
}
empz
  • This solution blocks needlessly the calling thread (`.Wait()`, `.WaitAll()`), so I would consider it to be suboptimal. – Theodor Zoulias Sep 17 '20 at 12:24
  • @TheodorZoulias The accepted answer uses the same approach. This is just a slight modification for when the Process method is async. If you don't want to block the calling thread, then simply make your calling method (in this case Main) async and replace `Task.WaitAll` with `await Task.WhenAll`. This is a simplified case where the calling thread is static void Main. But if it were a web request with async processing, this would work just fine without blocking anything. – empz Sep 17 '20 at 16:41
  • Fair enough. I just downvoted the accepted answer too. I don't like blocking solutions when an asynchronous one is available. – Theodor Zoulias Sep 17 '20 at 22:26
  • 1
    Excellent solution. I would suggest making the change you mentioned above: changing `Task.WaitAll()` to `await Task.WhenAll()`. Could be helpful to change the `Main` function to be an async function to show that this whole approach could be its own awaitable function. – Dave Jan 07 '22 at 17:23
0

You can create your own TaskScheduler and override QueueTask there.

protected internal abstract void QueueTask(Task task);

Then you can do anything you like.

One example here:

Limited concurrency level task scheduler (with task priority) handling wrapped tasks
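
For reference, a compact sketch of such a scheduler, loosely modeled on Microsoft's LimitedConcurrencyLevelTaskScheduler sample (class and member names here are illustrative):

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public sealed class LimitedConcurrencyTaskScheduler : TaskScheduler
{
    private readonly LinkedList<Task> _tasks = new LinkedList<Task>(); // guarded by lock (_tasks)
    private readonly int _maxDegreeOfParallelism;
    private int _runningWorkers; // also guarded by lock (_tasks)

    public LimitedConcurrencyTaskScheduler(int maxDegreeOfParallelism)
    {
        if (maxDegreeOfParallelism < 1)
            throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism));
        _maxDegreeOfParallelism = maxDegreeOfParallelism;
    }

    public override int MaximumConcurrencyLevel => _maxDegreeOfParallelism;

    protected override void QueueTask(Task task)
    {
        lock (_tasks)
        {
            _tasks.AddLast(task);
            if (_runningWorkers < _maxDegreeOfParallelism)
            {
                _runningWorkers++;
                // Spin up one worker per allowed degree of parallelism.
                ThreadPool.UnsafeQueueUserWorkItem(_ => DrainQueue(), null);
            }
        }
    }

    private void DrainQueue()
    {
        while (true)
        {
            Task next;
            lock (_tasks)
            {
                if (_tasks.Count == 0) { _runningWorkers--; return; }
                next = _tasks.First.Value;
                _tasks.RemoveFirst();
            }
            TryExecuteTask(next); // run the dequeued task on this worker
        }
    }

    // Never inline, so the concurrency limit cannot be bypassed by Wait().
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) => false;

    protected override IEnumerable<Task> GetScheduledTasks()
    {
        lock (_tasks) return _tasks.ToArray();
    }
}

It would be used through a TaskFactory, e.g. new TaskFactory(new LimitedConcurrencyTaskScheduler(4)).StartNew(() => Process(msg));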

Serve Laurijssen
0

You can simply set the max degree of concurrency like this:

int maxConcurrency = 10;
var messages = new List<string>(); // assume 1000 messages
using (SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency))
{
    foreach(var msg in messages)
    {
        Task.Factory.StartNew(() =>
        {
            concurrencySemaphore.Wait();
            try
            {
                 Process(msg);
            }
            finally
            {
                concurrencySemaphore.Release();
            }
        });
    }
}
error_handler
  • 6
    This unnecessarily blocks threads, if the thread pool has more threads than your max concurrency. – yaakov Apr 12 '16 at 07:20
0

If you need in-order queuing (processing might still finish in any order), there is no need for a semaphore. Old-fashioned if statements work fine:

const int maxConcurrency = 5;
List<Task> tasks = new List<Task>();
foreach (var arg in args)
{
    var t = Task.Run(() => { Process(arg); });

    tasks.Add(t);

    if (tasks.Count >= maxConcurrency)
        Task.WaitAny(tasks.ToArray());
}

Task.WaitAll(tasks.ToArray());
Neil Hunt
  • Is it supposed to wait in the loop and only hit the last `WaitAll` when close to the end? Because in my experience, it just screams through the loop and hits the `WaitAll` almost instantly – Pierre Apr 06 '18 at 06:09
  • 1
    yeah, that's because completed tasks aren't removed from the tasks list, so the next time `WaitAny` gets hit, it finds the first completed task and moves on. – daf Dec 11 '20 at 06:38
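
A minimal sketch of the fix daf describes, reusing `args` and `Process` from the answer above: `Task.WaitAny` returns the index of the completed task, so removing that task keeps the list down to tasks that are still running.

const int maxConcurrency = 5;
var tasks = new List<Task>();
foreach (var arg in args)
{
    tasks.Add(Task.Run(() => Process(arg)));

    if (tasks.Count >= maxConcurrency)
    {
        // WaitAny returns the index of a completed task;
        // removing it makes the next WaitAny actually wait.
        int finished = Task.WaitAny(tasks.ToArray());
        tasks.RemoveAt(finished);
    }
}

Task.WaitAll(tasks.ToArray());
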
0

I ran into a similar problem where I wanted to produce 5000 results while calling APIs, etc. So, I ran some speed tests.

Parallel.ForEach(products.Select(x => x.KeyValue).Distinct().Take(100), id =>
{
    new ParallelOptions { MaxDegreeOfParallelism = 100 };
    GetProductMetaData(productsMetaData, client, id).GetAwaiter().GetResult();
});

produced 100 results in 30 seconds.

Parallel.ForEach(products.Select(x => x.KeyValue).Distinct().Take(100), id =>
{
    new ParallelOptions { MaxDegreeOfParallelism = 100 };
    GetProductMetaData(productsMetaData, client, id);
});

Moving the GetAwaiter().GetResult() calls to the individual async API calls inside GetProductMetaData resulted in 14.09 seconds to produce 100 results.

foreach (var id in ids.Take(100))
{
    GetProductMetaData(productsMetaData, client, id);
}

Completely non-async programming with GetAwaiter().GetResult() in the API calls resulted in 13.417 seconds.

var tasks = new List<Task>();
while (y < ids.Count())
{
    foreach (var id in ids.Skip(y).Take(100))
    {
        tasks.Add(GetProductMetaData(productsMetaData, client, id));
    }

    y += 100;
    Task.WhenAll(tasks).GetAwaiter().GetResult();
    Console.WriteLine($"Finished {y}, {sw.Elapsed}");
}

Forming a task list and working through 100 at a time resulted in 7.36 seconds.

using (SemaphoreSlim cons = new SemaphoreSlim(10))
{
    var tasks = new List<Task>();
    foreach (var id in ids.Take(100))
    {
        cons.Wait();
        var t = Task.Factory.StartNew(() =>
        {
            try
            {
                GetProductMetaData(productsMetaData, client, id);
            }
            finally
            {
                cons.Release();
            }
        });

        tasks.Add(t);
    }

    Task.WaitAll(tasks.ToArray());
}

Using SemaphoreSlim resulted in 13.369 seconds, but it also took a moment to spin up before the work started.

var throttler = new SemaphoreSlim(initialCount: take);
foreach (var id in ids)
{
    throttler.WaitAsync().GetAwaiter().GetResult();
    tasks.Add(Task.Run(async () =>
    {
        try
        {
            skip += 1;
            await GetProductMetaData(productsMetaData, client, id);

            if (skip % 100 == 0)
            {
                Console.WriteLine($"started {skip}/{count}, {sw.Elapsed}");
            }
        }
        finally
        {
            throttler.Release();
        }
    }));
}

Using SemaphoreSlim with a throttler for my async task took 6.12 seconds.

The answer for me in this specific project was to use a throttler with SemaphoreSlim. Although the while/foreach task list sometimes beat the throttler, the throttler won 4 out of 6 times for 1000 records.

I realize I'm not using the OP's code, but I think this is important and adds to this discussion, because "how" is sometimes not the only question that should be asked, and the answer is sometimes "it depends on what you are trying to do."

Now to answer the specific questions:

  1. How to limit the maximum number of parallel tasks in C#: I showed how to limit the number of tasks that run at a time.
  2. Can we guess how many messages will be processed simultaneously at most (assuming a normal quad-core processor), or can we limit the number of messages processed at a time? I cannot guess how many will be processed at a time unless I set an upper limit, but I can set an upper limit. Obviously, different computers run at different speeds due to CPU, RAM, etc., and it also depends on how many threads and cores the program itself has access to, as well as what other programs are running on the same computer.
  3. How to ensure the messages get processed in the same sequence/order as the collection? If you want everything processed in a specific order, that is synchronous programming; the point of running things asynchronously is that they don't need an order. As you can see from my code, the time difference is minimal for 100 records unless you use async code. If you need ordering at some point, use asynchronous programming up until that point, then await and do things synchronously from there: for example, start task1a and task2a, later await task1a and await task2a, and only then start and await task1b and task2b (see the sketch after this list).
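
As a concrete sketch of point 3 (`urls` and `DownloadAsync` here are hypothetical names): start the operations so they run concurrently, then await them in collection order, so results are observed in the input sequence even though the work overlaps.

// Start all operations immediately; they run concurrently.
List<Task<string>> running = urls.Select(u => DownloadAsync(u)).ToList();

// Await them in list order: results are consumed in the same sequence
// as the input collection, regardless of completion order.
foreach (Task<string> t in running)
{
    Console.WriteLine(await t);
}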
Patrick Knott
  • 1
    Hi Patrick. What is the `new ParallelOptions { MaxDegreeOfParallelism = 100 };` doing inside the body of the `Parallel.ForEach`? What is the signature of the `GetProductMetaData` method that you are using as an example? If this method is asynchronous, then how is it relevant with a question related with parallel synchronous work? For limiting the *concurrency* (not parallelism) of asynchronous operations, there are other more relevant questions, like [this](https://stackoverflow.com/questions/10806951) or [this](https://stackoverflow.com/questions/11564506). – Theodor Zoulias Sep 14 '21 at 19:37
  • https://stackoverflow.com/users/11178549/theodor-zoulias, thank you for answering why I've been downvoted, I will likely remove my answer, even though it is crazy useful, I shouldn't have tried to give people advice here. I was specifically looking for ways to limit how many async threads I would run at a time. The method I used was originally synchronous and now is asynchronous. but I tested it sync as well. MaxDegreeOfParallelism can be inside the ForEach loop. I tested each of the methods recommended here, and the task list was by far the fastest... although it was not synchronous. – Patrick Knott Sep 14 '21 at 19:51
  • stackoverflow.com/users/11178549/theodor-zoulias obviously you are correct on this. Despite all of my lack of expertise and capacity to speak with the correct terminology, I still did all of the work of speed testing these things and approached with a different intelligent answer. As you said, the same thread is doing all the work, and obviously we could then add threads on top of async. What is the point of multithreading and async but to worry about speed/performance? And therein is why my answer is pertinent. My mistake was not in the answer, but rather in my comment. – Patrick Knott Sep 14 '21 at 20:16
  • Once I get one more negative, I'll delete this for the peer pressure badge. None of you need my research, right? Any Takers? – Patrick Knott Sep 14 '21 at 20:33
-1
public static void RunTasks(List<NamedTask> importTaskList)
{
    List<NamedTask> runningTasks = new List<NamedTask>();

    try
    {
        foreach (NamedTask currentTask in importTaskList)
        {
            currentTask.Start();
            runningTasks.Add(currentTask);

            if (runningTasks.Where(x => x.Status == TaskStatus.Running).Count() >= MaxCountImportThread)
            {
                Task.WaitAny(runningTasks.ToArray());
            }
        }

        Task.WaitAll(runningTasks.ToArray());
    }
    catch (Exception ex)
    {
        Log.Fatal("ERROR!", ex);
    }
}
Daniel
-1

You can use a BlockingCollection: if the bounded capacity of the collection has been reached, the producer will stop producing until the consumer finishes processing an item. I find this pattern easier to understand and implement than the SemaphoreSlim one.

int TasksLimit = 10;
BlockingCollection<Task> tasks = new BlockingCollection<Task>(new ConcurrentBag<Task>(), TasksLimit);

void ProduceAndConsume()
{
    var producer = Task.Factory.StartNew(RunProducer);
    var consumer = Task.Factory.StartNew(RunConsumer);

    try
    {
        Task.WaitAll(new[] { producer, consumer });
    }
    catch (AggregateException) { } // producer/consumer failures would surface here
}

void RunConsumer()
{
    foreach (var task in tasks.GetConsumingEnumerable())
    {
        task.Start();
    }
}

void RunProducer()
{
    for (int i = 0; i < 1000; i++)
    {
        tasks.Add(new Task(() => Thread.Sleep(1000), TaskCreationOptions.AttachedToParent));
    }
}

Note that RunProducer and RunConsumer are spawned as two independent tasks.

Shahar Shokrani
  • I guess that the OP also wants to know when their tasks will all be completed. This solution is missing this functionality. – Theodor Zoulias Mar 24 '20 at 22:13
  • Hey @TheodorZoulias, thanks for the comment; I'm not sure I understand: you will know that all the tasks have completed after `Task.WaitAll` has finished – Shahar Shokrani Mar 25 '20 at 06:10
  • 1
    After the `Task.WaitAll` the tasks `producer` and `consumer` will be completed, but some of the 1000 tasks that were added in the `BlockingCollection` will still be running. – Theodor Zoulias Mar 25 '20 at 06:17
  • 1
    I've updated my answer; I believe there are times when you don't need to be notified when all the tasks have completed. – Shahar Shokrani Mar 29 '20 at 11:25
  • OK. I just noticed another problem though. The loop that consumes the `BlockingCollection` just starts the tasks, does not wait them to complete. Starting a task is not a CPU-intensive job, it happens practically instantly. So I think that all 1000 tasks will start immediately, and the objective of limiting the parallelism will not be achieved. – Theodor Zoulias Mar 29 '20 at 11:39
  • Hey @TheodorZoulias, I've updated my answer again to "fill two needs with one deed": if `AttachedToParent` is flagged, then the parent consumer will wait for the child tasks, and the user will be notified only when all child tasks are completed! – Shahar Shokrani Mar 29 '20 at 13:29
  • Hi Shahar. The `AttachedToParent` is a clever solution for the first problem, but the second remains unsolved. The parallelism is not limited. The only limiting factor is the availability of `ThreadPool` threads. Configuring the `TasksLimit` variable has no effect. – Theodor Zoulias Mar 29 '20 at 15:34