2

I have millions of log files that are generated every day, and I need to read all of them and combine them into a single file so that another app can process it.

I'm looking for the fastest way to do this. Currently I'm using threads, tasks and Parallel like this:

Parallel.For(0, files.Length, new ParallelOptions { MaxDegreeOfParallelism = 100 }, i =>
{
    ReadFiles(files[i]);
});

void ReadFiles(string file)
{
    try
    {
        var txt = File.ReadAllText(file);
        filesTxt.Add(txt);
    }
    catch { }
    GlobalCls.ThreadNo--;
}

or

foreach (var file in files)
{
    //Int64 index = i;
    //var file = files[index];
    while (Process.GetCurrentProcess().Threads.Count > 100)
    { 
        Thread.Sleep(100);
        Application.DoEvents();
    }
    new Thread(() => ReadFiles(file)).Start();
    GlobalCls.ThreadNo++;
    // Task.Run(() => ReadFiles(file));      
}

The problem is that after reading a few thousand files, the reading gets slower and slower!

Any idea why? And what is the fastest approach to reading millions of small files? Thank you.

AbdelAziz AbdelLatef
Sina
  • 1
    Depending on what you want to do with them, I'd use some command-line tools, not C#, to combine them. – Daniel A. White Sep 29 '19 at 01:41
  • 2
    You can't read all the files in parallel. Even if your code could, your hard disk can't. As Daniel A. White mentioned, use a proper command-line tool. – scopolamin Sep 29 '19 at 01:52
  • 1
    Threads are good for CPU-bound problems, but not for IO-bound problems. You end up with many threads and all their overhead, most of them waiting for IO to complete. Use async instead. See e.g. https://stackoverflow.com/questions/13167934/how-to-async-files-readalllines-and-await-for-results – Klaus Gütter Sep 29 '19 at 06:47
  • To start with, the task as you described it doesn't parallelize very well: while you can read files simultaneously, the second part seems to be sequential, because you have to synchronize appending content to a single file, unless randomly shuffled content is your goal. Secondly, you blended operations with different latencies into a single one, where one can be a bottleneck for the other (I/O throughput can be limited, for example), and fixed the parallelism at a static high number that cannot be adjusted. And finally, gradual performance degradation is a typical indicator of GC pressure. – Dmytro Mukalov Sep 29 '19 at 13:07
  • 1
    How much memory does your program consume? – Lasse V. Karlsen Sep 30 '19 at 09:12

3 Answers

3

It seems that you are loading the contents of all files in memory, before writing them back to the single file. This could explain why the process becomes slower over time.

A way to optimize the process is to separate the reading part from the writing part, and do them in parallel. This is called the producer-consumer pattern. It can be implemented with the Parallel class, or with threads, or with tasks, but I will instead demonstrate an implementation based on the powerful TPL Dataflow library, which is particularly suited to jobs like this.

private static async Task MergeFiles(IEnumerable<string> sourceFilePaths,
    string targetFilePath, CancellationToken cancellationToken = default,
    IProgress<int> progress = null)
{
    var readerBlock = new TransformBlock<string, string>(filePath =>
    {
        return File.ReadAllText(filePath); // Read the small file
    }, new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = 2, // Reading is parallelizable
        BoundedCapacity = 100, // No more than 100 file-paths buffered
        CancellationToken = cancellationToken, // Cancel at any time
    });

    StreamWriter streamWriter = null;

    int filesProcessed = 0;
    var writerBlock = new ActionBlock<string>(text =>
    {
        streamWriter.Write(text); // Append to the target file
        filesProcessed++;
        if (filesProcessed % 10 == 0) progress?.Report(filesProcessed);
    }, new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = 1, // We can't parallelize the writer
        BoundedCapacity = 100, // No more than 100 file-contents buffered
        CancellationToken = cancellationToken, // Cancel at any time
    });

    readerBlock.LinkTo(writerBlock,
        new DataflowLinkOptions() { PropagateCompletion = true });

    // This is a tricky part. We use BoundedCapacity, so we must propagate manually
    // a possible failure of the writer to the reader, otherwise a deadlock may occur.
    PropagateFailure(writerBlock, readerBlock);

    // Open the output stream
    using (streamWriter = new StreamWriter(targetFilePath))
    {
        // Feed the reader with the file paths
        foreach (var filePath in sourceFilePaths)
        {
            var accepted = await readerBlock.SendAsync(filePath,
                cancellationToken); // Cancel at any time
            if (!accepted) break; // This will happen if the reader fails
        }
        readerBlock.Complete();
        await writerBlock.Completion;
    }

    async void PropagateFailure(IDataflowBlock block1, IDataflowBlock block2)
    {
        try { await block1.Completion.ConfigureAwait(false); }
        catch (Exception ex)
        {
            if (block1.Completion.IsCanceled) return; // On cancellation do nothing
            block2.Fault(ex);
        }
    }
}

Usage example:

var cts = new CancellationTokenSource();
var progress = new Progress<int>(value =>
{
    // Safe to update the UI
    Console.WriteLine($"Files processed: {value:#,0}");
});
var sourceFilePaths = Directory.EnumerateFiles(@"C:\SourceFolder", "*.log",
    SearchOption.AllDirectories); // Include subdirectories
await MergeFiles(sourceFilePaths, @"C:\AllLogs.log", cts.Token, progress);

The BoundedCapacity is used to keep the memory usage under control.

If the disk drive is an SSD, you can try reading with a MaxDegreeOfParallelism larger than 2.

For best performance you could consider writing to a different disk drive than the one containing the source files.

The TPL Dataflow library is available as a package for .NET Framework, and is built into .NET Core.

Theodor Zoulias
2

When it comes to IO operations, CPU parallelism is useless. Your IO device (disk, network, whatever) is your bottleneck. By reading from the device concurrently, you risk lowering your performance even further.
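
If the device really is the bottleneck, a plain sequential merge may be a reasonable baseline to measure against. The following is only a minimal sketch under that assumption: `SequentialMerger` and `MergeSequentially` are hypothetical names introduced here, and the 1 MB output buffer is an arbitrary choice, not a tuned value. It copies raw bytes from one file at a time, so no file content accumulates in memory.

```csharp
using System.Collections.Generic;
using System.IO;

static class SequentialMerger
{
    // Hypothetical helper: appends each source file to the target, one at a
    // time, and returns the number of files copied.
    public static int MergeSequentially(IEnumerable<string> sourceFilePaths,
        string targetFilePath)
    {
        int filesCopied = 0;
        // Assumed buffer size of 1 MB; adjust after measuring
        using (var output = new FileStream(targetFilePath, FileMode.Create,
            FileAccess.Write, FileShare.None, bufferSize: 1 << 20))
        {
            foreach (var filePath in sourceFilePaths)
            {
                using (var input = File.OpenRead(filePath))
                {
                    input.CopyTo(output); // Streams bytes; no string is built
                }
                filesCopied++;
            }
        }
        return filesCopied;
    }
}
```

Because the bytes are copied as-is, any byte-order marks in the source files end up inside the merged file, and a source file without a trailing newline will run into the next one. Whether that matters depends on the app that consumes the result.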

Nick
0

Perhaps you can just use PowerShell to concatenate the files, such as in this answer.

Another alternative is to write a program that uses the FileSystemWatcher class to watch for new files and append them as they are created.
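
A rough sketch of the FileSystemWatcher idea might look like the following. `LogAppender`, `AppendFile` and `Watch` are hypothetical names, the `*.log` filter is an assumption, and the sketch assumes the target file lives outside the watched folder. Note that the `Created` event can fire while the producing process still has the file open, so a real program would need a retry or a delay, which this sketch only hints at.

```csharp
using System;
using System.IO;

static class LogAppender
{
    private static readonly object WriteLock = new object();

    // Hypothetical helper: appends one source file to the target file.
    public static void AppendFile(string sourceFilePath, string targetFilePath)
    {
        lock (WriteLock) // Watcher events can arrive on different threads
        {
            using (var input = File.OpenRead(sourceFilePath))
            using (var output = new FileStream(targetFilePath, FileMode.Append))
            {
                input.CopyTo(output);
            }
        }
    }

    // Watches a folder and appends every newly created *.log file.
    public static FileSystemWatcher Watch(string sourceFolder,
        string targetFilePath)
    {
        var watcher = new FileSystemWatcher(sourceFolder, "*.log");
        watcher.Created += (sender, e) =>
        {
            try { AppendFile(e.FullPath, targetFilePath); }
            catch (IOException ex)
            {
                // The file may still be locked by its writer; a real
                // program would retry instead of skipping it
                Console.Error.WriteLine($"Skipped {e.FullPath}: {ex.Message}");
            }
        };
        watcher.EnableRaisingEvents = true; // Start raising events
        return watcher; // Dispose it to stop watching
    }
}
```

The caller keeps the returned watcher alive for as long as files should be collected, for example `using (LogAppender.Watch(@"C:\SourceFolder", @"C:\AllLogs.log")) { Console.ReadLine(); }`.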

Ed Power