6

I'm working off a queue of filenames. Each file has to be processed by an external binary. This works fine, but it only processes one file at a time. Is it possible to spawn a number of processes in parallel?

Queue<string> queue = new Queue<string>();
queue.Enqueue("1.mp3");
queue.Enqueue("2.mp3");
queue.Enqueue("3.mp3");
...
queue.Enqueue("10000.mp3");

while (queue.Count > 0)
{
    string file = queue.Dequeue();

    Process p = new Process();    
    p.StartInfo.FileName = @"binary.exe";
    p.StartInfo.Arguments = file;
    p.StartInfo.UseShellExecute = false;
    p.StartInfo.CreateNoWindow = true;
    p.StartInfo.RedirectStandardOutput = true;
    p.Start();
    p.WaitForExit();
}

Update: I like the solution from Alex LE (Spawn processes, but only 5 at a time), but is it possible to wait for the child processes to exit as suggested by Ben Voigt?

Edit 1: I need to check for p.ExitCode == 0 to make some database updates.

Makabra
  • I'm leaving this as a comment because I can't give you specific code. Yes, you can do this. You'll want to use a thread for each parallel process, with a callback to let you know when it's done. When you create the new thread you increment a variable, and when the thread returns you decrement that variable, so you can control how many threads you have running. And finally you'll need a wait loop that lets other processes run. The easiest way to do this is Application.DoEvents(). You might also try sleep, but I can't remember if that will sleep child threads too. Oh, and of course don't WaitForExit –  Dec 05 '10 at 19:43
  • I found this example of using the threadpool with a quick google search: http://dotnetperls.com/threadpool –  Dec 05 '10 at 19:46
  • 1
    This is another example of something that is really straightforward using the Win32 API, but .NET doesn't give you access to the right things. If you could get the `ProcessWaitHandle` that `Process.WaitForExit` uses, you'd put five handles in an array and then call `WaitHandle.WaitAny` to determine when to launch the next process. No need for semaphores, worker threads, `BeginInvoke`, `BlockingCollection`, or anything fancy. Microsoft even says it should be possible to get a `WaitHandle` for a process, see http://social.msdn.microsoft.com/profile/stephen%20fisher%20-%20msft/?type=forum – Ben Voigt Dec 06 '10 at 00:51
  • Sorry, wrong link, should have been http://social.msdn.microsoft.com/forums/en-US/clr/thread/d8bbaf7f-9827-4704-9817-3def8f3b3511/ – Ben Voigt Dec 06 '10 at 00:59

6 Answers

3

Here's what should have been possible, if the wait handle associated with the process was marked public instead of internal as it currently is (vote here to ask Microsoft to change that):

void BatchProcess()
{
    Queue<string> queue = new Queue<string>();
    queue.Enqueue("1.mp3");
    queue.Enqueue("2.mp3");
    queue.Enqueue("3.mp3");
    ...
    queue.Enqueue("10000.mp3");

    WaitHandle[] subprocesses = new WaitHandle[Math.Min(queue.Count, 5)];
    for( int i = 0; i < subprocesses.Length; i++ ) {
        subprocesses[i] = Spawn(queue.Dequeue());
    }

    while (queue.Count > 0) {
        int j = WaitHandle.WaitAny(subprocesses);
        subprocesses[j].Dispose();
        subprocesses[j] = Spawn(queue.Dequeue());
    }

    WaitHandle.WaitAll(subprocesses);
    foreach (var wh in subprocesses) {
        wh.Dispose();
    }
}

ProcessWaitHandle Spawn(string args)
{
    using (Process p = new Process()) {
        p.StartInfo.FileName = @"binary.exe";
        p.StartInfo.Arguments = args;
        p.StartInfo.UseShellExecute = false;
        p.StartInfo.CreateNoWindow = true;
        p.Start();
        return p.WaitHandle;
    }
}

This would be the most efficient solution possible, because no synchronization objects are needed besides the Win32 processes themselves. There are no extra threads needed in the C# code and no asynchronous method invocations, therefore no locking or other synchronization is needed whatsoever.
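One known workaround, not part of the original answer: a process handle is itself a waitable Win32 object, so it can be wrapped in a WaitHandle by way of SafeWaitHandle and handed to WaitHandle.WaitAny. A minimal sketch under that assumption, with GetProcessWaitHandle being a hypothetical helper name; the Process objects must be kept alive (not disposed) until the wait has completed, because they own the underlying handle:

// Requires: using System.Threading; using Microsoft.Win32.SafeHandles;
static WaitHandle GetProcessWaitHandle(Process p)
{
    // ownsHandle = false: disposing this WaitHandle does not close the handle;
    // the Process object still owns it and must stay alive until the wait is done.
    return new ManualResetEvent(false)
    {
        SafeWaitHandle = new SafeWaitHandle(p.Handle, false)
    };
}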

Ben Voigt
  • @adymitruk: Compared to launching 10000 child processes, no, 5 threads won't matter performance-wise. But writing multithreaded code is much harder than single-threaded, as evidenced by the fact that every answer here that uses multiple threads (i.e. all but my one, which isn't really an answer in any existing version of .NET, but an explanation of how the underlying Win32 API handles things) initially had some sort of threading error. You exhausted the thread pool, someone had a race condition, someone else didn't use atomic increment and decrement, someone busy-waited. – Ben Voigt Dec 06 '10 at 05:59
  • ... and the last one, which was almost right, leaked a semaphore object (which is an unmanaged resource, relying on the GC to clean it up is a bad idea). Writing (not to mention debugging and maintaining) multi-threaded code is hard to do right, so it's worth avoiding when you just have overlapped operations and not actual parallelism, which is true here, just for the simplicity of single-threaded logic. – Ben Voigt Dec 06 '10 at 06:02
  • "at all costs" might be a little too strong, but there is a mental cost to designing in a parallel (threaded) way so you should avoid it unless the benefits outweigh that cost. – Ben Voigt Dec 06 '10 at 21:19
1

Extracting some parts of your code and adding a semaphore:

Semaphore semX = new Semaphore(5, int.MaxValue);

void f(string name, string args) {
    using (Process p = new Process())
    {
        p.StartInfo.FileName = name;
        p.StartInfo.Arguments = args;
        p.StartInfo.UseShellExecute = false;
        p.StartInfo.CreateNoWindow = true;
        p.StartInfo.RedirectStandardOutput = true;
        p.Start();
        p.WaitForExit();
    }

    semX.Release();     // !!! This one is important
}

You then use

while (queue.Count > 0)
{
    string file = queue.Dequeue();
    semX.WaitOne();    // !!!
    (new Thread((ThreadStart) (() => f(@"binary.exe", file)))).Start();    // dirty unreadable code to start a routine async
}

for (int n = 5; n > 0; n--)        // Wait for the last 5 to finish
    semX.WaitOne();

semX.Dispose();                    // Dispose the semaphore
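The "dirty unreadable" thread launch could also be written against the thread pool; a minimal sketch of the same loop under that assumption (same f and semX as above). Because semX.WaitOne() is still acquired on the main thread before queuing, at most 5 work items are ever in flight:

while (queue.Count > 0)
{
    string file = queue.Dequeue();
    semX.WaitOne();                                               // throttle before queuing
    ThreadPool.QueueUserWorkItem(_ => f(@"binary.exe", file));    // no dedicated Thread per file
}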
bohdan_trotsenko
1

This works (it will be easier with C# 5.0 async/await):

Queue<string> queue = new Queue<string>();
queue.Enqueue("1.mp3");
queue.Enqueue("2.mp3");
queue.Enqueue("3.mp3");
...
queue.Enqueue("10000.mp3");

int runningProcesses = 0;
const int MaxRunningProcesses = 5;
object syncLock = new object();

Action<string> run = new Action<string>(delegate(string file) {
    using (Process p = new Process()) {
        p.StartInfo.FileName = @"binary.exe";
        p.StartInfo.Arguments = file;
        p.StartInfo.UseShellExecute = false;
        p.StartInfo.CreateNoWindow = true;
        p.StartInfo.RedirectStandardOutput = true;
        p.Start();
        p.WaitForExit();
    }
});

Action runNext = null;
runNext = delegate() {
    lock (syncLock) {
        if (queue.Count > 0) {
            run.BeginInvoke(queue.Dequeue(), new AsyncCallback(delegate {
                runNext();
            }), null);
        }
    }
};

while (runningProcesses++ < MaxRunningProcesses) {
    runNext();
}
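Since the answer mentions that C# 5.0 async/await makes this easier, here is a rough sketch of that direction — an assumption of what it could look like on .NET 4.5, not part of the original answer; RunProcessAsync and ProcessAllAsync are hypothetical helper names:

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class ParallelRunner
{
    // Completes when the process exits, yielding its exit code.
    static Task<int> RunProcessAsync(string file)
    {
        var tcs = new TaskCompletionSource<int>();
        var p = new Process
        {
            StartInfo = new ProcessStartInfo
            {
                FileName = "binary.exe",
                Arguments = file,
                UseShellExecute = false,
                CreateNoWindow = true
            },
            EnableRaisingEvents = true   // needed for the Exited event
        };
        p.Exited += (s, e) => { tcs.TrySetResult(p.ExitCode); p.Dispose(); };
        p.Start();
        return tcs.Task;
    }

    // Runs all files, at most maxParallel processes at a time.
    public static async Task ProcessAllAsync(IEnumerable<string> files, int maxParallel)
    {
        using (var gate = new SemaphoreSlim(maxParallel))
        {
            var tasks = files.Select(async file =>
            {
                await gate.WaitAsync();
                try
                {
                    int exitCode = await RunProcessAsync(file);
                    if (exitCode == 0)
                    {
                        // database update goes here (per the question's Edit 1)
                    }
                }
                finally
                {
                    gate.Release();
                }
            }).ToList();

            await Task.WhenAll(tasks);
        }
    }
}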
Alex LE
  • Queue is not thread safe. You need to synchronize access to it. – Adam Dymitruk Dec 05 '10 at 21:52
  • In this case you are correct about the need for synchronization, because the queue is accessed from the completion routine. – Ben Voigt Dec 05 '10 at 22:14
  • @Alex: No good, it's still possible for the queue to empty in between `if (queue.Count > 0)` and `queue.Dequeue()`. – Ben Voigt Dec 06 '10 at 00:04
  • 1
    @Alex: My previous comment still applies, you need to move the `if (queue.Count > 0)` test inside `run` and not release the lock in between. Also, you don't need all the `runningProcesses` counting, if you just start exactly the right number of workers in the first place. It's also buggy, you have a race condition that allows starting an infinite number of workers, and because `runningProcesses` is updated non-atomically, it isn't even going to be accurate. – Ben Voigt Dec 06 '10 at 00:27
  • I think you've fixed it. Of course, there's still the small issue that Process is derived from IDisposable and you aren't even calling Dispose properly (a using block will take care of that). Where'd your declaration of `syncLock` go? I'm pretty sure it was there before. – Ben Voigt Dec 06 '10 at 01:27
  • I deleted the variable declarations by mistake; also added the using block. I think this is my answer with the most edits so far, thank you for the feedback Ben. – Alex LE Dec 06 '10 at 01:35
  • @Alex: Glad to be able to help. Your approach has the feature that the main thread can go on to other tasks immediately, without waiting for any of the child processes to exit. Hard to say whether that behavior is good or bad, it depends on the situation and the question doesn't really give that sort of information. – Ben Voigt Dec 06 '10 at 01:44
  • 1
    @Alex: 'Hopefully thread safe'? If the reasoning you used to arrive at your code does not make you *sure* that it's thread safe, then it's probably not. Programming by coincidence or trial and error is not usually considered good practice, and has a habit of coming back to bite you, especially when writing multithreaded code. – Joren Dec 06 '10 at 20:20
0

You can use semaphores for this and asynchronously call the long-running process as often as you want:

private Semaphore _semaphore;
private delegate void Processor(string fileName);
[Test]
public void SetterTest() {
  var queue = new Queue<string>();
  queue.Enqueue("1.mp3");
  queue.Enqueue("2.mp3");
  queue.Enqueue("3.mp3");
  // ..
  queue.Enqueue("10000.mp3");
  var noOfThreads = 5;
  using (_semaphore = new Semaphore(noOfThreads, noOfThreads)) {
    while (queue.Count > 0) {
      string fileName;
      fileName = queue.Dequeue();
      _semaphore.WaitOne();
      new Processor(ProcessFile).BeginInvoke(fileName, null, null);
    }
    for (int i=0; i<noOfThreads; i++) _semaphore.WaitOne();
  }
}
private void ProcessFile(string file) {
  Process p;
  using (p = new Process()) {
    p.StartInfo.FileName = @"binary.exe";
    p.StartInfo.Arguments = file;
    p.StartInfo.UseShellExecute = false;
    p.StartInfo.CreateNoWindow = true;
    p.StartInfo.RedirectStandardOutput = true;
    p.Start();
    p.WaitForExit();
  }
  _semaphore.Release();
}

hope this helps

Adam Dymitruk
  • This is not good, every thread in your system thread pool will be waiting for the semaphore, and there will be no threads free for other tasks. Better to acquire the semaphore BEFORE calling `BeginInvoke`. – Ben Voigt Dec 05 '10 at 21:55
  • Also, locking the queue is pointless, as it is local to just one thread (and wouldn't provide thread-safety anyway, since the while condition is outside the critical section). – Ben Voigt Dec 05 '10 at 22:12
  • Well it doesn't have to be a queue. I just needed something to hold my filenames. – Makabra Dec 05 '10 at 22:14
  • Using a queue is just fine. It's the `lock (queue)` that has no value. – Ben Voigt Dec 05 '10 at 22:16
  • Why are you using a non-thread-safe queue and a semaphore when the collections in System.Collections.Concurrent namespace are there to solve exactly those kind of problems? – m0sa Dec 05 '10 at 22:16
  • @m0sa: A semaphore is perfect for this problem, and `BeginInvoke` provides a thread-safe queue. And it all works under .NET 2.0. – Ben Voigt Dec 05 '10 at 22:35
  • Yes, but this method returns while the last 5 files are potentially still being processed, which might not be the desired behavior, considering the given example. – m0sa Dec 05 '10 at 22:47
  • @m0sa: That's easy enough to fix, by calling `WaitOne` five times after the queue empties. A more serious problem is not disposing the semaphore properly. – Ben Voigt Dec 06 '10 at 00:05
  • The Process object also needs to be disposed. – Ben Voigt Dec 06 '10 at 01:23
  • put using statements around the two. – Adam Dymitruk Dec 06 '10 at 05:28
  • Now the code waits for all the files to be processed. Thanks for the insight, Ben. – Adam Dymitruk Dec 06 '10 at 19:20
0

Basically you have a producer-consumer problem, so you should absolutely use the collections in the System.Collections.Concurrent namespace. Here is a simple example that you can apply directly to your problem - as an added bonus, you can start filling the queue and processing it at the same time!

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;

class Program
{
    static readonly BlockingCollection<string> _collection = new BlockingCollection<string>();

    static void Main()
    {
        const int maxTasks = 5;
        var tasks = new List<Task> {
            // startup publisher task...
            Task.Factory.StartNew(() => { 
                for(var i = 0; i < 1000; i++)
                {
                    _collection.Add(i + ".mp3");
                }
                Console.WriteLine("Publisher finished");
                _collection.CompleteAdding();
            }),
        };
        for (var i = 0; i < maxTasks; i++)
        {
            tasks.Add(Task.Factory.StartNew(ConsumerTask(i)));
        }
        Task.WaitAll(tasks.ToArray()); // wait for completion
    }

    static Action ConsumerTask(int id)
    {
        // return a closure just so the id can get passed
        return () =>
        {
            string item;
            while (true)
            {
                if (_collection.TryTake(out item, -1))
                {
                    using(Process p = new Process())
                    {
                        p.StartInfo.FileName = "binary.exe";
                        p.StartInfo.Arguments = item;
                        p.Start();
                        p.WaitForExit();
                        var exitCode = p.ExitCode;
                        // TODO handle exit code
                    }
                }
                else if (_collection.IsAddingCompleted)
                {
                    break; // exit loop
                }
            }
            Console.WriteLine("Consumer {0} finished", id);
        };
    }
}
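The TryTake / IsAddingCompleted loop can also be expressed with BlockingCollection's GetConsumingEnumerable, which blocks while the collection is empty and ends once CompleteAdding() has been called and everything has been taken. A possible rewrite of the consumer under that assumption, with the ExitCode check from the question's Edit 1 sketched in:

static Action ConsumerTask(int id)
{
    return () =>
    {
        // Enumeration ends when the collection is marked complete and drained.
        foreach (string item in _collection.GetConsumingEnumerable())
        {
            using (Process p = new Process())
            {
                p.StartInfo.FileName = "binary.exe";
                p.StartInfo.Arguments = item;
                p.StartInfo.UseShellExecute = false;
                p.StartInfo.CreateNoWindow = true;
                p.Start();
                p.WaitForExit();
                if (p.ExitCode == 0)
                {
                    // database update goes here
                }
            }
        }
        Console.WriteLine("Consumer {0} finished", id);
    };
}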
m0sa
  • Artificial thread sleeps and hard-coded calls for the number of concurrent threads you need are not the best idea. – Adam Dymitruk Dec 05 '10 at 21:58
  • Come on, it's an example! How hard is it to make a for loop to spawn as many consumers as you want? – m0sa Dec 05 '10 at 22:00
  • @adymitruk I reshaped the example, I hope you remove the downvote – m0sa Dec 05 '10 at 22:08
  • @m0sa: It appears that adymitruk is downvoting all competing solutions, including the correct ones, in order to bring his to the top. And I'm out of votes, so I can't compensate until the new GMT day starts. – Ben Voigt Dec 05 '10 at 22:36
  • You should just check the `IsAddingCompleted` property instead of using a volatile flag, and while sleeping for 100 milliseconds is definitely better than busy-waiting, since you're just going to resume waiting immediately, why not block indefinitely? In either case, when the producer calls `CompleteAdding()` it will interrupt the wait. – Ben Voigt Dec 06 '10 at 00:17
  • Process is IDisposable, therefore you probably should call Dispose on it. – Ben Voigt Dec 06 '10 at 01:24
0

This one will block the Main thread. It is partially based on Ben's answer, but this already runs.

static void Run(string file)
{
    using (Process p = new Process()) {
        p.StartInfo.FileName = @"binary.exe";
        p.StartInfo.Arguments = file;
        p.StartInfo.UseShellExecute = false;
        p.StartInfo.CreateNoWindow = true;
        p.Start();
        p.WaitForExit();
    }
}

static WaitHandle RunAsync(string file)
{
    IAsyncResult result = new Action<string>(Run).BeginInvoke(file, null, null);
    return result.AsyncWaitHandle;
}

static void Main()
{
    Queue<string> queue = new Queue<string>();
    queue.Enqueue("1.mp3");
    queue.Enqueue("2.mp3");
    queue.Enqueue("3.mp3");
    queue.Enqueue("4.mp3");
    queue.Enqueue("5.mp3");
    queue.Enqueue("6.mp3");
    // ...
    queue.Enqueue("10000.mp3");


    const int MaxRunningProcesses = 5;

    List<WaitHandle> runningProcesses = new List<WaitHandle>(MaxRunningProcesses);

    while (queue.Count > 0 && runningProcesses.Count < MaxRunningProcesses) {
        runningProcesses.Add(RunAsync(queue.Dequeue()));
    }

    while (runningProcesses.Count > 0) {
        int j = WaitHandle.WaitAny(runningProcesses.ToArray());
        runningProcesses[j].Close();
        runningProcesses.RemoveAt(j);
        if (queue.Count > 0) {
            runningProcesses.Add(RunAsync(queue.Dequeue()));
        }
    }
}
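If the exit code is needed (per the question's Edit 1), one possible variation, not part of the original answer: use a Func<string, int> instead of an Action<string>, keep the IAsyncResult objects rather than just their wait handles, and call EndInvoke once a handle signals. RunWithExitCode, RunAsyncWithResult, and runner below are hypothetical names:

static int RunWithExitCode(string file)
{
    using (Process p = new Process()) {
        p.StartInfo.FileName = @"binary.exe";
        p.StartInfo.Arguments = file;
        p.StartInfo.UseShellExecute = false;
        p.StartInfo.CreateNoWindow = true;
        p.Start();
        p.WaitForExit();
        return p.ExitCode;
    }
}

static Func<string, int> runner = RunWithExitCode;

static IAsyncResult RunAsyncWithResult(string file)
{
    return runner.BeginInvoke(file, null, null);
}

// In Main, store the IAsyncResult objects, wait on their AsyncWaitHandle with
// WaitHandle.WaitAny as above, and then for the slot that completed:
//     int exitCode = runner.EndInvoke(running[j]);
//     if (exitCode == 0) { /* database update */ }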
Alex LE