9

I have Parallel.ForEach code in my Windows service. If ParallelOptions.MaxDegreeOfParallelism is set to -1, I'm using most of my CPU cores. However, stopping the service takes half a minute. Some internal controller thread that should receive the signal to stop the service is starved of processor time. I set the process priority to below normal, but that could be irrelevant here.

What can I do to shorten the time of stopping the service even when all threads are busy?

I was toying with the idea of temporarily lowering the priority of the thread-pool threads, since I don't have any async code, but the Internet says that's a bad idea, so I'm asking here for a "proper" way.

The threads (both OS and .NET) are in all cases different between OnStart and OnStop. Also, if stopping takes very long, the OS thread on which OnStop is eventually called is sometimes a new thread that does not appear earlier in the log.

To build this code, create a new Windows Service project, add a ProjectInstaller class from the designer, change Account to LocalService, and install it once with InstallUtil. Make sure LocalService can write to C:\Temp.

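using System;
using System.IO;
using System.Linq;
using System.Security.Cryptography;
using System.ServiceProcess;
using System.Threading;
using System.Threading.Tasks;

public partial class Service1 : ServiceBase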
{
    private ManualResetEvent stopEvent = new ManualResetEvent(false);
    private Task mainTask;
    private StreamWriter writer = File.AppendText(@"C:\Temp\Log.txt");

    public Service1()
    {
        InitializeComponent();

        writer.AutoFlush = true;
    }

    protected override void OnStart(string[] args)
    {
        Log("--------------");
        Log("OnStart");

        mainTask = Task.Run(new Action(Run));
    }

    protected override void OnStop()
    {
        Log("OnStop");
        stopEvent.Set();

        mainTask.Wait();
        Log("--------------");
    }

    private void Log(string line)
    {
        // StreamWriter is not thread-safe; the loop body logs from many threads at once
        lock (writer)
        {
            writer.WriteLine(String.Format("{0:yyyy-MM-dd HH:mm:ss.fff}: [{1,2}] {2}",
                DateTime.Now, Thread.CurrentThread.ManagedThreadId, line));
        }
    }

    private void Run()
    {
        try
        {
            var parallelOptions = new ParallelOptions();
            parallelOptions.MaxDegreeOfParallelism = -1;

            Parallel.ForEach(Directory.EnumerateFiles(Environment.SystemDirectory),
                parallelOptions, (fileName, parallelLoopState) =>
            {
                if (stopEvent.WaitOne(0))
                {
                    Log("Stop requested");
                    parallelLoopState.Stop();
                    return;
                }

                try
                {
                    // SHA256 instances are not thread-safe, so each iteration creates its own
                    using (var sha = SHA256.Create())
                    {
                        var hash = sha.ComputeHash(File.ReadAllBytes(fileName).OrderBy(x => x).ToArray());
                        Log(String.Format("file={0}, sillyhash={1}", fileName, Convert.ToBase64String(hash)));
                    }
                }
                catch (Exception ex)
                {
                    Log(String.Format("file={0}, exception={1}", fileName, ex.Message));
                }
            });
        }
        catch (Exception ex)
        {
            Log(String.Format("exception={0}", ex.Message));
        }
    }
}
Dialecticus
  • have a look here https://stackoverflow.com/q/41851446/559144 – Davide Piras Aug 15 '18 at 13:54
  • You might be able to put it in a task and pass a cancellation token. – paparazzo Aug 15 '18 at 14:16
  • The stop signal shouldn't be "starved out". Have you confirmed that it's a problem of it not even reaching your `OnStop` code or could it be a problem relating to the code within? – Damien_The_Unbeliever Aug 15 '18 at 14:17
  • If you have good reason to believe that the control thread is being starved, why not play with **its** priority instead? Hint: It's the thread that called `ServiceBase.Run`, which doesn't return until all services are shut down. You can run arbitrary code *before* `Run`. – Damien_The_Unbeliever Aug 15 '18 at 14:33
  • I'm writing a log message when `OnStop` is called. My problem is that `OnStop` is not called quickly enough. I have no way of knowing on which thread `OnStop` will be called. It's a thread different from the one for `OnStart`. I assume that a new `ProcessThread` is created and an existing `Thread` is reused from the thread pool. – Dialecticus Aug 15 '18 at 16:28
  • Unless things have changed around significantly, the thread that calls `ServiceBase.Run` is used to dispatch control events. It's similar to `Application.Run` in forms applications - it's *that thread* that becomes the UI (or here, control) thread. How have you established that the threads are different between start/stop - they shouldn't be (as above). – Damien_The_Unbeliever Aug 15 '18 at 16:35
  • While I'm pretty sure it's a different `Thread` I haven't checked if it's a different `ProcessThread`. I'll return tomorrow with more details. – Dialecticus Aug 15 '18 at 16:44
  • Also, if your tasks are extensively logging, don't dismiss the idea that it's *contention on the log* that's slowing things down. – Damien_The_Unbeliever Aug 15 '18 at 19:19
  • @Damien I updated the question. Thread in which `Run` is running is not involved in any other operation. `OnStart` and `OnStop` are executed in other threads. Also, raising priority of the `Run` thread does not make the stopping more responsive. – Dialecticus Aug 16 '18 at 11:08
  • Any sort of small reproducible example would help. – TheSoftwareJedi Sep 06 '18 at 15:45
  • @TheSoftwareJedi added the example. – Dialecticus Sep 09 '18 at 11:12
  • StreamWriter is not thread safe though, so maybe declare your writer as `private TextWriter writer = TextWriter.Synchronized(File.AppendText(@"C:\Temp\Log.txt"));` – Ulf Kristiansen Sep 10 '18 at 12:07
  • @Dialecticus I'll take a look tonight from home. I can't run this at the office - locked down PCs. – TheSoftwareJedi Sep 10 '18 at 20:06
  • @Dialecticus there are several problems here. 1) Parallel.ForEach is meant for *data parallelism*. It uses the *current* thread as well, which means it appears to block. 2) Running *more* threads than there are cores, which `-1` allows, will result in *thrashing*, not improved CPU usage. If you have 8 cores you can't calculate more than 8 hashes concurrently. `Parallel.ForEach` will use all cores by default. – Panagiotis Kanavos Sep 11 '18 at 14:52
  • @Dialecticus 3) If you want to cancel, use a CTS. Don't try to emulate it with an event. Right now, your code will keep starting tasks even if the event is set, because `Parallel.ForEach` isn't cancelled. The expensive hashing and ordering operations *and* the thrashing due to DOP=-1 mean that it could take a lot of time for one of those tasks to finally check the event. They may *never* check it, though. If you start, e.g., 100 tasks concurrently, all of them will check the event and proceed. If you later try to cancel, *none* of them will have a chance to check the event. – Panagiotis Kanavos Sep 11 '18 at 14:54
  • @PanagiotisKanavos thanks for the tips, but they don't really relate to my problem, apart from the implied tip to reduce the number of concurrent tasks. I agree that a CTS could bring a bit of improved performance in the end, but it's not noticeable in this case. Thanks for not giving these tips in the form of an answer :) – Dialecticus Sep 11 '18 at 17:17
  • @Dialecticus they do. The entire program is problematic because it uses the wrong concepts for unsuitable scenarios. The CTS isn't about performance, it's about *correctness* and ensuring that the program actually cancels. I do have to process thousands of files and records. I use an ActionBlock with a *small* DOP >1 and post filenames to it. This ensures that the CPU won't waste cycles switching among threads. It also allows me to gracefully cancel and only have to wait for the in-flight operations to finish – Panagiotis Kanavos Sep 12 '18 at 07:06
  • @Dialecticus think about what your code does. It doesn't check the event at all, so *all* operations get to finish. *That's* why you need to wait for half a minute. You'd have to check the event or CTS at various steps to actually know when to cancel. You can't do that, though, because the only expensive operations, reading and hashing, are tied together in the same line. You'd have to separate them and check the event or CTS after reading, then hash. You could also cancel *reading*: `ReadAllBytesAsync` has an overload that accepts a `CancellationToken`. – Panagiotis Kanavos Sep 12 '18 at 07:11
  • @PanagiotisKanavos I suggest you change the code to try to fix my problem. If you fix my problem you get 100 points. My problem is that OnStop is called very very late, because something inside is starved out of CPU. Run the code, see for yourself. – Dialecticus Sep 12 '18 at 08:06
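Ulf Kristiansen's suggestion of wrapping the log writer with `TextWriter.Synchronized` could look roughly like this. It is a minimal sketch; `LogFactory` and `CreateSynchronizedLog` are hypothetical names, and the log path is whatever you pass in. `AutoFlush` has to be set on the inner `StreamWriter` before wrapping, because the synchronized wrapper is typed as a plain `TextWriter`.

```csharp
using System.IO;

static class LogFactory
{
    // Opens (or creates) the log file for appending and returns a wrapper whose
    // Write/WriteLine calls are serialized, so parallel loop bodies can log
    // without interleaving or corrupting each other's lines.
    public static TextWriter CreateSynchronizedLog(string path)
    {
        var inner = File.AppendText(path);
        inner.AutoFlush = true; // set on the StreamWriter; TextWriter has no AutoFlush property
        return TextWriter.Synchronized(inner);
    }
}
```

In the question's code, the `writer` field would then be declared as `TextWriter` and initialized with this helper, while `Log` can keep calling `writer.WriteLine` unchanged.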

5 Answers

4

This code stops the service within a second or two, while the threads that are already computing end only after they finish their current work. The OnStop method receives the signal right away, as you can see in the Services console. However, Task Manager shows that the process associated with the service stops only after all the consuming threads finish.

It uses a BlockingCollection of strings (paths) that a separate thread fills, and a number of low-priority threads that consume the strings.

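using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Security.Cryptography;
using System.ServiceProcess;
using System.Threading;

public partial class Service1 : ServiceBase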
{
    private StreamWriter writer = File.AppendText(@"C:\temp\Log.txt");

    const int nbTreads = 30;
    BlockingCollection<string> dataItems;
    bool stopCompute = false;
    List<Thread> threads = new List<Thread>();
    Thread threadProd;
    private object aLock = new object();

    public Service1()
    {
        InitializeComponent();

        dataItems = new BlockingCollection<string>(nbTreads);

        writer.AutoFlush = true;
    }


    protected override void OnStart(string[] args)
    {
        Log("--------------");
        Log("OnStart");
        threadProd = new Thread(new ThreadStart(ProduireNomFichier));
        threadProd.Start();
        Thread.Sleep(1000); // fill the collection a little
        for (int i = 0; i < nbTreads; i++)
        {
            Thread threadRun = new Thread(() => Run());
            threadRun.Priority = ThreadPriority.Lowest;
            threadRun.Start();
            threads.Add(threadRun);
        }
    }

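    private void ProduireNomFichier()
    {
        try
        {
            foreach (string nomFichier in Directory.EnumerateFiles(Environment.SystemDirectory))
            {
                dataItems.Add(nomFichier);
            }
        }
        finally
        {
            // Tell the consumers that no more items are coming
            // (this also runs when OnStop aborts this thread)
            dataItems.CompleteAdding();
        }
    }

    protected override void OnStop()
    {
        lock (aLock)
        {
            stopCompute = true;
        }
        Log("OnStop");
        Log("--------------");
        threadProd.Abort();
    }

    private void Log(string line)
    {
        // StreamWriter is not thread-safe and many consumer threads log concurrently
        lock (writer)
        {
            writer.WriteLine(String.Format("{0:yyyy-MM-dd HH:mm:ss.fff}: [{1,2}] {2}",
                DateTime.Now, Thread.CurrentThread.ManagedThreadId, line));
        }
    }

    private void Run()
    {
        try
        {
            using (var sha = SHA256.Create())
            {
                // GetConsumingEnumerable blocks until an item is available and ends once
                // CompleteAdding has been called and the collection is empty. (TryTake without
                // a timeout returns false, ending this thread, whenever the queue is momentarily empty.)
                foreach (string fileName in dataItems.GetConsumingEnumerable())
                {
                    lock (aLock)
                    {
                        if (stopCompute) return;
                    }
                    try
                    {
                        var hash = sha.ComputeHash(File.ReadAllBytes(fileName).OrderBy(x => x).ToArray());
                        Log(String.Format("file={0}, sillyhash={1}", fileName, Convert.ToBase64String(hash)));
                    }
                    catch (Exception ex)
                    {
                        Log(String.Format("file={0}, exception={1}", fileName, ex.Message));
                    }
                }
            }
        }
        catch (Exception ex)
        {
            Log(String.Format("exception={0}", ex.Message));
        }
    }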
}
SylF
  • In this solution we have a responsive service, because the created threads have the lowest priority, but we have a hard-coded number of threads, which is inefficient for most core counts (with fewer cores, lots of context switching happens; with more cores, some cores sit idle). Even if we somehow calculate the perfect number of threads, this number becomes invalid if some other app wants to do something CPU-intensive. – Dialecticus Sep 12 '18 at 19:34
  • The Task Parallel Library was designed so that the optimal number of threads is used in all cases, for maximum efficiency. So `Task` is preferable to `Thread`, and `Parallel.ForEach` uses `Task` internally. We just have to figure out the case of shutting down the Windows service. – Dialecticus Sep 12 '18 at 19:34
  • The problem is to signal the OnStop method as soon as the user wants to stop the service. The hard-coded number of threads I chose was only for my own tests. You may not find a nice solution with tasks, as the main thread is drowned in all the threads spawned by `Parallel.ForEach`. – SylF Sep 12 '18 at 19:44
  • +1 ... But: you can use Environment.ProcessorCount to get the processor count instead of hard-coding it. You don't need a lock around "stopCompute"; it adds delay for no reason. Otherwise I really like your code. It is pretty much the same solution as the one I proposed. – Eric Ouellet Sep 13 '18 at 20:57
3

In the Parallel.ForEach, you read all the bytes of a file, then you order them using LINQ. This is not efficient. Try using Array.Sort instead. It can be 85% faster for a 25 MB file:

Array.Sort 2230 ms
OrderBy 14650 ms

Because OnStop waits for the end of any iteration that has already started, this could stop your service a lot faster.

var fileBinary = File.ReadAllBytes(fileName);
Array.Sort(fileBinary);
var hash = sha.ComputeHash(fileBinary);
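Because both versions hash the same ascending byte sequence, the switch changes only the time taken, not the resulting "silly hash". A minimal check, assuming random in-memory bytes stand in for `File.ReadAllBytes(fileName)`; `SortedHash` and its methods are hypothetical names:

```csharp
using System;
using System.Linq;
using System.Security.Cryptography;

static class SortedHash
{
    // Original approach: LINQ OrderBy builds a sorted copy (slow for large files).
    public static byte[] WithOrderBy(byte[] data)
    {
        using (var sha = SHA256.Create())
            return sha.ComputeHash(data.OrderBy(x => x).ToArray());
    }

    // Suggested approach: sort the array in place, then hash it.
    // Note: this mutates the caller's array.
    public static byte[] WithArraySort(byte[] data)
    {
        Array.Sort(data);
        using (var sha = SHA256.Create())
            return sha.ComputeHash(data);
    }
}
```

Sorting in place is harmless here because the array returned by `File.ReadAllBytes` is not used for anything else.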
SylF
  • I made this code to demonstrate the problem, which I described in the question. My actual code exhibits the same problem, but is efficient. Your answer does not solve my problem. My problem is that OnStop is not called after the next task finishes, but much much later. – Dialecticus Sep 10 '18 at 16:57
  • In your example, you don't pass parallelOptions to Parallel.ForEach. – SylF Sep 10 '18 at 17:37
  • Yes, thanks. That in effect made me realize I don't actually use a cancellation token in my real code, but a plain old manual reset event. I changed the code, so you can try it out and see the delay for yourself. The problem is still there. – Dialecticus Sep 10 '18 at 19:06
  • Did you check the number of threads your process has in Task Manager? There may be so many that the OS tries to give a little processing time to each one. Also, it looks like there is intensive processing inside the Parallel.ForEach running for a very long time. Can you add a few Thread.Yield calls to make them cooperate? – SylF Sep 11 '18 at 14:35
  • Please run the code and implement your solution to the problem. If it works, post it here and receive the sweet points. – Dialecticus Sep 12 '18 at 08:55
  • I just ran the code and made different tests. The only way to stop the service quickly in this situation is to limit the number of threads in the Parallel.ForEach (MaxDegreeOfParallelism) to the number of logical cores minus one. And the minus one is really important. – SylF Sep 12 '18 at 16:57
  • Finally someone ran the code. Thanks for that ;) Yes, we can tweak the number of concurrent tasks and find the value that suits us best. So now the question remains: can we do better? – Dialecticus Sep 12 '18 at 17:59
  • Good point. It does not solve the problem but should really improve the overall time. + 1 – Eric Ouellet Sep 13 '18 at 21:00
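SylF's "logical cores minus one" finding can be computed at runtime instead of hard-coded. This is a sketch; `LoopOptions.Create` is a hypothetical helper, and `Math.Max(1, ...)` keeps the value valid on a single-core machine:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class LoopOptions
{
    // Leaves one logical core free (but never goes below 1) and attaches a
    // cancellation token so the loop stops scheduling new iterations once it is cancelled.
    public static ParallelOptions Create(CancellationToken token)
    {
        return new ParallelOptions
        {
            MaxDegreeOfParallelism = Math.Max(1, Environment.ProcessorCount - 1),
            CancellationToken = token
        };
    }
}
```

It would then be passed as `Parallel.ForEach(files, LoopOptions.Create(cts.Token), body)`, where `cts` is a `CancellationTokenSource` that the service cancels when stopping.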
2

This is working code. It stops instantly. Please note that the main idea comes from SylF.

But I cannot give a clear explanation of why it happens... Update (after your comment below): You found the reason, and it explains pretty nicely why you had that behavior. Thanks! I'm really happy to know.

Although the work is done in low-priority threads, you should not notice any additional delay on a machine whose CPU is mostly idle.

Sorry, I modified your code sample to run some tests. The main idea is to change the task scheduler (which seems not to be recommended), but it is the only way I found.

Code:

using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Security.Cryptography;
using System.ServiceProcess;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace StackOverflowQuestionWindowsService1
{
    public partial class Service1 : ServiceBase
    {
        private ManualResetEvent stopEvent = new ManualResetEvent(false);
        private Task mainTask;
        private StreamWriter writer = File.CreateText(@"C:\Temp\Log.txt");     //TAKE CARE - I do not append anymore  ********
        private CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
        private int count = 0;

        public Service1()
        {
            InitializeComponent();

            writer.AutoFlush = true;
        }

        protected override void OnStart(string[] args)
        {
            Log("--------------");
            Log("OnStart");

            Task.Run(()=>Run());
        }

        protected override void OnStop()
        {
            Log("OnStop with actual thread count: " + Process.GetCurrentProcess().Threads.Count);

            cancellationTokenSource.Cancel();
        }

        private void Log(string line)
        {
            // StreamWriter is not thread-safe; the loop body logs from several threads
            lock (writer)
            {
                writer.WriteLine(String.Format("{0:yyyy-MM-dd HH:mm:ss.fff}: [{1,2}] {2}",
                    DateTime.Now, Thread.CurrentThread.ManagedThreadId, line));
            }
        }

        private void Run()
        {
            Stopwatch stopWatchTotal = new Stopwatch();
            stopWatchTotal.Start();

            try
            {
                var parallelOptions = new ParallelOptions();
                parallelOptions.MaxDegreeOfParallelism = -1;
                parallelOptions.CancellationToken = cancellationTokenSource.Token;
                parallelOptions.TaskScheduler = new PriorityScheduler(ThreadPriority.Lowest);

                Parallel.ForEach(Directory.EnumerateFiles(Environment.SystemDirectory),
                    parallelOptions, (fileName, parallelLoopState) =>
                    {
                        // Thread.CurrentThread.Priority = ThreadPriority.Lowest;
                        Stopwatch stopWatch = new Stopwatch();
                        stopWatch.Start();

                        Interlocked.Increment(ref count);

                        if (parallelOptions.CancellationToken.IsCancellationRequested)
                        {
                            Log($"{count}");
                            return;
                        }

                        try
                        {
                            // SHA256 instances are not thread-safe, so each iteration creates its own
                            byte[] hash;
                            using (var sha = SHA256.Create())
                            {
                                hash = sha.ComputeHash(File.ReadAllBytes(fileName).OrderBy(x => x).ToArray());
                            }
                            stopWatch.Stop();
                            Log(FormatTicks(stopWatch.ElapsedTicks));
                            // Interpolated strings are logged directly: passing them to String.Format
                            // would throw if a file name or message contained '{' or '}'
                            Log($"{count}, {FormatTicks(stopWatch.ElapsedTicks)}, file={fileName}, sillyhash={Convert.ToBase64String(hash)}");
                        }
                        catch (Exception ex)
                        {
                            Log($"{count} file={fileName}, exception={ex.Message}");
                        }
                    });
            }
            catch (Exception ex)
            {
                Log(String.Format("exception={0}", ex.Message));
            }

            stopWatchTotal.Stop();

            Log(FormatTicks(stopWatchTotal.ElapsedTicks));

            writer.Close();
            Process.GetCurrentProcess().Kill();
        }

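        private string FormatTicks(long ticks)
        {
            // Stopwatch.ElapsedTicks counts units of 1/Stopwatch.Frequency seconds,
            // not the 100 ns ticks that the TimeSpan constructor expects
            return TimeSpan.FromSeconds((double)ticks / Stopwatch.Frequency).ToString();
        }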
    }
}

Priority scheduler (thanks to Roman Starkov on Stack Overflow; it originally came from Bnaya Eshet at Microsoft):

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace StackOverflowQuestionWindowsService1
{
    public class PriorityScheduler : TaskScheduler
    {
        public static PriorityScheduler AboveNormal = new PriorityScheduler(ThreadPriority.AboveNormal);
        public static PriorityScheduler BelowNormal = new PriorityScheduler(ThreadPriority.BelowNormal);
        public static PriorityScheduler Lowest = new PriorityScheduler(ThreadPriority.Lowest);

        private BlockingCollection<Task> _tasks = new BlockingCollection<Task>();
        private Thread[] _threads;
        private ThreadPriority _priority;
        private readonly int _maximumConcurrencyLevel = Math.Max(1, Environment.ProcessorCount);
        private readonly object _threadsLock = new object();

        public PriorityScheduler(ThreadPriority priority)
        {
            _priority = priority;
        }

        public override int MaximumConcurrencyLevel
        {
            get { return _maximumConcurrencyLevel; }
        }

        protected override IEnumerable<Task> GetScheduledTasks()
        {
            return _tasks;
        }

        protected override void QueueTask(Task task)
        {
            _tasks.Add(task);

            // QueueTask can be called from several threads at once, so the worker
            // threads are created under a lock to avoid starting them twice
            lock (_threadsLock)
            {
                if (_threads != null)
                    return;

                _threads = new Thread[_maximumConcurrencyLevel];
                for (int i = 0; i < _threads.Length; i++)
                {
                    _threads[i] = new Thread(() =>
                    {
                        foreach (Task t in _tasks.GetConsumingEnumerable())
                            base.TryExecuteTask(t);
                    });
                    _threads[i].Name = string.Format("PriorityScheduler: {0}", i);
                    _threads[i].Priority = _priority;
                    _threads[i].IsBackground = true;
                    _threads[i].Start();
                }
            }
        }

        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            return false; // we might not want to execute task that should schedule as high or low priority inline
        }
    }
}
Eric Ouellet
  • I think I know what is going on. I was looking at the .NET code on referencesource.microsoft.com. `Parallel.ForEach` splits the input into chunks and feeds these chunks to threads. When a new task arrives, it has to wait for a thread to complete all its chunks. OnStop is one such task. The only way for this task to start immediately is to not use the default task scheduler for all the other tasks. So even `ThreadPriority.Normal` will work. All that matters is that a separate task scheduler is used. – Dialecticus Sep 13 '18 at 21:36
  • If `TaskScheduler.Default` had a notion of [priority queues](https://github.com/ChadBurggraf/parallel-extensions-extras/blob/master/TaskSchedulers/QueuedTaskScheduler.cs), then it would be possible to queue your tasks at a lower queue priority, so that internal system tasks can run immediately. – Dialecticus Sep 13 '18 at 21:42
  • You are probably right, because the CPU is not fully used, which makes the reason I gave totally wrong (I will remove it). Thanks a lot for taking the time to explain it to me. But if I had to do the job, I would definitely use low-priority threads, at least to let the machine breathe for other tasks and to keep the user interface responsive. – Eric Ouellet Sep 13 '18 at 21:43
  • Yes, it would be nice to have a pool dispatcher with priorities, like the WPF UI Dispatcher; it would have solved the problem elegantly! – Eric Ouellet Sep 13 '18 at 21:44
  • You can give the points to SylF; he really deserves them. In fact, he had the main idea before me. I just translated it into something else... and your explanation is more satisfying than anything else! – Eric Ouellet Sep 13 '18 at 21:49
1

Your parallelOptions object has a CancellationToken property. Set it to the Token of a new CancellationTokenSource, and call Cancel() on that source when the service is stopped. Then, in your parallel loop, call parallelOptions.CancellationToken.ThrowIfCancellationRequested(). The loop then stops starting new iterations, and Parallel.ForEach throws an OperationCanceledException, which you catch.

For a detailed example please see: How to: Cancel a Parallel.For or ForEach Loop
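The pattern described above can be sketched as follows, assuming a `CancellationTokenSource` whose `Cancel()` would be called from `OnStop`. `CancellableLoop.Run` is a hypothetical name, and `Thread.Sleep` stands in for reading and hashing one file. An iteration that is already past the check still runs to completion, which is why the hashing itself may also need to be interruptible.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

static class CancellableLoop
{
    // Processes items in parallel until the token is cancelled and returns
    // how many items were fully processed.
    public static int Run(IEnumerable<int> items, CancellationToken token)
    {
        int processed = 0;
        var options = new ParallelOptions { CancellationToken = token };
        try
        {
            Parallel.ForEach(items, options, item =>
            {
                // Leave this iteration early if a stop was requested
                options.CancellationToken.ThrowIfCancellationRequested();

                Thread.Sleep(10); // stand-in for reading and hashing one file
                Interlocked.Increment(ref processed);
            });
        }
        catch (OperationCanceledException)
        {
            // Expected once cancellation is requested; the loop has stopped
        }
        return processed;
    }
}
```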

Edit: If you would like your service to stop more quickly, then you probably also need to cancel the execution of the ComputeHash function. Once your thread is in that call, it cannot be cancelled. The solution is to do block transformations using the TransformBlock method in a loop. During that loop, you would check your CancellationToken, or in your case the manual reset event object. If you need guidance on how to do that, please look at this answer: stop hashing operation using filestream. It does an excellent job of showing how to use block transformations with the MD5 algorithm, and the approach is directly portable to the SHA256 algorithm.
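A minimal sketch of such a block-wise loop for the question's already-sorted byte array, checking the manual reset event between blocks. `ChunkedHasher`, `HashWithStopCheck` and the 64 KB block size are assumptions for illustration; the method returns `null` when a stop was requested, so the caller can skip logging and call `parallelLoopState.Stop()`.

```csharp
using System;
using System.Security.Cryptography;
using System.Threading;

static class ChunkedHasher
{
    // Hashes data in fixed-size blocks, checking stopEvent before each block.
    // Returns null if a stop was requested before the hash was completed.
    public static byte[] HashWithStopCheck(byte[] data, WaitHandle stopEvent, int blockSize = 64 * 1024)
    {
        using (var sha = SHA256.Create())
        {
            int offset = 0;
            while (offset < data.Length)
            {
                if (stopEvent.WaitOne(0))
                    return null; // abandon this file; the caller decides how to stop the loop

                int count = Math.Min(blockSize, data.Length - offset);
                sha.TransformBlock(data, offset, count, null, 0); // no output copy needed
                offset += count;
            }

            sha.TransformFinalBlock(new byte[0], 0, 0); // all data already fed in
            return sha.Hash;
        }
    }
}
```

Inside the loop body this would replace the single `sha.ComputeHash(...)` call, so a stop request is noticed after at most one block instead of after the whole file.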

WellerEE
  • Not using a cancellation token is not my problem. My problem is that OnStop is called too late. I suggest that you run my code, confirm the problem, then change the code any way you like to solve the problem. – Dialecticus Sep 10 '18 at 21:18
  • @Dialecticus that's precisely the problem. `OnStop` isn't called too late. The code is written in a way that simply can't cancel any operations. What `OnStop` does is stop nothing, just wait for everything to finish. – Panagiotis Kanavos Sep 12 '18 at 07:12
1

The problem is that when you issue a stop command, your threads each need to see the stop command and then synchronize in order to actually stop. This means your stop will only be as fast as your slowest hash computation. If I were you, I would rewrite the hash computation part of your code so that it computes the hash iteratively instead of calling the built-in function. This way, you can stop in the middle of a hash computation.

SHA256 has methods called TransformBlock and TransformFinalBlock (inherited from HashAlgorithm) for this.

An example of some code I wrote for my own personal project is:

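using System;
using System.IO;
using System.Security.Cryptography;
using System.Threading;

static class StreamHasher
{
    // Hashes the stream in 4 KB blocks so a stop request is noticed between
    // blocks instead of only after the whole file has been hashed.
    public static string ComputeHashIncrementally(Stream stream, HashAlgorithm hashAlgorithm, CancellationToken token)
    {
        var buffer = new byte[4096];
        int bytesRead;

        do
        {
            // Takes the place of the SHOULD_STOP / STOP() placeholders
            token.ThrowIfCancellationRequested();

            bytesRead = stream.Read(buffer, 0, buffer.Length);

            if (bytesRead == 0)
            {
                // End of stream: finish the hash with an empty final block
                hashAlgorithm.TransformFinalBlock(buffer, 0, 0);
            }
            else
            {
                // Feed this block into the running hash (no output copy needed)
                hashAlgorithm.TransformBlock(buffer, 0, bytesRead, null, 0);
            }
        } while (bytesRead != 0);

        return BitConverter.ToString(hashAlgorithm.Hash).Replace("-", "").ToLowerInvariant();
    }
}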
Dylan
  • `SHOULD_STOP` is not set fast enough. OnStop is called about a minute after I stopped the service. During that time many more hashes are calculated than there are threads in the thread pool. I have no control over when OnStop will be called. I invite you to build the given code, start the service, try to stop the service at a certain time, and find in the log file when OnStop was actually called. These time points will be 30 to 60 seconds apart. – Dialecticus Sep 12 '18 at 15:43