
I am currently optimizing an existing, very slow and timing out production application. There is no option to re-write it.

In short, it is a WCF service that currently calls 4 other "worker" WCF services sequentially. None of the worker services depends on results from the others, so we would like it to call them all at once (not sequentially). I will reiterate that we don't have the luxury of re-writing it.


The optimization involves making it call all worker services at once. This is where asynchrony came to mind.

I have limited experience with asynchronous programming, but I have read as widely as I can on the topic, with respect to my solution.

The problem is that, in testing, it works but maxes out my CPU. I would appreciate your help.

The following is a simplified version of the essential code in the main WCF service:

// The service operation belonging to main WCF Service
public void ProcessAllPendingWork()
{
    var workerTasks = new List<Task<bool>>();
    foreach(var workerService in _workerServices)
    {
        //DoWorkAsync is the worker method with the following signature:
        // Task<bool> DoWorkAsync()

        var workerTask = workerService.DoWorkAsync();
        workerTasks.Add(workerTask);
    }

    var task = Task.Run(async ()=>
    {
        await RunWorkerTasks(workerTasks);
    });
    task.Wait();


}

private async Task RunWorkerTasks(IEnumerable<Task<bool>> workerTasks)
{
    using(var semaphore = new SemaphoreSlim(initialCount:3))
    {

        foreach (var workerTask in workerTasks)
        {
            await semaphore.WaitAsync();
            try
            {
                await workerTask;
            }
            catch (System.Exception ex)
            {
                //assume 'Log' is a predefined logging service
                Log.Error(ex);
            }
        }
    }
} 

What I have read:

Multiple ways how to limit parallel tasks processing

How to limit the amount of concurrent async I/O operations?

Approaches for throttling asynchronous methods in C#

Constraining Concurrent Threads in C#

Limiting Number of Concurrent Threads With SemaphoresSlim

Async WCF call with ChannelFactory and CreateChannel

tinonetic
  • Unless I'm missing something, after populating your `workerTasks` list, you can just call `await Task.WhenAll(workerTasks)` and remove this whole `RunWorkerTasks` part – Kevin Gosse Aug 20 '19 at 11:46
  • I suspect you need to wait for the semaphore before **starting** the task, not **after**. `Parallel.ForEach` (with `MaxDegreeOfParallelism`) may also be worth considering. – mjwills Aug 20 '19 at 11:47
  • Possible duplicate of [Using async/await for multiple tasks](https://stackoverflow.com/questions/12337671/using-async-await-for-multiple-tasks) – Ackdari Aug 20 '19 at 12:10
  • @KevinGosse, so are you saying the code segment `var task = Task.Run(async ()=>` will now have (inside it) the entire `using(var semaphore = new SemaphoreSlim(initialCount:3))` code block?....And what about the performance? – tinonetic Aug 20 '19 at 12:43
  • @Ackdari You're wrong as I specifically asked why it is "maxing" out the CPU. Your suggestion, though related does not answer the main question. – tinonetic Aug 20 '19 at 12:50
  • @mjwills I had read on `Parallel.ForEach` and it seemed to be heavily discouraged. I see now that it may be the solution I need. Especially since I will not need to use `SemaphoreSlim`. Please put it as an answer and I will upvote it and possibly consider it as the answer after testing – tinonetic Aug 20 '19 at 13:02
  • The variable name `workerTask` confuses me. Is this a "real" task that does some processing, or a "meta" task that is created with the intention of managing the instantiation and execution of "real" tasks? I am asking because I personally use the term `workerTask` with the second meaning ([example](https://stackoverflow.com/a/56862796/11178549)). – Theodor Zoulias Aug 20 '19 at 19:59
  • @TheodorZoulias, I have updated the code snippet with a comment – tinonetic Aug 21 '19 at 08:50
  • Maxing out the CPU may be good news, if the overall completion time is reduced accordingly. The more you run in parallel, the more CPU you use. – Simon Mourier Aug 22 '19 at 18:05
  • No info on why it shouldn't maximize CPU. No info on kind of workers' work: IO/CPU bound. Still expecting a useful answer. Downvote. – cassandrad Aug 22 '19 at 20:50
  • You don't mention how many cores on the cpu. (Task.Wait is "dangerous".) If there is only 1 core, then your main thread is the only thread to do the work; and it must then be forcibly switched by the OS, or nothing would ever run. As @KevinGosse said, `RunWorkerTasks` is superfluous: you are just awaiting 1 semaphore 3 times on the same thread. Your whole work can be reduced to (using `System.Linq`) `Task.WhenAll(_workerServices.Select(workerService => workerService.DoWorkAsync()))`. Then await or Wait that Task ... If awaited then it's also safe. You create a few extra Tasks as-is. – Steven Coco Aug 24 '19 at 01:31
  • On WCF each request has a managed thread, in that context there is not a “main thread” – Claudio Aug 25 '19 at 09:22
  • If I understand the question correctly, the need is to have 3 slots for workers in the whole application, and each request calls a bunch of workers. In that case semaphores, signals and await are overkill if you are not familiar with them. You can build your own logic on TaskScheduler and use a higher-level API. Details in the answer below https://stackoverflow.com/a/57617735/3120219 – Claudio Aug 25 '19 at 09:32
  • Unless I miss something - your sample code runs ALL workers in parallel. By the time of calling 'workerService.DoWorkAsync()' the worker starts off its job. 'RunWorkerTasks' only awaits the worker Task. 'DoWorkAsync()' kicks off the async operation while 'await' pauses the calling method from execution until the awaited Task completes. – sa.he Aug 28 '19 at 05:44

3 Answers


You didn't explain how you wanted to limit the concurrent calls. Do you want 30 concurrent worker tasks running, or do you want 30 WCF calls, each of which has all its worker tasks running concurrently, or do you want concurrent WCF calls to each have their own limit of concurrent worker tasks? Given you said that each WCF call has only 4 worker tasks and looking at your sample code, I assume you want a global limit of 30 concurrent worker tasks.

Firstly, as @mjwills implied, you need to use the SemaphoreSlim to limit calls to workerService.DoWorkAsync(). Your code currently starts all of them, and only tries to throttle how many you'll wait to finish. I assume this is why you max out CPU. The number of worker tasks started remains unbounded. Note however you'll also need to await the worker task while you hold the semaphore, otherwise you'll only throttle how fast you create tasks, not how many run concurrently.

Secondly, you're creating a new SemaphoreSlim for each WCF request. Hence my question from my first paragraph. The only way this will throttle anything is if you have more worker services than the initial count, which in your sample is 30, but you said there are only 4 workers. To have a "global" limit, you need to use a singleton SemaphoreSlim.

Thirdly, you never call .Release() on the SemaphoreSlim, so if you did make it a singleton, your code will hang once it has started 30 workers since the process started. Make sure to do it in a try-finally block, so that if the worker crashes, it still gets released.

Here's some hastily written sample code:

public async Task ProcessAllPendingWork()
{
    var workerTasks = new List<Task<bool>>();
    foreach(var workerService in _workerServices)
    {
        var workerTask = RunWorker(workerService);
        workerTasks.Add(workerTask);
    }

    await Task.WhenAll(workerTasks);
}

// WorkerService is assumed here to be the type of the items in _workerServices.
private async Task<bool> RunWorker(WorkerService workerService)
{
    // use singleton semaphore.
    await _semaphore.WaitAsync();
    try
    {
        return await workerService.DoWorkAsync();
    }
    catch (System.Exception ex)
    {
        //assume 'Log' is a predefined logging service
        Log.Error(ex);
        return false; // ??
    }
    finally
    {
        _semaphore.Release();
    }
}
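
The sample above leaves the `_semaphore` field and the synchronous entry point implicit. What follows is only a minimal sketch of how the pieces might fit together while keeping `ProcessAllPendingWork()` synchronous, as discussed in the comments. `IWorkerService`, `MainService` and `DelayWorker` are hypothetical names introduced for illustration, the console logging is a placeholder, and the limit of 3 is taken from the comments rather than from the answer text.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the worker service proxy type.
public interface IWorkerService
{
    Task<bool> DoWorkAsync();
}

public class MainService
{
    // Static, so every request handled by this process shares the same limit of 3.
    private static readonly SemaphoreSlim Semaphore = new SemaphoreSlim(3);

    private readonly IReadOnlyList<IWorkerService> _workerServices;

    public MainService(IReadOnlyList<IWorkerService> workerServices)
    {
        _workerServices = workerServices;
    }

    // Keeps the synchronous signature and blocks until all workers finish.
    public void ProcessAllPendingWork()
    {
        Task<bool>[] workerTasks = _workerServices
            .Select(worker => RunWorkerAsync(worker))
            .ToArray();
        Task.WaitAll(workerTasks);
    }

    private static async Task<bool> RunWorkerAsync(IWorkerService workerService)
    {
        // Take a slot before the worker is started, not after.
        await Semaphore.WaitAsync().ConfigureAwait(false);
        try
        {
            return await workerService.DoWorkAsync().ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            Console.Error.WriteLine(ex); // placeholder for the real logger
            return false;
        }
        finally
        {
            Semaphore.Release(); // always give the slot back
        }
    }
}

// Hypothetical test worker that records how many instances ran at the same time.
public sealed class DelayWorker : IWorkerService
{
    private static readonly object Gate = new object();
    private static int _running;
    public static int MaxObserved { get; private set; }

    public async Task<bool> DoWorkAsync()
    {
        lock (Gate) { _running++; MaxObserved = Math.Max(MaxObserved, _running); }
        await Task.Delay(100).ConfigureAwait(false);
        lock (Gate) { _running--; }
        return true;
    }
}
```

Running `new MainService(workers).ProcessAllPendingWork()` with, say, eight `DelayWorker` instances should leave `DelayWorker.MaxObserved` at no more than 3. Blocking with `Task.WaitAll` still ties up the request thread, so this only throttles the workers; it does not make the operation itself asynchronous.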
zivkan
  • I have edited, that was a typo. It was supposed to be 3 threads. Sorry about that – tinonetic Aug 20 '19 at 12:56
  • @user919426 so do you want each call to your WCF service to run 3 workers at a time, so 5 concurrent WCF calls means 15 workers? or 3 workers running regardless of how many concurrent WCF calls? In any case it only affects my second point. The first and third points remain. – zivkan Aug 20 '19 at 12:59
  • I would like a maximum of 3 workers being called concurrently. – tinonetic Aug 20 '19 at 13:00
  • In that case, my original answer is still my best attempt at answering your question. I was too lazy to change the 30's to 3's in my answer, but I did add some sample code to show how I believe you need to do it. – zivkan Aug 20 '19 at 13:17
  • Thanks for the update, but I have one issue with the change of the method signature `public async Task ProcessAllPendingWork()`. I cannot change it as it is a synchronous service operation that has existing code calling it. – tinonetic Aug 20 '19 at 13:25
  • It's necessary to do async properly, although async is designed to solve waiting on IO, not for doing parallel CPU-bound work. You can change `Task.WhenAll` to `Task.WaitAll`, which is a blocking API and therefore not change the signature. – zivkan Aug 20 '19 at 13:30
  • Quick question, using your method, at which point do you initialise the semaphore (In the constructor?)... also, is there any specific reason why it must be singleton? – tinonetic Aug 29 '19 at 10:35
  • If you create one semaphore per request, then you can only limit the number of workers per request. If you have multiple parallel requests, you will exceed 3 workers per web server. There are many ways to initialise it, which one is best depends on your application architecture and how/if you test your code. An easy way is just to define a static field in the class, which lets the .NET runtime initialise it the first time the class is used. Initialising on application startup is another good choice. – zivkan Aug 29 '19 at 12:40

The Task abstraction provided by the TPL (Task Parallel Library) is an abstraction over Thread; tasks are enqueued on a thread pool and executed when an executor is able to handle them.

In other words, depending on several factors (your traffic, whether the work is CPU-bound or I/O-bound, and your deployment model), executing a managed Task in your worker function may bring no benefit at all (or in some cases be slower).

That said, I suggest using Task.WaitAll (available since .NET 4.0), which uses very high-level abstractions to manage concurrency. In particular, this piece of code could be useful to you:

  • it creates the workers and waits for all of them
  • it takes 10 seconds to execute (the duration of the longest worker)
  • it catches exceptions and gives you the opportunity to handle them
  • [last but not least] it is a declarative API that focuses your attention on what to do, not how to do it.
public class Q57572902
{
    public void ProcessAllPendingWork()
    {
        var workers = new Action[] {Worker1, Worker2, Worker3};

        try
        {
            Task.WaitAll(workers.Select(Task.Factory.StartNew).ToArray());
            // ok
        }
        catch (AggregateException exceptions)
        {
            foreach (var ex in exceptions.InnerExceptions)
            {
                Log.Error(ex);
            }
            // ko
        }
    }

    // FromSeconds comes from TimeSpan: add `using static System.TimeSpan;` at the top of the file.
    public void Worker1() => Thread.Sleep(FromSeconds(5)); // do something

    public void Worker2() => Thread.Sleep(FromSeconds(10)); // do something

    public void Worker3() => throw new NotImplementedException("error to manage"); // something wrong

}

I have seen from the comments that you require a maximum of 3 workers running at the same time; in this case you can simply copy the LimitedConcurrencyLevelTaskScheduler sample from the TaskScheduler documentation.

After that, you have to create a singleton TaskScheduler instance with its own TaskFactory, like this:

public static class WorkerScheduler
{
    public static readonly TaskFactory Factory;

    static WorkerScheduler()
    {
        var scheduler = new LimitedConcurrencyLevelTaskScheduler(3);
        Factory = new TaskFactory(scheduler);
    }
}

Previous ProcessAllPendingWork() code remains the same except for

...workers.Select(Task.Factory.StartNew)...

that becomes

...workers.Select(WorkerScheduler.Factory.StartNew)...

because you have to use the TaskFactory associated to your custom WorkerScheduler.
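
As an alternative to copying the documentation sample, the framework's own `ConcurrentExclusiveSchedulerPair` can also cap concurrency. This is a different technique from the `LimitedConcurrencyLevelTaskScheduler` described above, shown only as a rough sketch; `BuiltInWorkerScheduler` and `SchedulerDemo` are hypothetical names, and the sleeping worker merely simulates synchronous work.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class BuiltInWorkerScheduler
{
    // The pair's ConcurrentScheduler runs at most 3 tasks at the same time.
    private static readonly ConcurrentExclusiveSchedulerPair Pair =
        new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, 3);

    public static readonly TaskFactory Factory =
        new TaskFactory(Pair.ConcurrentScheduler);
}

public static class SchedulerDemo
{
    // Starts workerCount simulated workers and returns the highest number
    // that were observed running at the same moment.
    public static int RunAndMeasureMaxConcurrency(int workerCount)
    {
        int running = 0, max = 0;
        object gate = new object();

        Action worker = () =>
        {
            lock (gate) { running++; max = Math.Max(max, running); }
            Thread.Sleep(100); // simulated synchronous work
            lock (gate) { running--; }
        };

        Task[] tasks = Enumerable.Range(0, workerCount)
            .Select(_ => BuiltInWorkerScheduler.Factory.StartNew(worker))
            .ToArray();

        Task.WaitAll(tasks);
        return max;
    }
}
```

Calling `SchedulerDemo.RunAndMeasureMaxConcurrency(8)` should report a value no higher than 3. As with any scheduler-based limit, this bounds synchronous delegates; an `async` delegate would only be limited during the stretches that actually run on the scheduler.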

If your workers need to return some data for the response, errors and data need to be handled differently, as follows:

public void ProcessAllPendingWork()
{
    var workers = new Func<bool>[] {Worker1, Worker2, Worker3};
    var tasks = workers.Select(WorkerScheduler.Factory.StartNew).ToArray();

    bool[] results = null;

    Task
        .WhenAll(tasks)
        .ContinueWith(x =>
        {
            if (x.Status == TaskStatus.Faulted)
            {
                foreach (var exception in x.Exception.InnerExceptions)
                    Log.Error(exception);

                return;
            }

            results = x.Result; // save data in outer scope
        })
        .Wait();

    // continue execution
    // results is now filled: if results is null, some errors occurred
}
Claudio
  • Thank-you for your answer, but this is a far departure from the existing code, which I have no luxury of changing. – tinonetic Aug 28 '19 at 12:35
  • If your workers are executed sequentially, this solution seems the nearest one to the snippet you provided. I avoided semaphores and async/await to stay close to your existing code. In which way does this solution seem far from it for you? – Claudio Aug 28 '19 at 13:05

Unless I miss something - your sample code runs ALL workers in parallel. By the time of calling 'workerService.DoWorkAsync()' the worker starts off its job. 'RunWorkerTasks' only waits for the worker Task to complete. 'DoWorkAsync()' kicks off the async operation while 'await' pauses the calling method from execution until the awaited Task completes.
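
To illustrate the point that the work begins at the call and not at the `await`, here is a small sketch. `HotTaskDemo` and its `DoWorkAsync` are hypothetical stand-ins for the real worker service, with `Task.Delay` simulating the remote call.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class HotTaskDemo
{
    private static int _started;

    // Hypothetical worker: counts itself as started, then simulates I/O.
    private static async Task<bool> DoWorkAsync()
    {
        Interlocked.Increment(ref _started);
        await Task.Delay(200).ConfigureAwait(false);
        return true;
    }

    // Returns how many workers had already started before anything was awaited.
    public static int CountStartedBeforeAwait(int workerCount)
    {
        _started = 0;
        var tasks = new Task<bool>[workerCount];
        for (int i = 0; i < workerCount; i++)
        {
            tasks[i] = DoWorkAsync(); // the worker starts running here
        }

        int startedBeforeAwait = Volatile.Read(ref _started);
        Task.WaitAll(tasks); // waiting only observes completion
        return startedBeforeAwait;
    }
}
```

`HotTaskDemo.CountStartedBeforeAwait(4)` returns 4, because an async method runs synchronously up to its first incomplete `await`. Any throttling therefore has to wrap the call to `DoWorkAsync()` itself, not the later `await`.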

The fact of high CPU usage is most likely due to your workerService's activity and not due to the way you call them. In order to verify that, try replacing workerService.DoWorkAsync() with Thread.Sleep(..) or Task.Delay(..). If your CPU usage drops, it is the workers to blame. (Depending on what workerService does) it might be ok or even expected that the CPU consumption increases once you run them in parallel.

Coming to your question of how to limit parallel execution: note that the following sample does not use exactly 3 threads, but at most 3 threads.

    Parallel.ForEach(
        _workerServices,
        new ParallelOptions { MaxDegreeOfParallelism = 3 },
        workerService => workerService.DoWorkAsync()
            .ContinueWith(res => 
            {
                // Handle your result or possible exceptions by consulting res.
            })
            .Wait());

As you mentioned that your code previously executed sequentially, I assume that the workers also have a non-async equivalent. It is probably easier to use those, because calling an async method synchronously is mostly a hassle. I've even had deadlock scenarios just by calling DoWorkAsync().Wait(). There has been much discussion of How would I run an async Task<T> method synchronously?. In essence, I try to avoid it. If that is not possible, I attempt to use ContinueWith, which increases the complexity, or the AsyncHelper from the previous SO discussion.

    var results = new ConcurrentDictionary<WorkerService, bool>();
    Parallel.ForEach(
        _workerServices,
        new ParallelOptions { MaxDegreeOfParallelism = 3 },
        workerService => 
            {
                // Handle possible exceptions via try-catch.
                results.TryAdd(workerService, workerService.DoWork());
            });
    // evaluate results
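
Where no synchronous equivalent exists, one commonly used workaround for the deadlock risk mentioned above is to start the async method on a thread-pool thread before blocking on it. This is a sketch of that general pattern, not the AsyncHelper from the linked discussion; `SyncOverAsync` and `RunSync` are hypothetical names.

```csharp
using System;
using System.Threading.Tasks;

public static class SyncOverAsync
{
    // Runs an async method to completion and returns its result synchronously.
    public static T RunSync<T>(Func<Task<T>> asyncWork)
    {
        // Task.Run starts the method on a thread-pool thread, which has no
        // SynchronizationContext, so its continuations do not need the
        // (blocked) calling context in order to resume.
        // GetAwaiter().GetResult() rethrows the original exception instead
        // of wrapping it in an AggregateException.
        return Task.Run(asyncWork).GetAwaiter().GetResult();
    }
}
```

A call might look like `bool ok = SyncOverAsync.RunSync(() => workerService.DoWorkAsync());`. The calling thread is still blocked for the duration, so this trades the deadlock risk for an extra pool thread.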

Parallel.ForEach takes advantage of the thread pool, meaning it dispatches executions of the given Action<TSource> body onto pool threads (the calling thread may take part as well). You can easily verify that with the following code. Since Parallel.ForEach already dispatches the work onto different threads, you can simply execute your 'expensive' operation synchronously. Any async operations would be unnecessary or could even hurt runtime performance.

    Parallel.ForEach(
        Enumerable.Range(1, 4),
        m => Console.WriteLine(Thread.CurrentThread.ManagedThreadId));

This is the demo project I used for testing which does not rely on your workerService.

    private static bool DoWork()
    {
        Thread.Sleep(5000);
        Console.WriteLine($"done by {Thread.CurrentThread.ManagedThreadId}.");
        return DateTime.Now.Millisecond % 2 == 0;
    }

    private static Task<bool> DoWorkAsync() => Task.Run(DoWork);

    private static void Main(string[] args)
    {
        var sw = new Stopwatch();
        sw.Start();

        // define a thread-safe dict to store the results of the async operation
        var results = new ConcurrentDictionary<int, bool>();

        Parallel.ForEach(
            Enumerable.Range(1, 4), // this replaces the list of workers
            new ParallelOptions { MaxDegreeOfParallelism = 3 },
            // m => results.TryAdd(m, DoWork()), // this is the alternative synchronous call
            m => DoWorkAsync().ContinueWith(res => results.TryAdd(m, res.Result)).Wait());

        sw.Stop();

        // print results
        foreach (var item in results)
        {
            Console.WriteLine($"{item.Key}={item.Value}");
        }

        Console.WriteLine(sw.Elapsed.ToString());
        Console.ReadLine();
    }
sa.he
  • Thank-you for your detailed answer, but what do you mean by `calling an async method synchronously is mostly a hassle.`? Why should I use `Parallel.ForEach` on synchronous methods vs an implementation close to mine? What is the benefit or disadvantage over the other?....In your demo project, I assume `ConcurrentDictionary` is a thread safe dictionary to collect the results. Please include comments on that if I am right OR WRONG :)... I also assume that `Enumerable.Range(1, 4)` simulates my collection of worker services. Is that correct? – tinonetic Aug 29 '19 at 05:17
  • Another question concerning your `Enumerable.Range(1,4)`, which I have assumed simulates my WorkerService collection. Is it correct to say that for my scenario & question, the code will now look as follows `Parallel.ForEach(_workerServices, new ParallelOptions { MaxDegreeOfParallelism = 3 },s => s.DoWorkAsync().ContinueWith(res => results.TryAdd(m, res.Result)).Wait()); ` ?? ... I will otherwise try your demo and give feedback. – tinonetic Aug 29 '19 at 05:24
  • I tried to address your comments by updating the question. Whether this or @zivkan's approach is better is personal taste. I believe that Parallel.ForEach decides how many threads to use based on the given system hardware and how busy the system is. MaxDegreeOfParallelism just sets an upper limit to that decision. – sa.he Aug 29 '19 at 06:34
  • Hi. Your answer is closest to the solution we are going with. We have decided to go with this approach. This does not degrade the quality of discussions and answers from every one else. Thank-you! – tinonetic Aug 30 '19 at 11:13
  • Maybe I misunderstood the question: is the goal to have at most 3 active workers per HTTP request, or at most 3 active workers for the entire host? This is a good solution in the first case, not for the second one – Claudio Aug 31 '19 at 15:53