
I would like to have a custom thread pool satisfying the following requirements:

  1. Real threads are preallocated according to the pool capacity. The actual work is free to use the standard .NET thread pool, if needed to spawn concurrent tasks.
  2. The pool must be able to return the number of idle threads. The returned number may be less than the actual number of the idle threads, but it must not be greater. Of course, the more accurate the number the better.
  3. Queuing work to the pool should return a corresponding Task, which should play nicely with the Task based API.
  4. NEW: The max job capacity (or degree of parallelism) should be adjustable dynamically. Reducing the capacity does not have to take effect immediately, but increasing it should.

The rationale for the first item is as follows:

  • The machine is not supposed to be running more than N work items concurrently, where N is relatively small - between 10 and 30.
  • The work is fetched from the database and if K items are fetched then we want to make sure that there are K idle threads to start the work right away. A situation where work is fetched from the database, but remains waiting for the next available thread is unacceptable.

The last bullet also explains why I need the idle thread count: I am going to fetch that many work items from the database. It also explains why the reported idle thread count must never be higher than the actual one; otherwise I might fetch more work than can be started immediately.

Anyway, here is my implementation along with a small program to test it (BJE stands for Background Job Engine):

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace TaskStartLatency
{
    public class BJEThreadPool
    {
        private sealed class InternalTaskScheduler : TaskScheduler
        {
            private int m_idleThreadCount;
            private readonly BlockingCollection<Task> m_bus;

            public InternalTaskScheduler(int threadCount, BlockingCollection<Task> bus)
            {
                m_idleThreadCount = threadCount;
                m_bus = bus;
            }

            public void RunInline(Task task)
            {
                Interlocked.Decrement(ref m_idleThreadCount);
                try
                {
                    TryExecuteTask(task);
                }
                catch
                {
                    // The action is responsible itself for the error handling, for the time being...
                }
                Interlocked.Increment(ref m_idleThreadCount);
            }

            public int IdleThreadCount
            {
                get { return m_idleThreadCount; }
            }

            #region Overrides of TaskScheduler

            protected override void QueueTask(Task task)
            {
                m_bus.Add(task);
            }

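            protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
            {
                // Never inline: running the task on a waiting thread would bypass the
                // dedicated threads and the idle thread count.
                return false;
            }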

            protected override IEnumerable<Task> GetScheduledTasks()
            {
                throw new NotSupportedException();
            }

            #endregion

            public void DecrementIdleThreadCount()
            {
                Interlocked.Decrement(ref m_idleThreadCount);
            }
        }

        private class ThreadContext
        {
            private readonly InternalTaskScheduler m_ts;
            private readonly BlockingCollection<Task> m_bus;
            private readonly CancellationTokenSource m_cts;
            public readonly Thread Thread;

            public ThreadContext(string name, InternalTaskScheduler ts, BlockingCollection<Task> bus, CancellationTokenSource cts)
            {
                m_ts = ts;
                m_bus = bus;
                m_cts = cts;
                Thread = new Thread(Start)
                {
                    IsBackground = true,
                    Name = name
                };
                Thread.Start();
            }

            private void Start()
            {
                try
                {
                    foreach (var task in m_bus.GetConsumingEnumerable(m_cts.Token))
                    {
                        m_ts.RunInline(task);
                    }
                }
                catch (OperationCanceledException)
                {
                }
                m_ts.DecrementIdleThreadCount();
            }
        }

        private readonly InternalTaskScheduler m_ts;
        private readonly CancellationTokenSource m_cts = new CancellationTokenSource();
        private readonly BlockingCollection<Task> m_bus = new BlockingCollection<Task>();
        private readonly List<ThreadContext> m_threadCtxs = new List<ThreadContext>();

        public BJEThreadPool(int threadCount)
        {
            m_ts = new InternalTaskScheduler(threadCount, m_bus);
            for (int i = 0; i < threadCount; ++i)
            {
                m_threadCtxs.Add(new ThreadContext("BJE Thread " + i, m_ts, m_bus, m_cts));
            }
        }

        public void Terminate()
        {
            m_cts.Cancel();
            foreach (var t in m_threadCtxs)
            {
                t.Thread.Join();
            }
        }

        public Task Run(Action<CancellationToken> action)
        {
            return Task.Factory.StartNew(() => action(m_cts.Token), m_cts.Token, TaskCreationOptions.DenyChildAttach, m_ts);
        }
        public Task Run(Action action)
        {
            return Task.Factory.StartNew(action, m_cts.Token, TaskCreationOptions.DenyChildAttach, m_ts);
        }

        public int IdleThreadCount
        {
            get { return m_ts.IdleThreadCount; }
        }
    }

    class Program
    {
        static void Main()
        {
            const int THREAD_COUNT = 32;
            var pool = new BJEThreadPool(THREAD_COUNT);
            var tcs = new TaskCompletionSource<bool>();
            var tasks = new List<Task>();
            var allRunning = new CountdownEvent(THREAD_COUNT);

            for (int i = pool.IdleThreadCount; i > 0; --i)
            {
                var index = i;
                tasks.Add(pool.Run(cancellationToken =>
                {
                    Console.WriteLine("Started action " + index);
                    allRunning.Signal();
                    tcs.Task.Wait(cancellationToken);
                    Console.WriteLine("  Ended action " + index);
                }));
            }

            Console.WriteLine("pool.IdleThreadCount = " + pool.IdleThreadCount);

            allRunning.Wait();
            Debug.Assert(pool.IdleThreadCount == 0);

            int expectedIdleThreadCount = THREAD_COUNT;
            Console.WriteLine("Press [c]ancel, [e]rror, [a]bort or any other key");
            switch (Console.ReadKey().KeyChar)
            {
            case 'c':
                Console.WriteLine("Cancel All");
                tcs.TrySetCanceled();
                break;
            case 'e':
                Console.WriteLine("Error All");
                tcs.TrySetException(new Exception("Failed"));
                break;
            case 'a':
                Console.WriteLine("Abort All");
                pool.Terminate();
                expectedIdleThreadCount = 0;
                break;
            default:
                Console.WriteLine("Done All");
                tcs.TrySetResult(true);
                break;
            }
            try
            {
                Task.WaitAll(tasks.ToArray());
            }
            catch (AggregateException exc)
            {
                Console.WriteLine(exc.Flatten().InnerException.Message);
            }

            Debug.Assert(pool.IdleThreadCount == expectedIdleThreadCount);

            pool.Terminate();
            Console.WriteLine("Press any key");
            Console.ReadKey();
        }
    }
}

It is a very simple implementation and it appears to be working. However, there is a problem - the BJEThreadPool.Run method does not accept asynchronous methods. I.e. my implementation does not allow me to add the following overloads:

public Task Run(Func<CancellationToken, Task> action)
{
    return Task.Factory.StartNew(() => action(m_cts.Token), m_cts.Token, TaskCreationOptions.DenyChildAttach, m_ts).Unwrap();
}
public Task Run(Func<Task> action)
{
    return Task.Factory.StartNew(action, m_cts.Token, TaskCreationOptions.DenyChildAttach, m_ts).Unwrap();
}

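The pattern I use in InternalTaskScheduler.RunInline does not work in this case: TryExecuteTask returns as soon as the delegate has returned its Task, that is, at its first await that does not complete synchronously, so the idle thread count is restored while the asynchronous work is still in progress.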
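A minimal sketch of the effect, using only the standard Task API on the default scheduler (the class and member names are illustrative, not from the post): when `StartNew` receives an async delegate, the outer `Task<Task>` completes once the delegate returns its Task, while only `Unwrap()` tracks the whole operation. Inside a task run by a custom scheduler, with no SynchronizationContext present, the continuations after each `await` are additionally queued back to `TaskScheduler.Current`, i.e. to that scheduler.

```csharp
using System.Threading;
using System.Threading.Tasks;

public static class UnwrapDemo
{
    public static (bool outerDoneEarly, bool innerStillPending) Run()
    {
        var gate = new TaskCompletionSource<bool>();

        // The async lambda makes StartNew return Task<Task>: the outer task covers only
        // the synchronous prefix of the delegate, up to its first pending await.
        Task<Task> outer = Task.Factory.StartNew(
            async () => { await gate.Task; },
            CancellationToken.None,
            TaskCreationOptions.DenyChildAttach,
            TaskScheduler.Default);
        Task unwrapped = outer.Unwrap(); // represents the whole asynchronous operation

        outer.Wait();                     // returns although the work is not finished
        bool outerDoneEarly = outer.IsCompleted;
        bool innerStillPending = !unwrapped.IsCompleted;

        gate.SetResult(true);             // let the asynchronous part finish
        unwrapped.Wait();
        return (outerDoneEarly, innerStillPending);
    }
}
```

Both flags come back `true`, which is exactly the window in which `RunInline` has already incremented the idle count.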

So, my question is how to add the support for asynchronous work items? I am fine with changing the entire design as long as the requirements outlined at the beginning of the post are upheld.

EDIT

I would like to clarify the intended usage of the desired pool. Please consider the following code:

if (pool.IdleThreadCount == 0)
{
  return;
}

foreach (var jobData in FetchFromDB(pool.IdleThreadCount))
{
  pool.Run(CreateJobAction(jobData));
}

Notes:

  1. The code is going to be run periodically, say every 1 minute.
  2. The code is going to be run concurrently by multiple machines watching the same database.
  3. FetchFromDB is going to use the technique described in Using SQL Server as a DB queue with multiple clients to atomically fetch and lock the work from the DB.
  4. CreateJobAction is going to invoke the code denoted by jobData (the job code) and close the work upon the completion of that code. The job code is out of my control and it could be pretty much anything: heavy CPU bound code, light asynchronous IO bound code, badly written synchronous IO bound code or a mix of all. It could run for minutes and it could run for hours. Closing the work is my code and it would be asynchronous IO bound code. Because of this, the returned job action has the signature of an asynchronous method.
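The periodic cycle from these notes could be structured roughly as below. This is a sketch under assumptions: `getIdleCount`, `fetch` and `start` are hypothetical delegates standing in for `pool.IdleThreadCount`, `FetchFromDB` and `pool.Run(CreateJobAction(...))`, and the idle count is read once per cycle so that the zero check and the fetch size agree.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class Poller
{
    // One polling cycle: read the idle count once, fetch at most that many jobs, start them all.
    public static int PollOnce<TJob>(Func<int> getIdleCount, Func<int, IEnumerable<TJob>> fetch, Action<TJob> start)
    {
        int idle = getIdleCount(); // single snapshot used for both the check and the fetch
        if (idle <= 0) return 0;

        int started = 0;
        foreach (var job in fetch(idle))
        {
            start(job);
            started++;
        }
        return started;
    }

    // Repeats PollOnce every `period` (note 1 suggests one minute) until ct is cancelled.
    public static async Task RunPeriodically<TJob>(Func<int> getIdleCount, Func<int, IEnumerable<TJob>> fetch,
        Action<TJob> start, TimeSpan period, CancellationToken ct)
    {
        while (!ct.IsCancellationRequested)
        {
            PollOnce(getIdleCount, fetch, start);
            await Task.Delay(period, ct); // throws TaskCanceledException on shutdown
        }
    }
}
```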

Item 2 underlines the importance of correctly identifying the number of idle threads. If there are 900 pending work items and 10 agent machines, I cannot allow an agent to fetch 300 work items and queue them on the thread pool. Why? Because it is very unlikely that the agent will be able to run 300 work items concurrently. It will run some, sure enough, but others will be waiting in the thread pool work queue. Suppose it runs 100 and lets 200 wait (even though 100 is probably far-fetched). With three agents fetching 300 items each, this yields 3 fully loaded agents and 7 idle ones, but only 300 work items out of 900 are actually being processed concurrently!
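The arithmetic can be checked with a tiny simulation. It assumes, purely for illustration, that agents poll one after another, each fetching up to `fetchLimit` items but able to run at most `capacity` of them; the rest wait in the agent's local queue. `FetchSimulation` is a hypothetical helper, not part of the post.

```csharp
using System;

public static class FetchSimulation
{
    // Agents poll one after another. Each fetches up to fetchLimit items from the shared
    // backlog but can run at most `capacity` of them; the rest wait in its local queue.
    public static (int running, int queuedLocally) Simulate(int pending, int agents, int capacity, int fetchLimit)
    {
        int running = 0, queuedLocally = 0;
        for (int agent = 0; agent < agents && pending > 0; agent++)
        {
            int fetched = Math.Min(fetchLimit, pending);
            pending -= fetched;
            int started = Math.Min(fetched, capacity);
            running += started;
            queuedLocally += fetched - started;
        }
        return (running, queuedLocally);
    }
}
```

With the numbers above, `Simulate(900, 10, 100, 300)` gives 300 running and 600 queued locally, while capping each fetch at the agent's free capacity, `Simulate(900, 10, 100, 100)`, starts all 900.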

My goal is to maximize the spread of the work amongst the available agents. Ideally, I should evaluate the load of an agent and the "heaviness" of the pending work, but it is a formidable task and is reserved for the future versions. Right now, I wish to assign each agent the max job capacity with the intention to provide the means to increase/decrease it dynamically without restarting the agents.

Another observation: the work can take quite a long time to run and it could be entirely synchronous code. As far as I understand, it is undesirable to use thread pool threads for this kind of work.
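The standard way to keep such work off the thread pool is the `TaskCreationOptions.LongRunning` hint. With the default scheduler, current .NET implementations honour it by creating a dedicated thread; treat that as an implementation detail rather than a documented guarantee. The helper below is illustrative and simply reports where a delegate ran.

```csharp
using System.Threading;
using System.Threading.Tasks;

public static class LongRunningDemo
{
    // Starts a delegate on the default scheduler with the given options and
    // reports whether it executed on a thread-pool thread.
    public static bool RanOnPoolThread(TaskCreationOptions options)
    {
        Task<bool> probe = Task.Factory.StartNew(
            () => Thread.CurrentThread.IsThreadPoolThread,
            CancellationToken.None,
            options,
            TaskScheduler.Default);
        return probe.Result;
    }
}
```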

EDIT 2

There is a statement that TaskScheduler is only for CPU bound work. But what if I do not know the nature of the work? It is a general purpose Background Job Engine and it runs thousands of different kinds of jobs. I have no means to tell that one job is CPU bound, another is synchronous IO bound and yet another is asynchronous IO bound. I wish I could, but I cannot.

EDIT 3

In the end, I use neither SemaphoreSlim nor a TaskScheduler; it finally trickled down into my thick skull that a TaskScheduler is inappropriate here and plain wrong, plus it makes the code overly complex.

Still, I fail to see how SemaphoreSlim is the way. The proposed pattern:

public async Task Enqueue(Func<Task> taskGenerator)
{
    await semaphore.WaitAsync();
    try
    {
        await taskGenerator();
    }
    finally
    {
        semaphore.Release();
    }
}

This pattern expects taskGenerator either to be asynchronous IO bound code or to start a new thread otherwise. However, I have no means to determine which of the two the work to be executed is. Plus, as I have learned from SemaphoreSlim.WaitAsync continuation code, if the semaphore is available, the code following WaitAsync() is going to run on the same thread, which is not very good for me.
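One way to reconcile the semaphore pattern with work of unknown nature is to stop guessing and start every job on a dedicated thread, while the semaphore only counts slots. This is a sketch under assumptions, not the answerers' code: `DedicatedThreadJobRunner` and `IdleSlotCount` are hypothetical names, it drops requirement 1's preallocation in favour of one new thread per job, and it relies on `LongRunning` yielding a dedicated thread on the default scheduler. Synchronous code then blocks only its own thread, and the asynchronous tail of a job continues on pool threads, as requirement 1 permits.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public sealed class DedicatedThreadJobRunner
{
    private readonly SemaphoreSlim m_slots;

    public DedicatedThreadJobRunner(int capacity)
    {
        m_slots = new SemaphoreSlim(capacity);
    }

    // Free slots right now. A slot is taken synchronously inside Run (when one is free)
    // and returned only after the whole job, including its async tail, has finished.
    public int IdleSlotCount => m_slots.CurrentCount;

    public async Task Run(Func<CancellationToken, Task> job, CancellationToken ct)
    {
        await m_slots.WaitAsync(ct).ConfigureAwait(false);
        try
        {
            // LongRunning asks the default scheduler for a dedicated thread, so the
            // synchronous part of the job neither runs on the caller's thread nor
            // occupies a thread-pool thread.
            Task<Task> started = Task.Factory.StartNew(
                () => job(ct),
                ct,
                TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach,
                TaskScheduler.Default);

            // Unwrap waits for the job's own Task, i.e. for its asynchronous part as well.
            await started.Unwrap().ConfigureAwait(false);
        }
        finally
        {
            m_slots.Release();
        }
    }
}
```

As in the post's usage pattern, a caller would read `IdleSlotCount` before fetching, since `Run` itself waits when no slot is free.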

Anyway, below is my implementation, in case anyone is interested. Unfortunately, I have yet to figure out how to reduce the pool thread count dynamically, but that is a topic for another question.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace TaskStartLatency
{
    public interface IBJEThreadPool
    {
        void SetThreadCount(int threadCount);
        void Terminate();
        Task Run(Action action);
        Task Run(Action<CancellationToken> action);
        Task Run(Func<Task> action);
        Task Run(Func<CancellationToken, Task> action);
        int IdleThreadCount { get; }
    }

    public class BJEThreadPool : IBJEThreadPool
    {
        private interface IActionContext
        {
            Task Run(CancellationToken ct);
            TaskCompletionSource<object> TaskCompletionSource { get; }
        }

        private class ActionContext : IActionContext
        {
            private readonly Action m_action;

            public ActionContext(Action action)
            {
                m_action = action;
                TaskCompletionSource = new TaskCompletionSource<object>();
            }

            #region Implementation of IActionContext

            public Task Run(CancellationToken ct)
            {
                m_action();
                return null;
            }

            public TaskCompletionSource<object> TaskCompletionSource { get; private set; }

            #endregion
        }
        private class CancellableActionContext : IActionContext
        {
            private readonly Action<CancellationToken> m_action;

            public CancellableActionContext(Action<CancellationToken> action)
            {
                m_action = action;
                TaskCompletionSource = new TaskCompletionSource<object>();
            }

            #region Implementation of IActionContext

            public Task Run(CancellationToken ct)
            {
                m_action(ct);
                return null;
            }

            public TaskCompletionSource<object> TaskCompletionSource { get; private set; }

            #endregion
        }
        private class AsyncActionContext : IActionContext
        {
            private readonly Func<Task> m_action;

            public AsyncActionContext(Func<Task> action)
            {
                m_action = action;
                TaskCompletionSource = new TaskCompletionSource<object>();
            }

            #region Implementation of IActionContext

            public Task Run(CancellationToken ct)
            {
                return m_action();
            }

            public TaskCompletionSource<object> TaskCompletionSource { get; private set; }

            #endregion
        }
        private class AsyncCancellableActionContext : IActionContext
        {
            private readonly Func<CancellationToken, Task> m_action;

            public AsyncCancellableActionContext(Func<CancellationToken, Task> action)
            {
                m_action = action;
                TaskCompletionSource = new TaskCompletionSource<object>();
            }

            #region Implementation of IActionContext

            public Task Run(CancellationToken ct)
            {
                return m_action(ct);
            }

            public TaskCompletionSource<object> TaskCompletionSource { get; private set; }

            #endregion
        }

        private readonly CancellationTokenSource m_ctsTerminateAll = new CancellationTokenSource();
        private readonly BlockingCollection<IActionContext> m_bus = new BlockingCollection<IActionContext>();
        private readonly LinkedList<Thread> m_threads = new LinkedList<Thread>();
        private int m_idleThreadCount;

        private static int s_threadCount;

        public BJEThreadPool(int threadCount)
        {
            ReserveAdditionalThreads(threadCount);
        }

        private void ReserveAdditionalThreads(int n)
        {
            for (int i = 0; i < n; ++i)
            {
                var index = Interlocked.Increment(ref s_threadCount) - 1;

                var t = new Thread(Start)
                {
                    IsBackground = true,
                    Name = "BJE Thread " + index
                };
                Interlocked.Increment(ref m_idleThreadCount);
                t.Start();

                m_threads.AddLast(t);
            }
        }

        private void Start()
        {
            try
            {
                foreach (var actionContext in m_bus.GetConsumingEnumerable(m_ctsTerminateAll.Token))
                {
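                    // Keep this dedicated thread reserved until the work item, including
                    // its asynchronous part, has completed.
                    RunWork(actionContext).Wait();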
                }
            }
            catch (OperationCanceledException)
            {
            }
            catch
            {
                // Should never happen - log the error
            }

            Interlocked.Decrement(ref m_idleThreadCount);
        }

        private async Task RunWork(IActionContext actionContext)
        {
            Interlocked.Decrement(ref m_idleThreadCount);
            try
            {
                var task = actionContext.Run(m_ctsTerminateAll.Token);
                if (task != null)
                {
                    await task;
                }
                actionContext.TaskCompletionSource.SetResult(null);
            }
            catch (OperationCanceledException)
            {
                actionContext.TaskCompletionSource.TrySetCanceled();
            }
            catch (Exception exc)
            {
                actionContext.TaskCompletionSource.TrySetException(exc);
            }
            Interlocked.Increment(ref m_idleThreadCount);
        }

        private Task PostWork(IActionContext actionContext)
        {
            m_bus.Add(actionContext);
            return actionContext.TaskCompletionSource.Task;
        }

        #region Implementation of IBJEThreadPool

        public void SetThreadCount(int threadCount)
        {
            if (threadCount > m_threads.Count)
            {
                ReserveAdditionalThreads(threadCount - m_threads.Count);
            }
            else if (threadCount < m_threads.Count)
            {
                throw new NotSupportedException();
            }
        }
        public void Terminate()
        {
            m_ctsTerminateAll.Cancel();
            foreach (var t in m_threads)
            {
                t.Join();
            }
        }

        public Task Run(Action action)
        {
            return PostWork(new ActionContext(action));
        }
        public Task Run(Action<CancellationToken> action)
        {
            return PostWork(new CancellableActionContext(action));
        }
        public Task Run(Func<Task> action)
        {
            return PostWork(new AsyncActionContext(action));
        }
        public Task Run(Func<CancellationToken, Task> action)
        {
            return PostWork(new AsyncCancellableActionContext(action));
        }

        public int IdleThreadCount
        {
            get { return m_idleThreadCount; }
        }

        #endregion
    }

    public static class Extensions
    {
        public static Task WithCancellation(this Task task, CancellationToken token)
        {
            return task.ContinueWith(t => t.GetAwaiter().GetResult(), token);
        }
    }

    class Program
    {
        static void Main()
        {
            const int THREAD_COUNT = 16;
            var pool = new BJEThreadPool(THREAD_COUNT);
            var tcs = new TaskCompletionSource<bool>();
            var tasks = new List<Task>();
            var allRunning = new CountdownEvent(THREAD_COUNT);

            for (int i = pool.IdleThreadCount; i > 0; --i)
            {
                var index = i;
                tasks.Add(pool.Run(async ct =>
                {
                    Console.WriteLine("Started action " + index);
                    allRunning.Signal();
                    await tcs.Task.WithCancellation(ct);
                    Console.WriteLine("  Ended action " + index);
                }));
            }

            Console.WriteLine("pool.IdleThreadCount = " + pool.IdleThreadCount);

            allRunning.Wait();
            Debug.Assert(pool.IdleThreadCount == 0);

            int expectedIdleThreadCount = THREAD_COUNT;
            Console.WriteLine("Press [c]ancel, [e]rror, [a]bort or any other key");
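            // Console.ReadKey() echoes the pressed key, which supplies the first letter
            // of the messages below ("c" + "ancel All").
            switch (Console.ReadKey().KeyChar)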
            {
            case 'c':
                Console.WriteLine("ancel All");
                tcs.TrySetCanceled();
                break;
            case 'e':
                Console.WriteLine("rror All");
                tcs.TrySetException(new Exception("Failed"));
                break;
            case 'a':
                Console.WriteLine("bort All");
                pool.Terminate();
                expectedIdleThreadCount = 0;
                break;
            default:
                Console.WriteLine("Done All");
                tcs.TrySetResult(true);
                break;
            }

            try
            {
                Task.WaitAll(tasks.ToArray());
            }
            catch (AggregateException exc)
            {
                Console.WriteLine(exc.Flatten().InnerException.Message);
            }

            Debug.Assert(pool.IdleThreadCount == expectedIdleThreadCount);

            pool.Terminate();
            Console.WriteLine("Press any key");
            Console.ReadKey();
        }
    }
}
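For shrinking the dedicated-thread design (where `SetThreadCount` currently throws `NotSupportedException`), one option is a "retire" marker posted to the same queue: whichever thread dequeues it exits, so capacity drops as soon as a thread frees up. The sketch below is simplified and hypothetical (`ShrinkablePool` is not the post's class and handles only synchronous `Action`s). It keeps requirement 2 by having every posted item, work or marker, claim an idle slot at post time, so the reported count is lowered before a thread actually leaves. `RunContinuationsAsynchronously` requires .NET Framework 4.6 or later.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public sealed class ShrinkablePool
{
    // A null item is a "retire" marker: whichever thread dequeues it exits.
    private readonly BlockingCollection<Action> m_bus = new BlockingCollection<Action>();
    private readonly object m_resizeLock = new object();
    private int m_threadCount;     // target thread count, changed only under m_resizeLock
    private int m_idleThreadCount; // every posted item claims one slot at post time

    public ShrinkablePool(int threadCount) { SetThreadCount(threadCount); }

    // Clamped at 0: while shrinking a fully busy pool the raw counter can go negative.
    public int IdleThreadCount => Math.Max(0, Volatile.Read(ref m_idleThreadCount));

    public void SetThreadCount(int threadCount)
    {
        lock (m_resizeLock)
        {
            for (; m_threadCount < threadCount; m_threadCount++)
            {
                Interlocked.Increment(ref m_idleThreadCount); // growing is immediate
                new Thread(Loop) { IsBackground = true }.Start();
            }
            for (; m_threadCount > threadCount; m_threadCount--)
            {
                Interlocked.Decrement(ref m_idleThreadCount); // report lost capacity now,
                m_bus.Add(null);                              // retire a thread once one frees up
            }
        }
    }

    public Task Run(Action work)
    {
        var tcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
        Interlocked.Decrement(ref m_idleThreadCount); // claim a slot before the item is visible
        m_bus.Add(() =>
        {
            try { work(); tcs.TrySetResult(null); }
            catch (Exception e) { tcs.TrySetException(e); }
        });
        return tcs.Task;
    }

    private void Loop()
    {
        foreach (var item in m_bus.GetConsumingEnumerable())
        {
            if (item == null) return; // retire marker: its slot was already released in SetThreadCount
            try { item(); }
            finally { Interlocked.Increment(ref m_idleThreadCount); }
        }
    }
}
```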
edited by Community, asked by mark
    How would you summarize your question? like `The machine is not supposed to be running more than N work items concurrently` ? – L.B Sep 02 '14 at 21:33
  • I would say that the current title is pretty accurate - I have a custom thread pool which does not support async work items and the question is how to modify it in order to support them. – mark Sep 02 '14 at 21:36
  • Since many `async` operations don't actually use new threads to run at all your question is somewhat strange... – Alexei Levenkov Sep 02 '14 at 21:40
  • @AlexeiLevenkov - I am sorry, I did not understand your point. Please, elaborate. – mark Sep 02 '14 at 21:43
  • A common async operation is not CPU bound: it has a small synchronous portion at the beginning, which triggers asynchronous portion(s) where the wait does not use any thread (e.g. waiting for a file IO completion notification), and then comes back and uses some thread again to finish processing. You've said that you don't care where it waits/completes, so you are trying to use very complicated code to run the small synchronous part of each operation on a new thread... instead of something like `.WhenAll(...)`. – Alexei Levenkov Sep 02 '14 at 21:48
  • From your first comment I can see that you are not open to criticism of your code, but may I ask: why do you need this code, what is the real problem you are trying to solve? – L.B Sep 02 '14 at 22:00
  • @AlexeiLevenkov - Frankly I still do not see where I have said what you are saying I have said. The work item fetched from the database may be a heavy CPU bound work or it may be a poorly written synchronous IO bound code or it could be a superbly written asynchronous IO code. It could be running for a minute and it could be running for an hour or 10 hours. – mark Sep 02 '14 at 22:01
  • @AlexeiLevenkov - I am open to criticism very much. In fact I will be happy to see my code improved; you are welcome to post a formal reply rather than a comment. This way I will be able to understand it better. – mark Sep 02 '14 at 22:03
  • @mark Seems like you reply to wrong people without answering the question in comment :) – L.B Sep 02 '14 at 22:06
  • @L.B - That happens. My real problem is that I want to use a database table as a work item queue given many concurrent producers and consumers. I know how to fetch work atomically - http://stackoverflow.com/questions/3641703/using-sql-server-as-a-db-queue-with-multiple-clients. The status of the work fetched in this way will be changed from `Pending` to `Running`. However, I do not want to "lie" about the work being in the running state - I want to ensure that no work fetched by me is going to wait for threads to become available. That is the major motivation for the requirements. – mark Sep 02 '14 at 22:15
  • @mark `I want to ensure that no work fetched by me is going to wait for threads to become available` I wish I could understand what you were saying. Good luck. – L.B Sep 02 '14 at 22:21
  • @L.B - When a work is queued to the thread pool thread it is possible that no thread pool threads are available to run it at this precise moment. Or am I wrong? All I want is reserve a constant amount of threads which will always be available and be free to keep them or block them for indefinite amount of time, because this is what the work items might do - run for very long or block for very long. – mark Sep 02 '14 at 22:27
  • I was trying to point out that the 1st requirement "work is free to use the standard .NET thread pool, if needed to spawn concurrent tasks" basically removes the need for your custom code with async operations - you don't seem to care where/how an operation finishes as long as it starts quickly. With async operations the start is fast (unless it is really not async at all and just pretends to be one), so it feels like you are doing work for nothing. I could be completely misunderstanding your requirements, but that is how I read them. – Alexei Levenkov Sep 02 '14 at 23:43
  • What about a solution where you start N agents and they are pulling work from the queue. That would give perfect load distribution. And I don't see the need to adjust the DOP but that could be done as well. Would that fit your scenario better? – usr Sep 03 '14 at 20:56
  • I do have N agents already and they are going to pull from the queue (table). But I do not want a situation where work is pulled by agent X and remains in its thread pool work queue while agent Y is underutilized. I need a way for an agent to decide that it should not pull more than N work items. Right now that N is defined solely by the idle thread count on the agent. In the future it could be defined by the agent load and the "heaviness" of the work items. But I do not have this at the moment. – mark Sep 03 '14 at 21:26
  • Closing question as too broad. The requirements have evolved to be too complicated for Stack Overflow. I think the basic ideas have been sufficiently explained here. – usr Sep 03 '14 at 21:43

2 Answers


Asynchronous "work items" are often based on async IO. Async IO does not use threads while it runs. Task schedulers are used to execute CPU work (tasks based on a delegate), so the TaskScheduler concept does not apply here. You cannot use a custom TaskScheduler to influence what async code does.

Make your work items throttle themselves:

static SemaphoreSlim sem = new SemaphoreSlim(maxDegreeOfParallelism); //shared object

async Task MyWorkerFunction()
{
    await sem.WaitAsync();
    try
    {
        MyWork();
    }
    finally
    {
        sem.Release();
    }
}
edited by Servy, answered by usr
  • Note that there is no `WaitOneAsync`, it's just `WaitAsync`. It's also important that the release be in a `finally` so that you don't cause a deadlock by not releasing it when done. Hope you don't mind that I just made the edits. – Servy Sep 03 '14 at 17:47
  • I have certain requirements. The devil is in details - please explain how does your answer satisfy my requirements. Please note, that whatever is denoted by `MyWork` may be anything - synchronous CPU bound code, synchronous IO bound code, asynchronous IO bound code or a mix of all of them. Also, it could be a long running work (hours) or it could be a short running work (minutes). – mark Sep 03 '14 at 17:48
  • Please see my EDIT to the post. I hope it clarifies my context a bit more. – mark Sep 03 '14 at 18:24
  • It could be that your way is the way to do it, I do not know yet. I have implied it all along, but now I have added it explicitly to the list of the requirements - dynamically adjusting the degree of parallelism (requirement No 4). How do I do it using your solution? Next, the work is executed in the continuation of WaitAsync - it is on the thread pool, isn't it? If so, it could be a long running process occupying a thread pool thread. Are we OK with that? – mark Sep 03 '14 at 18:43

As mentioned in another answer by usr, you can't do this with a TaskScheduler, as that is only for CPU bound work, not for limiting the degree of parallelism of all kinds of work, CPU bound or not. He also shows you how you can use a SemaphoreSlim to asynchronously limit the degree of parallelism.

You can expand on this to generalize these concepts in a few ways. The one that seems like it would be the most beneficial to you would be to create a special type of queue that takes operations that return a Task and executes them in such a way that a given max degree of parallelization is achieved.

public class FixedParallelismQueue
{
    private SemaphoreSlim semaphore;
    public FixedParallelismQueue(int maxDegreesOfParallelism)
    {
        semaphore = new SemaphoreSlim(maxDegreesOfParallelism,
            maxDegreesOfParallelism);
    }

    public async Task<T> Enqueue<T>(Func<Task<T>> taskGenerator)
    {
        await semaphore.WaitAsync();
        try
        {
            return await taskGenerator();
        }
        finally
        {
            semaphore.Release();
        }
    }
    public async Task Enqueue(Func<Task> taskGenerator)
    {
        await semaphore.WaitAsync();
        try
        {
            await taskGenerator();
        }
        finally
        {
            semaphore.Release();
        }
    }
}

This allows you to create a queue for your application (you can even have several separate queues if you want) that has a fixed degree of parallelization. You can then provide operations returning a Task when they complete and the queue will schedule it when it can and return a Task representing when that unit of work has finished.

edited by Community, answered by Servy
  • Please see my EDIT to the post. I hope it clarifies my context a bit more. – mark Sep 03 '14 at 18:25
  • It could, I do not know yet. I have implied it all along, but now I have added it explicitly to the list of the requirements - dynamically adjusting the degree of parallelism (requirement No 4). How do I do it using your solution? Next, the work is executed in the continuation of `WaitAsync` - it is on the thread pool, isn't it? If so, it could be a long running process occupying a thread pool thread. Are we OK with that? – mark Sep 03 '14 at 18:41
  • @mark You're more than welcome to add methods to adjust the count of the semaphore without actually starting up units of work, thus changing the degrees of parallelism. None of the work done *in this method* is going to be long running. it's all just short scheduling work. If some actual unit of work represents long running CPU bound work that is going to run too long to run in a thread pool thread then the function should return a task that represents doing some work outside of a thread pool thread. It is the responsibility of that function to determine how the operation is performed. – Servy Sep 03 '14 at 18:45
  • 1. I do not know how to adjust the count of the SemaphoreSlim, not without replacing it. 2. Are you saying I should know the nature of the work? This would be perfect, but I cannot. You see, the work (or background jobs as we call them) are written by different teams. The skill level of people writing them is different. They have been written for the past 4 years using different .NET technologies. Old code is purely synchronous, new code contains asynchronous patches in it. Again, some is heavy and some is light. I wish I had weight on each job specifying how "heavy" it is. But I do not. – mark Sep 03 '14 at 18:57
  • @mark You wait or release the semaphore to change its count. I'm not saying you can tell what the work is doing, I'm saying *you don't have to know or care*. It is *irrelevant* how the `Task` is created. It is not the responsibility of this object. If someone has a very long running CPU bound operation to perform, it is their responsibility as a caller to create a function that creates a task that represents performing that work asynchronously in an appropriate manner, likely by creating a new thread. If they have asynchronous IO, they do that, etc. This queue doesn't care how they do it. – Servy Sep 03 '14 at 18:59
  • 1. Still do not get it. If the degree was 20 and then changed to 30 - how do I do it with the same SemaphoreSlim instance? 2. Are you saying the code of some background jobs has to be changed? – mark Sep 03 '14 at 19:11
  • @mark Release the semaphore 10 times. I'm saying that it's the responsibility of whatever function is passed to this queue to actually determine what the `Task` does, where it's executed, etc. All the queue does is limit the degrees of parallelization to a fixed amount. No more. – Servy Sep 03 '14 at 19:13
  • 1. I am slow, but I am getting there. So, I can easily increase the capacity. But how do I decrease it? 2. I believe I have already explained that I currently have no means to examine the work fetched and determine whether it is safe to run on a thread pool thread or must have a dedicated thread spawned for it. That is the sad reality right now. – mark Sep 03 '14 at 19:26
  • I already told you how to adjust the capacity. If you don't know how the work should be scheduled then you'll have to make a call. You'll have to pick one option or the other. – Servy Sep 03 '14 at 19:31
  • Stupid me. SemaphoreSlim.Wait does not block if it has enough capacity. Got it. – mark Sep 03 '14 at 21:54
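Following this comment thread, a resizable limiter can be built on a single `SemaphoreSlim`: `Release(n)` grows the capacity immediately, and shrinking issues `n` extra waits that absorb permits as running jobs return them, so it takes effect lazily, as requirement 4 allows. One detail matters: the `FixedParallelismQueue` above passes `maxDegreesOfParallelism` as the semaphore's `maxCount`, and releasing beyond `maxCount` throws `SemaphoreFullException`, so a growable limiter should be created without a maximum. `ResizableLimiter`, `SetCapacity` and `AvailableSlots` are hypothetical names for this sketch.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public sealed class ResizableLimiter
{
    // Created without a maxCount so that Release(n) can grow past the initial capacity.
    private readonly SemaphoreSlim m_semaphore;
    private readonly object m_sync = new object();
    private int m_capacity;

    public ResizableLimiter(int capacity)
    {
        m_capacity = capacity;
        m_semaphore = new SemaphoreSlim(capacity);
    }

    public int Capacity
    {
        get { lock (m_sync) return m_capacity; }
    }

    // Permits that are free right now.
    public int AvailableSlots => m_semaphore.CurrentCount;

    public void SetCapacity(int newCapacity)
    {
        if (newCapacity < 0) throw new ArgumentOutOfRangeException(nameof(newCapacity));
        lock (m_sync)
        {
            int delta = newCapacity - m_capacity;
            m_capacity = newCapacity;
            if (delta > 0)
            {
                // Growing takes effect immediately: waiting jobs can start right away.
                m_semaphore.Release(delta);
            }
            for (int i = 0; i < -delta; i++)
            {
                // Shrinking: each wait swallows one permit, now if one is free,
                // otherwise when a running job releases it. It is never released again.
                _ = m_semaphore.WaitAsync();
            }
        }
    }

    public async Task RunAsync(Func<Task> job)
    {
        await m_semaphore.WaitAsync().ConfigureAwait(false);
        try
        {
            await job().ConfigureAwait(false);
        }
        finally
        {
            m_semaphore.Release();
        }
    }
}
```

A pending absorbing wait competes with jobs waiting in `RunAsync`; whichever is granted a permit first, the effective capacity converges to the new value once running jobs finish.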