
I want to process some files with maximum throughput. The paths to files are saved in a database. I need to get file paths from the database, change their status to processing, process them, then change their status to either completed or failed.

Currently, I fetch the files in batches (of 100 files) to reduce the number of queries, and I process them in parallel (with a degree of parallelism of 10). But this way I lose throughput towards the end of each batch: when fewer than 10 files remain in the batch, the effective degree of parallelism drops below 10.

Here is what I have:

private async Task CopyPendingFilesAsync(SourcePath sourcePath, Options options)
{
    var batchIndex = 0;
    while (true)
    {
        var fileBatch = _sourceFileService.GetSourceFileBatchBySourcePathId(
            sourcePath.Id, _dataSourceExportConfig.FileCopyBatchSize, Status.Pending);
        if (fileBatch.Count == 0)
            return;

        await SetInProgressStatusForBatch(fileBatch)
            .ConfigureAwait(false);

        fileBatch
            .AsParallel()
            .WithDegreeOfParallelism(_dataSourceExportConfig.FileCopyDegreeOfParallelism)
            .ForAll(file => ProcessFile(file, destinationBase, options));

        await _sourceFileService
            .UpdateSourceFilesStatusAsync(fileBatch)
            .ConfigureAwait(false);

        batchIndex++;
    }
}

private async Task SetInProgressStatusForBatch(IEnumerable<SourceFile> fileBatch)
{
    foreach (var file in fileBatch)
        file.Status = Status.InProgress;

    await _sourceFileService
        .UpdateSourceFilesStatusAsync(fileBatch)
        .ConfigureAwait(false);
}

private void ProcessFile(
    SourceFile file,
    string destinationBase,
    Options options)
{
    try
    {
        //do something ...

        file.Status = Status.Success;
        file.ExceptionMessage = null;
    }
    catch (Exception ex)
    {
        _logger.Error(ex);
        file.Status = Status.Failed;
        file.ExceptionMessage = ex.Message;
    }
}

How can I maximize the throughput? I have read about the producer-consumer pattern with BlockingCollection, TPL Dataflow, and Rx, and I am fairly sure that what I want can be implemented with any of them, but I haven't managed to do it so far. With the producer-consumer pattern, my producer is extremely fast compared to the consumer; with TPL Dataflow, I got stuck on the BatchBlock; and I haven't tried Rx yet. Could someone please point me in the right direction?

Update: Here is a minimal, complete and verifiable example:

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;

namespace ConsoleApp1
{
    internal static class Program
    {
        private static void Main()
        {
            Console.WriteLine("Processing files");

            var stopWatch = new Stopwatch();
            stopWatch.Start();

            var fileService = new FileService();
            fileService.ProcessPendingFiles();

            foreach (var sourceFile in fileService.SourceFiles)
            {
                Console.WriteLine($"{sourceFile.Id} {sourceFile.Status}");
            }

            Console.WriteLine(stopWatch.Elapsed);

            Console.ReadLine();
        }
    }

    public class FileService
    {
        private const int BatchSize = 100;
        private const int DegreeOfParallelism = 10;
        //this SourceFiles property replaces the Sqlite database where the data is actually stored
        public ICollection<SourceFile> SourceFiles =
            Enumerable
                .Range(0, 1000)
                .Select(i =>
                    new SourceFile
                    {
                        Id = i,
                        Path = "source file path",
                        Status = Status.Pending,
                    })
                .ToList();

        public void ProcessPendingFiles()
        {
            while (true)
            {
                var fileBatch = GetSourceFileBatch(BatchSize, Status.Pending);
                if (fileBatch.Count == 0)
                    return;

                SetInProgressStatusForBatch(fileBatch);

                fileBatch
                    .AsParallel()
                    .WithDegreeOfParallelism(DegreeOfParallelism)
                    .ForAll(ProcessFile);

                UpdateSourceFiles(fileBatch);
            }
        }

        private ICollection<SourceFile> GetSourceFileBatch(int batchSize, Status status)
            => SourceFiles
                .Where(sf => sf.Status == status)
                .Take(batchSize)
                .ToList();

        //set status to in progress for all files in the batch
        //and save the changes to database
        //in the application this is actually done with a bulk update and the method is async
        private void SetInProgressStatusForBatch(IEnumerable<SourceFile> fileBatch)
        {
            foreach (var file in fileBatch)
            {
                file.Status = Status.InProgress;

                var sourceFile = SourceFiles.First(sf => sf.Id == file.Id);
                sourceFile.Status = file.Status;
            }
        }

        //set status and exception messages for all files in the batch
        //and save the changes to database
        //in the application this is actually done with a bulk update and the method is async
        private void UpdateSourceFiles(IEnumerable<SourceFile> fileBatch)
        {
            foreach (var file in fileBatch)
            {
                var sourceFile = SourceFiles.First(sf => sf.Id == file.Id);
                sourceFile.Status = file.Status;
                sourceFile.ExceptionMessage = file.ExceptionMessage;
            }
        }

        private void ProcessFile(SourceFile file)
        {
            try
            {
                //do something ...
                Thread.Sleep(20);

                file.Status = Status.Success;
                file.ExceptionMessage = null;
            }
            catch (Exception ex)
            {
                file.Status = Status.Failed;
                file.ExceptionMessage = ex.Message;
            }
        }
    }

    public class SourceFile
    {
        public int Id { get; set; }

        public string Path { get; set; }

        public Status Status { get; set; }

        public string ExceptionMessage { get; set; }
    }

    public enum Status
    {
        Pending,

        InProgress,

        Success,

        Failed,
    }
}
Marius Stănescu
  • 1) If you had a source of filenames, then you could have *n* workers which each takes a filename from the source, processes it, and repeats until there are no more filenames. 2) Depending on what processing a file entails, you might get more significant improvements by tuning the size of any disk I/O buffers that are used. 3) Make sure it doesn't run fastest with just one file being processed at a time. – Andrew Morton Oct 30 '18 at 11:26
  • Just write some pseudo "words" describing what you are doing, like a flow: step 1... step 2... It's hard to tell whether you are interacting with the DB inside ProcessFile, and therefore hard to tell where the bottleneck is or what the entry point is. – Seabizkit Oct 30 '18 at 11:33
  • @Seabizkit I am not interacting with the DB inside ProcessFile. – Marius Stănescu Oct 30 '18 at 11:42
  • Is it possible for you to provide a minimal, complete, and verifiable example? – Enigmativity Oct 31 '18 at 10:55
  • @Enigmativity, sure, check the updated question. – Marius Stănescu Oct 31 '18 at 12:01
  • @MariusStănescu - It doesn't seem to be minimal. – Enigmativity Oct 31 '18 at 12:41
  • @MariusStănescu - Rx seems to do it quite well, but I suspect I'm missing something with your question. After a small bit of hacking I did this: `fileService.SourceFiles.ToObservable().SelectMany(x => Observable.Start(() => fileService.ProcessFile(x))).ToArray().Wait();`. It finished in 65% of the time that your code did. – Enigmativity Oct 31 '18 at 12:46
  • @Enigmativity, how would I incorporate the batching logic? I use batching in order to have 2-3 queries per batch and not stress the database with 2-3 queries per file. – Marius Stănescu Oct 31 '18 at 13:21
  • @Enigmativity, also, how would I control the degree of parallelism? – Marius Stănescu Oct 31 '18 at 13:24
  • @MariusStănescu - You can change the `SelectMany` to `Select` + `Merge` to be able to specify the degree of parallelism. – Enigmativity Nov 01 '18 at 01:27
  • @MariusStănescu - Rx is usually pretty good at scheduling to avoid stress. I'd just try the basic query and then determine if you have a problem then before trying to do any form of batching. – Enigmativity Nov 01 '18 at 01:28
  • So I should just update the status before processing one item, then update the status once it was processed? I am worried about that many queries from multiple threads on an SQLite database. There can be multiple threads running the ProcessPendingFiles, inside which there is also concurrency. I don't think SQLite will handle that very well. That's why I wanted to batch the queries. – Marius Stănescu Nov 01 '18 at 15:58
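Following Enigmativity's `Select` + `Merge` suggestion, here is a sketch of how both the concurrency limit and batched status writes might be expressed in Rx. It assumes the System.Reactive NuGet package; `RxItem`, `RxThrottled`, and the `saveBatch` callback are hypothetical stand-ins for `SourceFile`, the processing loop, and the bulk SQLite update. One detail worth noting: as far as I understand, `Observable.Start` begins running as soon as it is called, so it is wrapped in `Observable.Defer` to let `Merge(maxConcurrent)` actually decide when each item starts.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;   // System.Reactive NuGet package (assumed)
using System.Threading;

// Hypothetical stand-in for SourceFile.
public class RxItem
{
    public int Id { get; set; }
    public bool Done { get; set; }
}

public static class RxThrottled
{
    // Processes items with at most maxConcurrent in flight and hands finished items
    // to saveBatch in chunks of up to batchSize. Returns the number of items saved.
    public static int Run(IEnumerable<RxItem> items, int maxConcurrent, int batchSize,
                          Action<IList<RxItem>> saveBatch)
    {
        return items
            .ToObservable()
            // Defer delays Observable.Start until Merge subscribes; Start alone would
            // begin work for every item immediately and ignore maxConcurrent.
            .Select(item => Observable.Defer(() => Observable.Start(() =>
            {
                Thread.Sleep(20); // stand-in for the real file processing
                item.Done = true;
                return item;
            })))
            .Merge(maxConcurrent)
            .Buffer(batchSize)          // chunks of finished items; the last may be smaller
            .Do(saveBatch)              // e.g. one bulk status update per chunk
            .Sum(chunk => chunk.Count)  // total number of items passed to saveBatch
            .Wait();
    }
}
```

Because `Merge` serializes its output, `saveBatch` is never invoked concurrently, which may suit a single SQLite connection. Whether this actually relieves the database compared to per-item updates would need to be measured.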

4 Answers


I know you are probably going to hate this answer, but ultimately, it depends...

I'm not entirely sure what these files are, where they live or what processing them means. My answer assumes you are happy with the current processing at peak, you just need a better way of ensuring you get consistent performance here and it doesn't drop towards the tail of the operation. I'll try to stick to answering your more direct question in terms of using the producer-consumer pattern with a BlockingCollection rather than change the entire approach.

I do think you understand why the slowdown is happening, but you aren't sure how to deal with it, since you only fetch the next batch of items when the current batch completes. (Needless to say, this is probably a good case for using a message queue rather than SQL, but that's a somewhat separate discussion that sidesteps your primary question.)

This has been answered in quite a bit of detail on the following question:

classic producer consumer pattern using blockingcollection and tasks .net 4 TPL

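using System.Collections.Concurrent;
using System.Threading;

public class YourCode
{
  private readonly BlockingCollection<object> queue = new BlockingCollection<object>();

  public YourCode()
  {
    var thread = new Thread(StartConsuming);
    thread.IsBackground = true;
    thread.Start();
  }

  public void Produce(object item)
  {
    queue.Add(item);
  }

  // Call once the producer has no more items, so the consumer loop can exit.
  public void CompleteProducing()
  {
    queue.CompleteAdding();
  }

  private void StartConsuming()
  {
    // Blocks while the queue is empty; the loop ends after CompleteAdding()
    // has been called and the remaining items have been drained.
    foreach (object item in queue.GetConsumingEnumerable())
    {
      // Add your code to process the item here.
      // Do not start another task or thread.
    }
  }
}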

You could then have multiple consumers reading from the same queue with a single producer, since you point out that you are producing much faster than you are consuming.
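As a sketch of what that could look like for the question's MCVE: one producer task fetches a batch, marks it InProgress, and pushes the files into a bounded BlockingCollection, while several consumers pull individual files. Because the producer refills the queue while the consumers are still busy, there is no per-batch barrier, and the bounded capacity stops the fast producer from running ahead. The in-memory list stands in for the SQLite table, and `WorkItem`, `FileStatus`, and `BlockingCollectionPipeline` are hypothetical names. The consumers run on dedicated threads (`TaskCreationOptions.LongRunning`) because they block.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public enum FileStatus { Pending, InProgress, Success, Failed }

// Hypothetical stand-in for SourceFile.
public class WorkItem
{
    public int Id { get; set; }
    public FileStatus Status { get; set; }
}

public static class BlockingCollectionPipeline
{
    public static void Run(List<WorkItem> store, int batchSize, int consumerCount)
    {
        // Bounded so the fast producer stays at most about one batch ahead of the consumers.
        using var queue = new BlockingCollection<WorkItem>(boundedCapacity: batchSize);

        var producer = Task.Run(() =>
        {
            try
            {
                while (true)
                {
                    // Stand-in for one SELECT of pending rows plus one bulk "InProgress" update.
                    var batch = store.Where(f => f.Status == FileStatus.Pending).Take(batchSize).ToList();
                    if (batch.Count == 0)
                        break;
                    foreach (var file in batch)
                        file.Status = FileStatus.InProgress;
                    foreach (var file in batch)
                        queue.Add(file); // blocks while the queue is full
                }
            }
            finally
            {
                // Lets the consumers' loops end once the queue has been drained.
                queue.CompleteAdding();
            }
        });

        var consumers = Enumerable.Range(0, consumerCount)
            .Select(_ => Task.Factory.StartNew(() =>
            {
                // Blocks while the queue is empty; ends after CompleteAdding and a final drain.
                foreach (var file in queue.GetConsumingEnumerable())
                {
                    try
                    {
                        Thread.Sleep(20); // stand-in for the real file processing
                        file.Status = FileStatus.Success;
                    }
                    catch (Exception)
                    {
                        file.Status = FileStatus.Failed;
                    }
                }
            }, TaskCreationOptions.LongRunning))
            .ToArray();

        Task.WaitAll(consumers);
        producer.Wait(); // surfaces any exception thrown by the producer
    }
}
```

Writing the Success/Failed results back in bulk would need a second queue drained by a single writer; that part is left out of this sketch.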

Martin Venter

This operation could of course be done with TPL-Dataflow as you mention but it's hard to know if you'd actually see any throughput gain. With any performance metric the best you can do is try different approaches and measure the results.

This sample includes the most relevant options to tune the behavior of the dataflow so you can experiment. The structure is based loosely on your sample code with some assumptions.

  • One SourcePath yields a batch of SourceFile
  • Updating SourceFile status is async
  • Processing SourceFile is sync

Sample:

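using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class ProcessFilesFlow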
{
    private TransformBlock<SourcePath, IEnumerable<SourceFile>> _getSourceFileBatch;
    private TransformBlock<IEnumerable<SourceFile>, IEnumerable<SourceFile>> _setStatusToProcessing;
    private TransformBlock<IEnumerable<SourceFile>, IEnumerable<SourceFile>> _processFiles;
    private ActionBlock<IEnumerable<SourceFile>> _setStatusToComplete;

    public ProcessFilesFlow()
    {
        //Setup options
        //All of these options and more can be tuned for throughput
        var getSourceFileBatchOptions = new ExecutionDataflowBlockOptions()
        {
            BoundedCapacity = 10, //How many source paths to queue at one time
            MaxDegreeOfParallelism = 10, //How many source paths to get batches for at one time
            EnsureOrdered = false //Process batches as soon as ready
        };
        var setStatusToProcessingOptions = new ExecutionDataflowBlockOptions()
        {
            BoundedCapacity = 10, //How many batches to queue at one time
            MaxDegreeOfParallelism = 10, //How many batches to update status for at once
            EnsureOrdered = false //Process batches as soon as ready
        };
        var processFilesOptions = new ExecutionDataflowBlockOptions()
        {
            BoundedCapacity = 10, //Batches to queue at one time
            MaxDegreeOfParallelism = 10, //Batches to work on at the same time
            EnsureOrdered = false //Process batches as soon as ready
        };
        var setStatusToCompleteOptions = new ExecutionDataflowBlockOptions()
        {
            BoundedCapacity = 10, //Batches to queue at one time
            MaxDegreeOfParallelism = 10, //Batches to update at once
            EnsureOrdered = false //Process batches as soon as ready
        };

        //Build the dataflow pipeline
        _getSourceFileBatch = new TransformBlock<SourcePath, IEnumerable<SourceFile>>(path => GetSourceFileBatch(path), getSourceFileBatchOptions);
        _setStatusToProcessing = new TransformBlock<IEnumerable<SourceFile>, IEnumerable<SourceFile>>(batch => SetStatusToProcessingAsync(batch), setStatusToProcessingOptions);
        _processFiles = new TransformBlock<IEnumerable<SourceFile>, IEnumerable<SourceFile>>(batch => ProcessFiles(batch), processFilesOptions);
        _setStatusToComplete = new ActionBlock<IEnumerable<SourceFile>>(batch => SetStatusToCompleteAsync(batch), setStatusToCompleteOptions);

        //Link the pipeline
        _getSourceFileBatch.LinkTo(_setStatusToProcessing, new DataflowLinkOptions() { PropagateCompletion = true });
        _setStatusToProcessing.LinkTo(_processFiles, new DataflowLinkOptions() { PropagateCompletion = true });
        _processFiles.LinkTo(_setStatusToComplete, new DataflowLinkOptions() { PropagateCompletion = true });
    }

    public async Task ProcessAll(IEnumerable<SourcePath> sourcePaths)
    {
        foreach(var path in sourcePaths)
        {
            await _getSourceFileBatch.SendAsync(path);
        }
        _getSourceFileBatch.Complete();
        await _setStatusToComplete.Completion;
    }

    private IEnumerable<SourceFile> GetSourceFileBatch(SourcePath sourcePath)
    {
        //Get batch of files based on sourcePath
        return Enumerable.Empty<SourceFile>();
    }

    private async Task<IEnumerable<SourceFile>> SetStatusToProcessingAsync(IEnumerable<SourceFile> sourceFiles)
    {
        //Update file status
        foreach (var file in sourceFiles)
            await file.UpdateStatusAsync("In Progress");
        return sourceFiles;
    }

    private IEnumerable<SourceFile> ProcessFiles(IEnumerable<SourceFile> sourceFiles)
    {
        //process files
        foreach (var file in sourceFiles)
            file.Process();
        return sourceFiles;
    }

    private async Task SetStatusToCompleteAsync(IEnumerable<SourceFile> sourceFiles)
    {
        //Update file status
        foreach (var file in sourceFiles)
            await file.UpdateStatusAsync("Completed");
    }
}

Other options are available too, such as splitting a batch with a TransformManyBlock and processing individual files from batches in parallel.
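Here is a sketch of that variant, using the same in-memory assumptions as the question's MCVE. Batches are still fetched (and marked InProgress) with one query each. A TransformManyBlock flattens each batch into single files, a TransformBlock processes up to `maxParallelism` files at once regardless of which batch they came from, and a BatchBlock regroups finished files so the status update can stay a bulk operation. `FileJob`, `FileState`, and `PerFileDataflow` are hypothetical names. The blocks come from System.Threading.Tasks.Dataflow, which may need to be added as a NuGet package depending on the target framework.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public enum FileState { Pending, InProgress, Success, Failed }

// Hypothetical stand-in for SourceFile.
public class FileJob
{
    public int Id { get; set; }
    public FileState State { get; set; }
}

public static class PerFileDataflow
{
    // Returns how many files were handed to the bulk "save" step.
    public static async Task<int> RunAsync(List<FileJob> store, int batchSize, int maxParallelism)
    {
        var link = new DataflowLinkOptions { PropagateCompletion = true };
        var saved = 0;

        // Flattens each fetched batch into single files; the small bound keeps the producer from racing ahead.
        var split = new TransformManyBlock<IList<FileJob>, FileJob>(
            batch => batch,
            new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        // Processes individual files, up to maxParallelism at a time, ignoring batch boundaries.
        var process = new TransformBlock<FileJob, FileJob>(file =>
        {
            try
            {
                Thread.Sleep(20); // stand-in for the real file processing
                file.State = FileState.Success;
            }
            catch (Exception)
            {
                file.State = FileState.Failed;
            }
            return file;
        }, new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = maxParallelism,
            BoundedCapacity = batchSize,
            EnsureOrdered = false
        });

        // Regroups finished files so results can be written with one bulk update per batch.
        var rebatch = new BatchBlock<FileJob>(batchSize);

        // Stand-in for the bulk UPDATE of Status/ExceptionMessage; runs one batch at a time by default.
        var save = new ActionBlock<FileJob[]>(finished => { saved += finished.Length; });

        split.LinkTo(process, link);
        process.LinkTo(rebatch, link);
        rebatch.LinkTo(save, link);

        while (true)
        {
            // Stand-in for one SELECT of pending rows plus one bulk "InProgress" update.
            IList<FileJob> batch = store.Where(f => f.State == FileState.Pending).Take(batchSize).ToList();
            if (batch.Count == 0)
                break;
            foreach (var file in batch)
                file.State = FileState.InProgress;
            await split.SendAsync(batch); // waits while the pipeline is full
        }

        split.Complete();
        await save.Completion;
        return saved;
    }
}
```

The save block keeps the default MaxDegreeOfParallelism of 1, so only one bulk update runs at a time. When the pipeline completes, the BatchBlock flushes the final, smaller batch.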

JSteward
  • What are those blocks supposed to do? In any case, instead of processing batches of files with a TransformBlock that processes/emits an entire IEnumerable at a time, use a `TransformManyBlock`, which will emit each item to the next block, allowing multiple blocks to work in parallel. Otherwise, this code is no better than calling each function one at a time. – Panagiotis Kanavos Nov 06 '18 at 08:33

This is a disk operation, and parallelization does not work well on those. Disks have a physically limited throughput, and bombarding them with requests will only add seek times to the whole calculation. Features like NCQ (Native Command Queuing) try to mitigate this effect, but they have limits.

With a network, parallelization can at least have some effect:

  • using the medium while another request is in the "protocol overhead" phase
  • getting around "per connection" limits that might be in place

But even there, there are hard limits.

The best way to get fast disk operations is to not have a terrible backend disk, i.e., not to use a rotating disk, or at least to organize the disks in RAID 0 or a similar structure.

Christopher
  • Thank you for the answer. It doesn't answer my question though. Imagine that I am not processing files, but some random data entity. (In fact, the files are on different network shares) – Marius Stănescu Oct 30 '18 at 11:34
  • @MariusStănescu: Network shares are unlikely to have rate throttling. So the limit is either the network transmission rate or the disk I/O, whichever is lower. In either case, this is hardly a programming problem. Loading the data will be the core limiter in 8 out of 10 cases. – Christopher Oct 30 '18 at 12:48
  • There could be network limits per connection. For example, even though the network might be 1Gbps, one connection could be limited to 1Mbps. – Marius Stănescu Oct 31 '18 at 13:17
  • @MariusStănescu: Per-connection limits on a Windows share? I'm not sure shares even have that feature, or even allow more than one connection per user, for that matter. Of course, it is not my area of expertise. – Christopher Oct 31 '18 at 19:34
  • We are digressing. I am interested in a way to improve the throughput of the process just to learn how to do it better. As I said earlier, you can imagine the thing that I am processing is a random entity that does very CPU intensive work and is not related to I/O at all. So, the question is not about I/O vs CPU. But I appreciate that you noticed that, I thank you very much for the input. Maybe I'll ask a different question about the topic and then we can have that discussion there. :) – Marius Stănescu Nov 01 '18 at 15:52
  • @Christopher parallelization *does* work with files because file IO is buffered at the application, OS, disk level. File metadata operations, opening/closing files aren't that IO bound either. IO operations are asynchronous by nature too. If the file is small, the file management overhead is big enough that parallelization offers significant benefits. With large files the number of cores and disk throughput will be the limiting factor – Panagiotis Kanavos Nov 06 '18 at 08:30
  • @PanagiotisKanavos: "This is a disk operation. Parallelization does not work well on those." != "Does not work at all". I intentionally chose my words so that your misinterpretation was unlikely. I cannot do more to avoid it. – Christopher Nov 06 '18 at 19:49

A worker pattern should simplify things for you, and ensure you are always processing a consistent number of units of work in parallel.

If you create, for example, 10 tasks up front and let each of them take a new job until there are none left, you no longer rely on waiting for a whole batch of threads or tasks to complete before starting any more.

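using System.Linq;
using System.Threading.Tasks;

class WorkController
{
    private DataSourceExportConfig _dataSourceExportConfig;
    private SourceFileService _sourceFileService;
    private string destinationBase;

    public async Task CopyPendingFilesAsync(SourcePath sourcePath, Options options)
    {
        // Start a fixed number of workers up front; each keeps taking files until none are left.
        // Task.Run matters: Worker is synchronous, so calling it directly would run the
        // first worker to completion before the next one even started.
        var workers = Enumerable
            .Range(0, _dataSourceExportConfig.FileCopyDegreeOfParallelism)
            .Select(_ => Task.Run(() => Worker(sourcePath, options)));

        await Task.WhenAll(workers);
    }

    private void Worker(SourcePath sourcePath, Options options)
    {
        // GetNextFile is called by all workers concurrently, so it must be thread-safe.
        while (_sourceFileService.GetNextFile(out SourceFile file))
        {
            ProcessFile(file, destinationBase, options);
        }
    }

    private void ProcessFile(SourceFile file, string destinationBase, Options options)
    {
        // Process the file as in the question.
    }
}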
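The `GetNextFile` call in the worker loop is not defined in the question and has to be thread-safe. One way to write it while keeping the batched queries is a source that hands out one file at a time but refills an internal buffer from the database only when the buffer runs dry. The sketch below assumes an in-memory list in place of the database; `Job`, `JobStatus`, `BatchedJobSource`, and `WorkerPool` are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public enum JobStatus { Pending, InProgress, Success, Failed }

// Hypothetical stand-in for SourceFile.
public class Job
{
    public int Id { get; set; }
    public JobStatus Status { get; set; }
}

// Hypothetical thread-safe source: hands out one job per call but
// queries the "database" only once per batch.
public class BatchedJobSource
{
    private readonly List<Job> _store; // stand-in for the database table
    private readonly int _batchSize;
    private readonly Queue<Job> _buffer = new Queue<Job>();
    private readonly object _gate = new object();

    public BatchedJobSource(List<Job> store, int batchSize)
    {
        _store = store;
        _batchSize = batchSize;
    }

    public bool GetNextFile(out Job job)
    {
        lock (_gate)
        {
            if (_buffer.Count == 0)
            {
                // One SELECT of pending rows plus one bulk "InProgress" update.
                foreach (var pending in _store.Where(j => j.Status == JobStatus.Pending).Take(_batchSize).ToList())
                {
                    pending.Status = JobStatus.InProgress;
                    _buffer.Enqueue(pending);
                }
            }

            if (_buffer.Count == 0)
            {
                job = null;
                return false; // nothing pending: the worker should stop
            }

            job = _buffer.Dequeue();
            return true;
        }
    }
}

public static class WorkerPool
{
    public static void Run(BatchedJobSource source, int workerCount)
    {
        // Each worker loops on its own, so all workerCount slots stay busy until the
        // source is empty; there is no barrier at the end of each batch.
        var workers = Enumerable.Range(0, workerCount)
            .Select(_ => Task.Factory.StartNew(() =>
            {
                while (source.GetNextFile(out var job))
                {
                    try
                    {
                        Thread.Sleep(20); // stand-in for the real file processing
                        job.Status = JobStatus.Success;
                    }
                    catch (Exception)
                    {
                        job.Status = JobStatus.Failed;
                    }
                }
            }, TaskCreationOptions.LongRunning))
            .ToArray();

        Task.WaitAll(workers);
    }
}
```

The refill happens inside the lock, so a worker that finishes during a refill waits for that one query, while the other workers keep processing the files they already hold.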
Creyke