
I have a method Create that is executed whenever a new message appears on the Service Bus message queue (https://azure.microsoft.com/en-us/services/service-bus/).
I am trying to limit the total number of tasks that can run concurrently, across all calls of Create, to 5.
In my code, Parallel.ForEach does not seem to have any effect.

I have tried to add a mutex/lock around the makePdfAsync() invocation like this:

mutex.WaitOne();
if(currentNumTasks < MaxTasks)
{
    tasks.Add(makePdfAsync(form));
}
mutex.ReleaseMutex();

but it is extremely slow and causes the service bus to throw.

How do I limit the number of concurrent tasks created across all invocations of Create?

public async Task Create(List<FormModel> forms)
{
    var tasks = new List<Task>();

    Parallel.ForEach(forms, new ParallelOptions { MaxDegreeOfParallelism = 5 }, form =>
    {
        tasks.Add(makePdfAsync(form));
    });
    await Task.WhenAny(Task.WhenAll(tasks), Task.Delay(TimeSpan.FromMinutes(10)));
}

public async Task makePdfAsync(FormModel form)
{
    var message = new PdfMessageModel();
    message.forms = new List<FormModel>() { form };

    var retry = 10;
    var uri = new Uri("http://localhost.:8007");
    var json = JsonConvert.SerializeObject(message);

    using (var wc = new WebClient())
    {
        wc.Encoding = System.Text.Encoding.UTF8;

        // reconnect with delay in case process is not ready
        while (true)
        {
            try
            {
                await wc.UploadStringTaskAsync(uri, json);
                break;
            }
            catch
            {
                if (retry-- == 0) throw;
            }
        }
    }
}

TL;DR: Create is a method on a class, and it is called on many instances simultaneously. The concurrency is twofold: several invocations of Create run simultaneously, and within each invocation of Create several tasks run concurrently.
How do I limit the total number of tasks running at any one point?

Jeppe
  • You are just building the tasks list in parallel; you're not executing it via the Parallel.ForEach. All tasks are executed in `Task.WhenAll(tasks)`, and `MaxDegreeOfParallelism = 5` has no effect on `Task.WhenAll(tasks)`, so the `Parallel.ForEach` serves no purpose in this code. – Jeroen van Langen May 12 '17 at 14:38
  • This is the second time you've asked this question, and the answer is still the same: use `SemaphoreSlim`. If this isn't applicable to your scenario for some reason, then please update your question to clarify why this is the case. – Stephen Cleary May 12 '17 at 15:32
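A minimal sketch of the `SemaphoreSlim` approach the last comment refers to, assuming the semaphore can live in a static field so that it is shared by every instance whose Create is running (the class name, field name and stand-in makePdfAsync body are illustrative, not from the question):

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;

    public class PdfCreator
    {
        // One semaphore for the whole process: at most 5 makePdfAsync calls run at
        // any one time, no matter how many Create invocations are in progress.
        private static readonly SemaphoreSlim Throttle = new SemaphoreSlim(5, 5);

        public async Task Create(List<FormModel> forms)
        {
            var tasks = forms.Select(async form =>
            {
                await Throttle.WaitAsync();   // wait, without blocking a thread, for a free slot
                try
                {
                    await makePdfAsync(form);
                }
                finally
                {
                    Throttle.Release();       // free the slot even if makePdfAsync throws
                }
            }).ToList();

            await Task.WhenAny(Task.WhenAll(tasks), Task.Delay(TimeSpan.FromMinutes(10)));
        }

        // Stand-in for the question's makePdfAsync(form).
        private Task makePdfAsync(FormModel form) => Task.CompletedTask;
    }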

2 Answers


You could look at using a system-wide semaphore.

For example:

var throttle = new Semaphore(5, 5, "pdftaskthrottle");

if (throttle.WaitOne(5000)) {
    try {
        // do some task / thread stuff
        .....
    } catch (Exception ex) {
        // handle
    } finally {
        // always remember to release the semaphore
        throttle.Release();
    }
} else {
    // we timed out ... try again?
}
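
A hedged sketch of how that could wrap the question's makePdfAsync call (method and variable names are taken from the question; the rest is illustrative). Note that a named Semaphore is a kernel object: WaitOne blocks the calling thread while waiting, and the name makes it machine-wide, so within a single process a SemaphoreSlim avoids the kernel transition.

    // Illustrative only: the named semaphore wrapped around one makePdfAsync call.
    // Unlike a Mutex, a Semaphore has no thread affinity, so releasing it after an
    // await (possibly on another thread) is allowed.
    var throttle = new Semaphore(5, 5, "pdftaskthrottle");

    if (throttle.WaitOne(5000))
    {
        try
        {
            await makePdfAsync(form);
        }
        finally
        {
            throttle.Release();
        }
    }
    else
    {
        // timed out waiting for a free slot; log and decide whether to retry or give up
    }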
Dai Bok

If I understand you correctly, you effectively want a producer/consumer queue with a limit of 5 concurrent tasks. BlockingCollection would be the best fit if that's what you're after. It has very good performance because internally it uses SemaphoreSlim to do the blocking when necessary. You can also combine it with Task, e.g. by creating a BlockingCollection<Task<T>>. "C# in a Nutshell" has a good section on this; see the code below as a general example. Also, try to avoid kernel-mode synchronisation constructs like Mutex where possible, as they're slow (you pay for the transition from managed code into native code!).

class PCQueue : IDisposable
{
    private BlockingCollection<Task> _taskQueue = new BlockingCollection<Task>();

    public PCQueue(int workerCount)
    {
        for (int i = 0; i < workerCount; i++)
            Task.Factory.StartNew(Consume);
    }

    public Task Enqueue(Action action, CancellationToken cancelToken = default(CancellationToken))
    {
        //! A task object can either be generated using TaskCompletionSource or instantiated directly (an unstarted or cold task!).
        var task = new Task(action, cancelToken);
        _taskQueue.Add(task); //? Create a cold task and enqueue it.
        return task;
    }

    public Task<TResult> Enqueue<TResult>(Func<TResult> func, CancellationToken cancelToken = default(CancellationToken))
    {
        var task = new Task<TResult>(func, cancelToken);
        _taskQueue.Add(task);
        return task;
    }

    void Consume()
    {
        foreach (var task in _taskQueue.GetConsumingEnumerable())
        {
            try
            {
                //! We run the task synchronously on the consumer's thread.
                if (!task.IsCanceled) task.RunSynchronously();
            }
            catch (InvalidOperationException)
            {
                //! Handle the unlikely event that the task is canceled in between checking whether it's canceled and running it.
                // race condition!
            }
        }
    }

    public void Dispose() => _taskQueue.CompleteAdding();
}
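
A possible usage sketch for the question's scenario, assuming a hypothetical synchronous MakePdf(form) helper: the queue runs each delegate synchronously on one of its worker threads, so passing the question's async makePdfAsync directly would return at its first await and escape the throttle.

    // Illustrative usage: five worker threads means at most five PDFs are generated
    // at any one time, across all concurrent calls to Create.
    private static readonly PCQueue PdfQueue = new PCQueue(workerCount: 5);

    public Task Create(List<FormModel> forms)
    {
        // MakePdf is an assumed synchronous variant of the question's makePdfAsync.
        var tasks = forms.Select(form => PdfQueue.Enqueue(() => MakePdf(form))).ToList();
        return Task.WhenAll(tasks);
    }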
dragonfly02