1

I have a current algorithm that goes like this.

public class Executor
{ 
    private ParallelOptions options = new ParallelOptions();
    private IList<Step> AllSteps;
    public void Execute()
    {
        options.MaxDegreeOfParallelism = 4;
        var rootSteps = AllSteps.Where(s => !s.Parents.Any());
        Parallel.Foreach(rootSteps, options, RecursivelyExecuteStep);
    }   
    private void RecursivelyExecuteStep(Step step)
    {
        ExecuteStep();
        var childSteps = AllSteps.Where(s=>s.Parents.Contains(step) 
            && step.Parents.All(p=>p.IsComplete);
        Parallel.ForEach(childSteps, options, RecursivelyExecuteStep);
    }
}

ParallelOptions.MaxDegreeOfParallelism will be an input variable (but left it out of the code example for brevity).

I was wondering if thread pooling is handled for me automatically or if this creates new threads every time. Also what's the best way to optimize this, is thread pooling something I want. How do I use thread pooling. I'm fairly new to multithreading and what's new in 4.5[.1]

Will this not limit the algorithm to only 4 threads because each Parallel.Foreach would have it's own MaxDegreeOfParallelism of 4 thus not limiting all the threads in the app to 4? How do I achieve limiting all threading in the app to 4?

Edit: MaxDegreeOfParallelism

VMAtm
  • 27,943
  • 17
  • 79
  • 125
lanierhall
  • 530
  • 1
  • 5
  • 15
  • What is this `MaxThreadPoolSize`? `ParallelOptions` has `MaxDegreeOfParallelism` but no `MaxThreadPoolSize`... – spender Apr 08 '15 at 10:34
  • What is the workload here? Are you likely to be CPU constrained, or are you hanging on IO? – spender Apr 08 '15 at 10:37
  • These steps may or may not be bulk selecting & inserting. I'm assuming they would be mostly delayed by IO but also could be crunching huge datasets in memory. – lanierhall Apr 08 '15 at 15:42
  • Isn't there a race condition in your original algorithm in that a step with multiple parents may be executed multiple times? E.g. Suppose parents P1 and P2 of step S complete `ExecuteStep` (which I assume sets `IsComplete = true`) before either thread reaches the next line in `RecursivelyExecuteStep`. In this case both P1 and P2 threads will identify S as complete and recursively execute step S. – Michael Petito Apr 08 '15 at 16:35
  • I've been wondering that myself, I believe there is a race condition. – lanierhall Apr 08 '15 at 19:29

3 Answers3

1

Parallel.Foreach basically is a nice way to queue up work items to the .NET ThreadPool.

Your application (process) has only one instance of the ThreadPool, and it tries to be as smart as possible regarding how many concurrent threads it uses, taking things like number of available cores and virtual memory into account.

So yes, the .NET ThreadPool handles thread pooling for you, and in many cases you don't need to worry about it, use Parallel.Foreach and let it get on with it.

EDIT: As noted by others, you should be careful in overusing the ThreadPool since it is a shared resource and it may disturb other parts of your application. It will also start creating new threads if your items are blocking or very long-running, which often is wasteful. A rule of thumb is that the work items should be relatively quick and preferably non-blocking. You should test and benchmark, and if it works for your use case then it is very convenient.

You can control the max number of concurrent threads used by the ThreadPool in your application if you want explicit control, by calling ThreadPool.SetMaxThreads. I'd advice against that unless you really have to though, and know what you are doing. The ThreadPool already tries to avoid using more concurrent threads than you have cores for example.

What you can do with ParallellOptions.MaxDegreeOfParallelism is only to further limit the number of concurrent ThreadPool threads that are used to execute that particular call to Parallel.Foreach.

If you need more explicit control of how many concurrent threads an invocation of your algorithm uses, here are some possible alternatives (in, arguably, increasing implementation complexity):

  • With the default ThreadPool you can't limit concurrency while calling Parellel.Foreach recursively. You could for example consider using Parallel.Foreach only on the top level (with a ParellelOptions.MaxDegreeOfParallelism argument) and let RecursivelyExecuteStep use a standard foreach.
  • Modify (or replace) the ThreadPool for your algorithm to limit concurrency by setting ParallelOptions.TaskScheduler to an instance of QueuedTaskScheduler from Parallel Extension Extras as described here.
  • As suggested by @VMAtm, you can use TPL Dataflow to get more explicit control of how your computations are performed, including concurrency (this can also be combined with a custom task scheduler if you really want to knock yourself out).
Community
  • 1
  • 1
Magnus
  • 546
  • 3
  • 10
  • "The ThreadPool already won't use more concurrent threads than you have cores for example." Actually, if the thread pool's queues are growing, the ThreadPool will fire up new threads to deal with the backlog. On my 4 core (8 virtual core) machine, `ThreadPool.GetMaxThreads` reports 1023 for worker threads. If work is CPU constrained, I'd venture that the ThreadPool isn't the right place to be running the work in the first place, leading to unexpected latency in other areas of your app as the ThreadPool ponders (at a surprisingly leisurely rate) whether or not to fire up extra threads. – spender Apr 08 '15 at 10:28
  • @spender. Yes, true, when you should use the ThreadPool or not is not always clear cut, and depends on what the rest of the application is doing. You should for example ideally use the ThreadPool only for compute bound, non-blocking, work, since when a ThreadPool thread is blocked, e.g., waiting for IO, the ThreadPool creates a new, live, one. So it can have a lot of blocked threads, that it will gradually kill off when returned. I don't think it will create more threads just to handle larger queues, unless it's other threads are blocked. But I may of course be wrong on that. – Magnus Apr 08 '15 at 10:43
  • @spender. If you have 1023 threads in your application I think you should consider some sort of redesign if possible, that sounds quite inefficient. A thread typically uses about 1MB for internal structures and stack, so that is 1GB worth of memory overhead, not the mention the thread switching going on. And I don't think I am wrong on the ThreadPool not holding on to more concurrent threads than there are cores (at least not for long). There would be no point. – Magnus Apr 08 '15 at 11:18
  • Alarms go off in my head when the ThreadPool size is greater than the number of (v)cores on my machine. It says that something isn't using the ThreadPool as it was designed to be used... I was trying to say that the default value for the maximum number of worker threads in the ThreadPool is 1023 on my machine. If the workqueues start backing up, yes, it will start firing up new threads and eating into this maxthreads allowance. If the work sitting in the queue is CPU constrained anyway, this probably isn't going to help things anyway and will just lead to more context switching. – spender Apr 08 '15 at 11:33
  • As a strategy for dealing with a growing work queue, this works OK of the threads are waiting on IO and idling. If the work is CPU bound, the whole model breaks down. This is why, for instance, when you use `TaskFactory.StartNew` you can pass the hint `TaskCreationOptions.LongRunning`, which takes the work out of the ThreadPool. ThreadPool simply isn't the right place for long-running CPU bound operations. – spender Apr 08 '15 at 11:38
  • Ah, yes, sorry for misunderstanding you original comment. I agree, the ThreadPool is not the place for long-running operations. I also try to avoid using the ThreadPool for IO-bound, or otherwise blocking, operations, at least if performance or scaling is an issue, since it tends to cause creation and subsequent destruction of new threads. If that overhead is insignificant in the specific application then of course it is very convenient. I think I'll have to defer further discussion on when the ThreadPool decides to create new threads, it's a complicated beast :-). – Magnus Apr 08 '15 at 12:17
  • @spender. Come to think of it, what you are saying is that the ThreadPool in the face of long running threads creates and schedules new threads for new items, in the hope that the round-robin scheduler let the new items get their fair share of the CPU. Makes sense, didn't know that. Thanks! Of course, it still is just the ThreadPool trying to be robust against misuse, as you are saying. – Magnus Apr 08 '15 at 12:41
1

You can solve this problem with TPL DataFlow library (you can get it via NuGet). As it is said in other answer, Parallel class is using the ThreadPool internally, and you should not be bothered with that.

With the TPL Dataflow the only thing you need is create an TransformManyBlock<TInput,TOutput> linked on itself (or link BufferBlock with ActionBlock with Encapsulate extension), and set the MaxDegreeOfParallelism = 4 or whatever constant you think it should be.

VMAtm
  • 27,943
  • 17
  • 79
  • 125
  • TPL DataFlow gives you better control of the computational tree and is probably a better fit for this problem than Parallel.Foreach. Unless you give it your own `TaskScheduler` the items are still queued to the standard ThreadPool though. – Magnus Apr 09 '15 at 13:28
  • What do you mean for a `Standart` ThreadPool? In UI-apps this will be single-threaded UI pool – VMAtm Apr 09 '15 at 13:46
  • I mean the .NET ThreadPool, as given by [TaskScheduler.Current](https://msdn.microsoft.com/en-us/library/system.threading.tasks.taskscheduler.current%28v=vs.110%29.aspx) which afaik is what TPL uses by default to schedule tasks (also on the UI-thread). If you want a task scheduled on the UI-thread, you can use the task scheduler given by `TaskScheduler.FromCurrentSynchronizationContext()` (called on the UI-thread of course). – Magnus Apr 09 '15 at 14:55
  • @Magnus `TaskScheduler.Current` uses `Current` synchronization context, and in UI-app all `Task` will be executed in `Current` thread! Read related question: http://stackoverflow.com/questions/6800705/why-is-taskscheduler-current-the-default-taskscheduler – VMAtm Apr 09 '15 at 15:00
  • Hmm, I think you need to distinguish between TaskScheduler and SynchronizationContext/Dispatcher (admittedly a somewhat confusing subject...). Here is [another related question](http://stackoverflow.com/questions/16916253/how-to-get-a-task-that-uses-synchronizationcontext-and-how-are-synchronizationc) with some good info and links. The question you refer to uses `TaskScheduler.FromSynchronizationContext()` to schedule a task continuation on the UI-thread. – Magnus Apr 09 '15 at 15:32
  • @Magnus Yes. I just wanted to point out that if you simply use `await` or task creation, you can fall into the same Thread, if you provide the wrong context to the scheduler. – VMAtm Apr 09 '15 at 15:38
  • @Magnus And I think that we are discussing exactly the same from different point of view. Sorry for my English. – VMAtm Apr 09 '15 at 15:39
0

A simple straightforward implementation could look like the following:

ParallelOptions Options = new ParallelOptions{MaxDegreeOfParallelism = 4};
IList<Step> AllSteps;

public void Execute()
{
    var RemainingSteps = new HashSet<Step>(AllSteps);

    while(RemainingSteps.Count > 0)
    {
        var ExecutableSteps = RemainingSteps.Where(s => s.Parents.All(p => p.IsComplete)).ToList();

        Parallel.ForEach(ExecutableSteps, Options, ExecuteStep);

        RemainingSteps.ExceptWith(ExecutableSteps);
    }   
}

Granted, this will execute steps in phases, so you will not always have maximum concurrency. You may only be executing one step at the end of each phase, since the next steps to execute are only realized after all steps in the current phase complete.

If you want to improve concurrency, I would suggest using a BlockingCollection. You'll need to implement a custom partitioner to use Parallel.ForEach against the blocking collection in this case. You'll also want a concurrent collection of the remaining steps, so that you don't queue the same step multiple times (the race condition previously commented on).

public class Executor
{ 
    ParallelOptions Options = new ParallelOptions() { MaxDegreeOfParallelism = 4 };

    IList<Step> AllSteps;

    //concurrent hashset of remaining steps (used to prevent race conditions)
    ConcurentDictionary<Step, Step> RemainingSteps = new ConcurentDictionary<Step, Step>();

    //blocking collection of steps that can execute next
    BlockingCollection<Step> ExecutionQueue = new BlockingCollection<Step>();

    public void Execute()
    {
        foreach(var step in AllSteps)
        {
            if(step.Parents.All(p => p.IsComplete))
            {
                ExecutionQueue.Add(step);
            }
            else
            {
                RemainingSteps.Add(step, step);
            }
        }

        Parallel.ForEach(
            GetConsumingPartitioner(ExecutionQueue),
            Options,
            Execute);
    }

    void Execute(Step step)
    {
        ExecuteStep(step);

        if(RemainingSteps.IsEmpty)
        {
            //we're done, all steps are complete
            executionQueue.CompleteAdding();
            return;
        }

        //queue up the steps that can execute next (concurrent dictionary enumeration returns a copy, so subsequent removal is safe)
        foreach(var step in RemainingSteps.Values.Where(s => s.Parents.All(p => p.IsComplete)))
        {
            //note, removal only occurs once, so this elimiates the race condition
            Step NextStep;
            if(RemainingSteps.TryRemove(step, out NextStep))
            {
                executionQueue.Add(NextStep);
            }
        }
    }

    Partitioner<T> GetConsumingPartitioner<T>(BlockingCollection<T> collection)
    {
        return new BlockingCollectionPartitioner<T>(collection);
    }

    class BlockingCollectionPartitioner<T> : Partitioner<T>
    {
        readonly BlockingCollection<T> Collection;

        public BlockingCollectionPartitioner(BlockingCollection<T> collection)
        {
            if (collection == null) throw new ArgumentNullException("collection");

            Collection = collection;
        }

        public override bool SupportsDynamicPartitions { get { return true; } }

        public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
        {
            if (partitionCount < 1) throw new ArgumentOutOfRangeException("partitionCount");

            var Enumerable = GetDynamicPartitions();

            return Enumerable.Range(0, partitionCount)
                             .Select(i => Enumerable.GetEnumerator()).ToList();
        }

        public override IEnumerable<T> GetDynamicPartitions()
        {
            return Collection.GetConsumingEnumerable();
        }
    }
}
Community
  • 1
  • 1
Michael Petito
  • 12,891
  • 4
  • 40
  • 54