
I need to process a large number of files overnight, with a defined start and end time to avoid disrupting users. I've been investigating but there are so many ways of handling threading now that I'm not sure which way to go. The files come into an Exchange inbox as attachments.

My current attempt, based on some examples from here and a bit of experimentation, is:

while (DateTime.Now < dtEndTime.Value)
{
    var finished = new CountdownEvent(1);

    for (int i = 0; i < numThreads; i++)
    {
        // Copy the offset into a local so each work item captures its own value.
        object state = offset;

        finished.AddCount();
        ThreadPool.QueueUserWorkItem(delegate
        {
            try
            {
                StartProcessing(state);
            }
            finally
            {
                finished.Signal();
            }
        });

        offset += numberOfFilesPerPoll;
    }

    finished.Signal(); // release the initial count of 1
    finished.Wait();
}

It's running in a WinForms app at the moment for ease of testing, but the core processing is in a DLL, so I can instantiate the class I need from a Windows service or from a console app running under a scheduler, whichever is easiest. I do have a Windows service set up with a Timer object that kicks off the processing at a time set in the config file.

So my question is: in the above code I initialise a bunch of threads (currently 10), then wait for them all to finish. My ideal would be a fixed pool of threads where, as one finishes, I fire off another, and when I reach the end time I simply wait for the in-flight threads to complete. The reason is that the files I'm processing vary in size: some take seconds to process and some take hours, so I don't want the whole batch to wait on one slow thread when the others could be ticking along in the background.

Edit: As it stands, each thread instantiates a class and passes it an offset. The class then gets the next x emails from the inbox, starting at that offset (using the Exchange Web Services paging functionality; see the sketch below). As each file is processed, it's moved to a separate folder. From some of the replies so far, I'm wondering whether I should instead grab the emails in the outer loop and spawn threads as needed. To cloud the issue, I currently have a backlog of emails to work through. Once the backlog is cleared, it's likely that the nightly run will have a significantly lower load.

On average there are around 1000 files to process each night.
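For reference, the fetch each thread performs looks roughly like this sketch, assuming the EWS Managed API (the GetNextBatch name and the service/pageSize/offset parameter names are mine, for illustration):

// Sketch of the paged fetch each thread performs (EWS Managed API).
// "service" is an already-configured ExchangeService instance; pageSize and
// offset correspond to the "x" and the offset mentioned above.
FindItemsResults<Item> GetNextBatch(ExchangeService service, int pageSize, int offset)
{
    var view = new ItemView(pageSize, offset);
    view.OrderBy.Add(ItemSchema.DateTimeReceived, SortDirection.Ascending);
    return service.FindItems(WellKnownFolderName.Inbox, view);
}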

Update

I've rewritten large chunks of my code so that I can use Parallel.ForEach, and I've come up against a thread-safety issue. The calling code now looks like this:

public bool StartProcessing()
{
    FindItemsResults<Item> emails = GetEmails();

    var source = new CancellationTokenSource(TimeSpan.FromHours(10));

    // Process files in parallel, with a maximum thread count.
    var opts = new ParallelOptions { MaxDegreeOfParallelism = 8, CancellationToken = source.Token };

    try
    {
        Parallel.ForEach(emails, opts, processAttachment);
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("Loop was cancelled.");
    }
    catch (Exception err)
    {
        WriteToLogFile(err.Message + "\r\n");
        WriteToLogFile(err.StackTrace + "\r\n");
    }

    return true;
}

So far so good (excuse the temporary error handling). I have a new issue now: the properties of the Item object (an email) are not thread-safe. For example, when I start processing an email I move it to a "processing" folder so that another process can't grab it, but it turns out that several threads may be trying to process the same email at the same time. How do I guarantee that this doesn't happen? I know I need to add a lock (something like the sketch below), but should it go in the ForEach or in the processAttachment method?
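Something like the following is the shape I have in mind for the claim step (the names are illustrative, and processingFolderId is assumed to be the FolderId of the "processing" folder):

// Hold the lock only for the check, not for the move or the processing,
// so that slow emails don't serialise everything.
private readonly object _claimLock = new object();
private readonly HashSet<string> _claimedIds = new HashSet<string>();

private bool TryClaim(Item email)
{
    lock (_claimLock)
    {
        // Item.Id.UniqueId identifies the message; Add returns false
        // if another thread has already claimed this one.
        if (!_claimedIds.Add(email.Id.UniqueId))
            return false;
    }

    // Only the claiming thread reaches this point.
    email.Move(processingFolderId);
    return true;
}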

Emma
  • How about [using a thread pool](http://stackoverflow.com/questions/145304/when-to-use-thread-pool-in-c)? – Uwe Keim Jan 28 '16 at 09:57
  • What happens when you run out of time? Do you just stop processing files? How long does it take to process each file versus reading the file from disk? – Enigmativity Jan 28 '16 at 09:59
  • @Enigmativity - I'm actually processing attachments from e-mails in an inbox. Not sure what you mean by your comparison? The processing involves parsing the attachment content and passing to a service. The length of time this takes depends on the size and complexity of the file. There wasn't a significant gain in first saving the files to disk. In answer to the first question - When I run out of time, I want to stop spawning new threads, and just wait for the current ones to finish. – Emma Jan 28 '16 at 11:17
  • @EmmaFaulkner - The comparison is between the time it takes to load the files (IO doesn't improve from multithreading so much) versus the processing time (CPU is greatly improved by multithreading). – Enigmativity Jan 28 '16 at 11:22
  • @Enigmativity ah, I see. Yes, I'm multithreading to save time. The time taken to load the files isn't the problem in my case, though, it's the parsing. The e-mails trickle into the inbox overnight, so at each pass I might have a number of threads that don't do anything (or, more correctly, fewer threads than the maximum), but conversely I might have one or two that take a long time to process. – Emma Jan 28 '16 at 11:29
  • @EmmaFaulkner - Got it. I think you need to show the code that gets the emails when they come in. – Enigmativity Jan 28 '16 at 11:34
  • The code that gets the e-mails is in a class instantiated in the StartProcessing() method. It goes and gets the next batch based on the offset, and exits if there are no matches. – Emma Jan 28 '16 at 12:05

4 Answers


Use the TPL:

Parallel.ForEach( EnumerateFiles(),
                  new ParallelOptions { MaxDegreeOfParallelism = 10 },
                  file => ProcessFile( file ) );

Make EnumerateFiles stop enumerating when your end time is reached, trivially like this:

IEnumerable<string> EnumerateFiles()
{
    // Directory.EnumerateFiles takes the directory first, then the pattern.
    foreach (var file in Directory.EnumerateFiles(".", "*.txt"))
        if (DateTime.Now < _endTime)
            yield return file;
        else
            yield break;
}
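Hooking that into the loop above, a hypothetical driver just sets the cut-off field and starts the loop (the _endTime field and the 6 AM cut-off are illustrative):

// _endTime is the field read by EnumerateFiles above.
_endTime = DateTime.Today.AddDays(1).AddHours(6); // e.g. stop at 6 AM tomorrow

Parallel.ForEach(EnumerateFiles(),
                 new ParallelOptions { MaxDegreeOfParallelism = 10 },
                 file => ProcessFile(file));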
Haukinger
  • "Make `files` stop enumerating when your end time is reached"- how? This isn't a very useful answer unless you can explain that. – Enigmativity Jan 28 '16 at 11:35

You can use a combination of Parallel.ForEach() along with a cancellation token source which will cancel the operation after a set time:

using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Demo
{
    static class Program
    {
        static Random rng = new Random();

        static void Main()
        {
            // Simulate having a list of files.
            var fileList = Enumerable.Range(1, 100000).Select(i => i.ToString());

            // For demo purposes, cancel after a few seconds.
            var source = new CancellationTokenSource(TimeSpan.FromSeconds(10));

            // Process files in parallel, with a maximum thread count.
            var opts = new ParallelOptions {MaxDegreeOfParallelism = 8, CancellationToken = source.Token};

            try
            {
                Parallel.ForEach(fileList, opts, processFile);
            }

            catch (OperationCanceledException)
            {
                Console.WriteLine("Loop was cancelled.");
            }
        }

        static void processFile(string file)
        {
            Console.WriteLine("Processing file: " + file);

            // Simulate taking a varying amount of time per file.

            int delay;

            lock (rng)
            {
                delay = rng.Next(200, 2000);
            }

            Thread.Sleep(delay);

            Console.WriteLine("Processed file: " + file);
        }
    }
}

As an alternative to using a cancellation token, you can write a method returning IEnumerable<string> that yields the filenames and stops yielding them when time is up, for example:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Demo
{
    static class Program
    {
        static Random rng = new Random();

        static void Main()
        {
            // Process files in parallel, with a maximum thread count.
            var opts = new ParallelOptions {MaxDegreeOfParallelism = 8};
            Parallel.ForEach(fileList(), opts, processFile);
        }

        static IEnumerable<string> fileList()
        {
            // Simulate having a list of files.
            var fileList = Enumerable.Range(1, 100000).Select(x => x.ToString()).ToArray();

            // Simulate finishing after a few seconds.
            DateTime endTime = DateTime.Now + TimeSpan.FromSeconds(10);

            int i = 0;

            while (i < fileList.Length && DateTime.Now <= endTime)
                yield return fileList[i++];
        }

        static void processFile(string file)
        {
            Console.WriteLine("Processing file: " + file);

            // Simulate taking a varying amount of time per file.

            int delay;

            lock (rng)
            {
                delay = rng.Next(200, 2000);
            }

            Thread.Sleep(delay);

            Console.WriteLine("Processed file: " + file);
        }
    }
}

Note that you don't need the try/catch with this approach.

Matthew Watson
  • Thanks, I didn't know about the cancellationtoken - it looks like that approach would work well with what I'm doing, although I might have to shuffle my code around a bit. – Emma Jan 28 '16 at 11:36
  • With the cancellationtoken example, would that stop all currently running threads, or allow them to complete and just stop the loop? – Emma Jan 28 '16 at 15:23
  • @EmmaFaulkner It prevents any new iterations from starting, but allows each thread to complete its processing of its current iteration before the exception is thrown. – Matthew Watson Jan 28 '16 at 15:31
  • absolutely perfect! Going to give this a try – Emma Jan 28 '16 at 16:45
  • @EmmaFaulkner Note that you will still need to `catch (OperationCanceledException)` because that exception will be thrown to indicate that the loop was cancelled. – Matthew Watson Jan 28 '16 at 18:24

You should consider using Microsoft's Reactive Framework. It lets you compose multithreaded, asynchronous processing with simple LINQ queries.

Something like this:

var query =
    from file in filesToProcess.ToObservable()
    where DateTime.Now < stopTime
    from result in Observable.Start(() => StartProcessing(file))
    select new { file, result };

var subscription =
    query.Subscribe(x =>
    {
        /* handle result */
    });

Truly, that's all the code you need if StartProcessing is already defined.

Just NuGet "Rx-Main".

Oh, and to stop processing at any time just call subscription.Dispose().
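If you'd rather have the stream stop itself at the end time instead of disposing manually, a sketch along these lines should also work (stopTime as above; Observable.Timer accepts an absolute DateTimeOffset):

// Completes the query when the wall-clock stop time is reached.
var subscription =
    query
        .TakeUntil(Observable.Timer(new DateTimeOffset(stopTime)))
        .Subscribe(x =>
        {
            /* handle result */
        });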

Enigmativity

This was a truly fascinating task, and it took me a while to get the code to a level that I was happy with.

I ended up with a combination of the above.

The first thing worth noting is that I added the following lines before my web service call. The operation timeouts I was experiencing, which I thought were caused by a limit set on the endpoint, were actually due to a default connection limit that Microsoft set way back in .NET 2.0:

ServicePointManager.DefaultConnectionLimit = int.MaxValue;
ServicePointManager.Expect100Continue = false;

See here for more information:

What to set ServicePointManager.DefaultConnectionLimit to

As soon as I added those lines of code, my processing increased from 10/minute to around 100/minute.

But I still wasn't happy with the looping, partitioning and so on. My service moved onto a physical server to minimise CPU contention, and I wanted to let the operating system dictate how fast it ran rather than have my code throttle it.

After some research, this is what I ended up with - arguably not the most elegant code I've written, but it's extremely fast and reliable.

List<XElement> elements = new List<XElement>();

while (XMLDoc.ReadToFollowing("ElementName"))
{
    using (XmlReader r = XMLDoc.ReadSubtree())
    {
        r.Read();
        XElement node = XElement.Load(r);
        // do some processing of the node here...
        elements.Add(node);
    }
}

// Now pass the list of elements through PLINQ to the actual web service call,
// letting the OS/framework handle the parallelism. DoRequest returns a
// failure count per request; we log failures and continue.
int failCount = elements.AsParallel()
                        .Sum(element => IntegrationClass.DoRequest(element.ToString()));
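For completeness, a sketch of the contract the Sum relies on, with hypothetical CallWebService and Log helpers standing in for the real calls:

public static int DoRequest(string payload)
{
    try
    {
        CallWebService(payload); // the actual integration call, not shown here
        return 0;                // success contributes nothing to failCount
    }
    catch (Exception ex)
    {
        Log(ex);                 // log and continue
        return 1;                // each failure adds one to failCount
    }
}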

It ended up fiendishly simple and lightning fast.

I hope this helps someone else trying to do the same thing!

Emma