I have a few thousand files. For each one I call a function that extracts some information, and then I write that information to a database. Reading a file from disk is fast, extracting the data is slow, and updating the database is fast. I want to run this on multiple threads to make better use of the CPU. I set up a semaphore, but I don't see the behavior I expect: the program should start processing three files, and start another one only after one of those has completed. Instead, right at the start I see many more than three files begin processing at the same time, before any of them has completed.

using System;
using System.Threading;
using System.IO;
using System.Collections.Generic;

namespace Threads
{
    class Program
    {
        static Semaphore semaphore = new Semaphore(3, 3);
        static Queue<string> queue = new Queue<string>();
        
        public static void Main(string[] args)
        {
            string[] files = Directory.GetFiles(@"C:\MyFolder");
            foreach (string file in files) {
                queue.Enqueue(file);
            }
            
            while (queue.Count > 0) {
                string fileName1 = NextFile();
                semaphore.WaitOne();
                Thread thread1 = new Thread(() => ProcessFile(fileName1));
                thread1.Start();            
                semaphore.Release();
            }
            
            Console.Write("Press any key to continue . . . ");
            Console.ReadKey(true);
        }
        
        public static void ProcessFile(string fileName)
        {
            Console.WriteLine("Processing file " + fileName);
            string value = ExtractData(fileName);
            InsertInDatabase(value);
            Console.WriteLine("Completed processing file " + fileName);
        }
        
        public static string NextFile()
        {
            string fileName = queue.Dequeue();
            return fileName;
        }
        
        /// <summary>
        /// This function takes a long time
        /// </summary>
        /// <param name="fileName"></param>
        /// <returns></returns>
        static string ExtractData(string fileName)
        {
            Thread.Sleep(5000);
            return "value";
        }
        
        static void InsertInDatabase(string value)
        {
            Thread.Sleep(100);
            // do some work         
        }
    }
}
Nick_F
  • As a side note, you have a good use case for the [TPL Dataflow](https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/dataflow-task-parallel-library) library. You can see an example [here](https://stackoverflow.com/questions/62602684/c-sharp-process-files-concurrently-and-asynchronously/62613098#62613098). – Theodor Zoulias May 15 '21 at 05:07
  • "Reading the file from the disk is fast. Extracting the data is slow. Updating the database is fast." - That sounds the opposite of what I'd generally expect. Can you provide any benchmark timings of these operations to confirm? – Enigmativity May 15 '21 at 06:11
  • Do consider using Rx to avoid all of the semaphore stuff. Your code would look like this: `var processed = (from file in Directory.GetFiles(@"C:\MyFolder").ToObservable() from data in Observable.Start(() => ExtractData(file)) from insert in Observable.Start(() => InsertInDatabase(data)) select file).ToArray().Wait();`. – Enigmativity May 15 '21 at 06:36
  • Enigmativity - reading the file (typically around 1 MB) takes about 400 ms, extracting the data takes about 1 minute (it used to be about 8 seconds), and saving the data takes another few hundred milliseconds. I have to find out why extracting the data is taking so long... I also get an Out Of Memory error after a few minutes. – Nick_F May 15 '21 at 06:41
  • @Nick_F - It'd be great to see your actual code. Using threads isn't going to solve that problem - it's going to make it worse. – Enigmativity May 15 '21 at 06:45
  • Enigmativity - Thanks for the code presented, I will see if I can use it – Nick_F May 15 '21 at 06:45
  • @Nick_F - NuGet "System.Reactive" to get the bits and then you need `using System.Reactive.Linq;`. – Enigmativity May 15 '21 at 06:49
  • @Nick_F - Ideally I'd like to see your extract code to fix the underlying issues you have. – Enigmativity May 15 '21 at 06:50
  • @Enigmativity - I am just working on the code, I didn't know how slow it is until you asked :) I would like to get some help for performance improvement of extracting the data, but that would be a separate question. I am using HtmlAgilityPack and Regex. – Nick_F May 15 '21 at 06:51
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/232434/discussion-between-nick-f-and-enigmativity). – Nick_F May 15 '21 at 06:56

1 Answer

When using a semaphore, the thread that actually performs the work should be the one that waits for, and then releases, the semaphore.

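Here the main thread waits for the semaphore and then releases it immediately after starting a worker thread, so the semaphore is never held while a file is being processed and nothing limits how many workers run at once.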

while (queue.Count > 0) {
    string fileName1 = NextFile();
    semaphore.WaitOne();
    Thread thread1 = new Thread(() => ProcessFile(fileName1));
    thread1.Start();            
    semaphore.Release();
}

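You should move the `semaphore.WaitOne()` call, together with the matching `semaphore.Release()`, into the `ProcessFile` method and remove both calls from the loop in `Main`, since that's going to be the thread that actually uses the resource.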

public static void ProcessFile(string fileName)
{
    semaphore.WaitOne();
    try
    {
        Console.WriteLine("Processing file " + fileName);
        string value = ExtractData(fileName);
        InsertInDatabase(value);
        Console.WriteLine("Completed processing file " + fileName);
    }
    finally
    {
        // make sure the semaphore is released even if we encounter an error
        semaphore.Release();
    }
}
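
To check that the limit really holds once the worker owns the semaphore, here is a minimal self-contained sketch. It is not the original program: `SemaphoreDemo`, `Run`, the `running`/`peak` counters, the made-up file names, and the 200 ms `Thread.Sleep` standing in for `ExtractData` and `InsertInDatabase` are all assumptions added for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public static class SemaphoreDemo
{
    // At most three workers may hold the semaphore at the same time.
    static readonly Semaphore semaphore = new Semaphore(3, 3);

    // Hypothetical bookkeeping, only used to observe the concurrency level.
    static readonly object gate = new object();
    static int running; // workers currently inside the guarded section
    static int peak;    // highest value of `running` seen so far

    static void ProcessFile(string fileName)
    {
        semaphore.WaitOne(); // the worker thread itself waits for a slot
        try
        {
            lock (gate)
            {
                running++;
                if (running > peak) peak = running;
            }
            Console.WriteLine("Processing file " + fileName);
            Thread.Sleep(200); // stand-in for ExtractData + InsertInDatabase
            Console.WriteLine("Completed processing file " + fileName);
        }
        finally
        {
            lock (gate) { running--; }
            semaphore.Release(); // always give the slot back, even on error
        }
    }

    // Starts one thread per (made-up) file name, waits for all of them,
    // and returns the highest number of workers that ran at the same time.
    public static int Run(int fileCount)
    {
        lock (gate) { running = 0; peak = 0; }
        var threads = new List<Thread>();
        for (int i = 0; i < fileCount; i++)
        {
            string fileName = "file" + i + ".html"; // hypothetical name
            var thread = new Thread(() => ProcessFile(fileName));
            threads.Add(thread);
            thread.Start();
        }
        foreach (var thread in threads) thread.Join();
        return peak;
    }

    public static void Main()
    {
        int observed = Run(10);
        Console.WriteLine("Peak concurrent workers: " + observed);
    }
}
```

The reported peak should never exceed 3. Note that this approach still starts one thread per file, and most of those threads simply sit blocked in `WaitOne()`. With a few thousand files it may be worth considering a bounded alternative such as `Parallel.ForEach` with `ParallelOptions.MaxDegreeOfParallelism`, which limits concurrency without creating a thread for every file.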
DekuDesu