This question is a follow-up to "How to optimize performance in a simple TPL DataFlow pipeline?"

The source code is here - https://github.com/MarkKharitonov/LearningTPLDataFlow

Given:

  • Several solutions covering about 400 C# projects, encompassing thousands of C# source files totaling more than 10,000,000 lines of code.
  • A file containing string literals, one per line.

I want to produce a JSON file listing all the occurrences of the literals in the source code. For every matching line I want to have the following pieces of information:

  • The project path
  • The C# file path
  • The matching line itself
  • The matching line number

All the records should be arranged as a dictionary keyed by the respective literal.
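To make the desired output shape concrete, here is a minimal sketch of such a dictionary serialized with System.Text.Json. The record `MatchingLineSketch` and its property names are hypothetical and only mirror the four pieces of information listed above; the actual `MatchingLine` type in the repository may look different.

```csharp
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical record holding the four pieces of information per matching line.
public record MatchingLineSketch(string ProjectPath, string FilePath, string Line, int LineNo);

public static class ResultShapeSketch
{
    // Serializes the dictionary keyed by literal; SortedDictionary keeps the keys ordered.
    public static string ToJson(SortedDictionary<string, List<MatchingLineSketch>> res) =>
        JsonSerializer.Serialize(res, new JsonSerializerOptions { WriteIndented = true });
}
```

Each top-level JSON property is then a literal, and its value is the array of matching lines for that literal.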

So the challenge is to do it as efficiently as possible (in C#, of course).

The DataFlow pipeline can be found in this file - https://github.com/MarkKharitonov/LearningTPLDataFlow/blob/master/FindStringCmd.cs

Here it is:

private void Run(string workspaceRoot, string outFilePath, string[] literals, bool searchAllFiles, int workSize, int maxDOP1, int maxDOP2, int maxDOP3, int maxDOP4)
{
    var res = new SortedDictionary<string, List<MatchingLine>>();
    var projects = (workspaceRoot + "build\\projects.yml").YieldAllProjects();
    var progress = new Progress();

    var taskSchedulerPair = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, Environment.ProcessorCount);

    var produceCSFiles = new TransformManyBlock<ProjectEx, CSFile>(p => YieldCSFiles(p, searchAllFiles), new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = maxDOP1
    });
    var produceCSFileContent = new TransformBlock<CSFile, CSFile>(CSFile.PopulateContentAsync, new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = maxDOP2
    });
    var produceWorkItems = new TransformManyBlock<CSFile, (CSFile CSFile, int Pos, int Length)>(csFile => csFile.YieldWorkItems(literals, workSize, progress), new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = maxDOP3,
        TaskScheduler = taskSchedulerPair.ConcurrentScheduler
    });
    var produceMatchingLines = new TransformManyBlock<(CSFile CSFile, int Pos, int Length), MatchingLine>(o => o.CSFile.YieldMatchingLines(literals, o.Pos, o.Length, progress), new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = maxDOP4,
        TaskScheduler = taskSchedulerPair.ConcurrentScheduler
    });
    var getMatchingLines = new ActionBlock<MatchingLine>(o => AddResult(res, o));

    var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

    produceCSFiles.LinkTo(produceCSFileContent, linkOptions);
    produceCSFileContent.LinkTo(produceWorkItems, linkOptions);
    produceWorkItems.LinkTo(produceMatchingLines, linkOptions);
    produceMatchingLines.LinkTo(getMatchingLines, linkOptions);

    var progressTask = Task.Factory.StartNew(() =>
    {
        var delay = literals.Length < 10 ? 1000 : 10000;
        for (; ; )
        {
            var current = Interlocked.Read(ref progress.Current);
            var total = Interlocked.Read(ref progress.Total);
            Console.Write("Total = {0:n0}, Current = {1:n0}, Percents = {2:P}   \r", total, current, ((double)current) / total);
            if (progress.Done)
            {
                break;
            }
            Thread.Sleep(delay);
        }
        Console.WriteLine();
    }, TaskCreationOptions.LongRunning);

    projects.ForEach(p => produceCSFiles.Post(p));
    produceCSFiles.Complete();
    getMatchingLines.Completion.GetAwaiter().GetResult();
    progress.Done = true;
    progressTask.GetAwaiter().GetResult();

    res.SaveAsJson(outFilePath);
}

The default parameters are (https://github.com/MarkKharitonov/LearningTPLDataFlow/blob/master/FindStringCmd.cs#L24-L28):

private int m_maxDOP1 = 3;
private int m_maxDOP2 = 20;
private int m_maxDOP3 = Environment.ProcessorCount;
private int m_maxDOP4 = Environment.ProcessorCount;
private int m_workSize = 1_000_000;

My idea is to divide the work into work items, where the size of a work item is computed by multiplying the number of lines in the respective file by the count of the string literals. So, if a C# file contains 500 lines, then searching it for all the 3401 literals results in work of size 3401 * 500 = 1,700,500.

The unit of work is by default 1,000,000 (line, literal) pairs, so in the aforementioned example each work item covers at most 1,000,000 / 500 = 2,000 literals and the file results in 2 work items:

  1. Literals 0..1999
  2. Literals 2000..3400

And it is the responsibility of the produceWorkItems block to generate these work items from files.
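The splitting rule described above can be sketched as follows. This is only an illustration under the assumption that a work item is a range of literal indexes for one file; `SplitLiterals` is a hypothetical helper, not the `YieldWorkItems` method from the repository.

```csharp
using System;
using System.Collections.Generic;

public static class WorkItemSketch
{
    // Splits the literal index range [0, literalCount) into chunks such that
    // lineCount * chunkLength stays within workSize (at least one literal per chunk).
    public static IEnumerable<(int Pos, int Length)> SplitLiterals(
        int lineCount, int literalCount, int workSize)
    {
        if (lineCount <= 0 || literalCount <= 0) yield break;

        // How many literals fit into one work item for a file of this size.
        int literalsPerItem = Math.Max(1, workSize / lineCount);

        for (int pos = 0; pos < literalCount; pos += literalsPerItem)
        {
            yield return (pos, Math.Min(literalsPerItem, literalCount - pos));
        }
    }
}
```

For a file of 500 lines, 3401 literals and a work size of 1,000,000 this yields (Pos 0, Length 2000) and (Pos 2000, Length 1401), which corresponds to the two work items listed above. A 20-line file yields a single, undersized work item covering all the literals.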

Example runs:

C:\work\TPLDataFlow [master ≡]> .\bin\Debug\net5.0\TPLDataFlow.exe find-string -d C:\xyz\tip -o c:\temp -l C:\temp\2.txt
Locating all the instances of the 4 literals found in the file C:\temp\2.txt in the C# code ...
Total = 49,844,516, Current = 49,702,532, Percents = 99.72%
Elapsed: 00:00:18.4320676
C:\work\TPLDataFlow [master ≡]> .\bin\Debug\net5.0\TPLDataFlow.exe find-string -d C:\xyz\tip -o c:\temp -l C:\temp\1.txt
Locating all the instances of the 3401 literals found in the file c:\temp\1.txt in the C# code ...
Total = 42,379,095,775, Current = 42,164,259,870, Percents = 99.49%
Elapsed: 01:44:13.4289270

Question

Many work items are undersized. If I have 3 C# files, 20 lines each, my current code would produce 3 work items, because in my current implementation work items never cross a file boundary. This is inefficient. Ideally, they would be batched into a single work item, because 60 * 3401 = 204060 < 1000000. But the BatchBlock cannot be used here, because it expects me to provide the batch size, which I do not know - it depends on the work items in the pipeline.

How would you achieve such batching?

mark
  • Is there any problem with this question to merit a close vote? – mark Oct 06 '21 at 04:05
  • To put it bluntly maybe two separate questions in the same question is one question too many. – karel Oct 06 '21 at 04:20
  • I realize it. On the other hand, I do not want to repeat the whole body of the question twice. I do not understand this closing methodology. Why not let the folks out there decide? If this is a bad question, it will not get enough traction and that would force me to reconsider. Let the free market do its thing. – mark Oct 06 '21 at 05:11

2 Answers

I have realized something. Maybe it is obvious, but I have only just figured it out: the TPL DataFlow library is of no use if one can buffer all the items first, and in my case I can. So I can buffer the items and sort them from largest to smallest, after which a simple Parallel.ForEach will do the work just fine. Having realized that, I changed my implementation to use Reactive Extensions (Rx) like this:

Phase 1 - get all the items, this is where all the IO is

var input = (workspaceRoot + "build\\projects.yml")
    .YieldAllProjects()
    .ToObservable()
    .Select(project => Observable.FromAsync(() => Task.Run(() => YieldFiles(project, searchAllFiles))))
    .Merge(2)
    .SelectMany(files => files)
    .Select(file => Observable.FromAsync(file.PopulateContentAsync))
    .Merge(10)
    .ToList()
    .GetAwaiter().GetResult()
    .AsList();

input.Sort((x, y) => y.EstimatedLineCount - x.EstimatedLineCount);

Phase 2 - find all the matching lines (CPU only)

var res = new SortedDictionary<string, List<MatchingLine>>();
input
    .ToObservable()
    .Select(file => Observable.FromAsync(() => Task.Run(() => file.YieldMatchingLines(literals, 0, literals.Count, progress).ToList())))
    .Merge(maxDOP.Value)
    .ToList()
    .GetAwaiter().GetResult()
    .SelectMany(m => m)
    .ForEach(m => AddResult(res, m));

So, even though I have hundreds of projects, thousands of files and millions of lines of code, it is not the scale for TPL DataFlow, because my tool can read all the files into memory, rearrange them in a favorable order and then process them.
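For comparison, the "sort from largest to smallest, then Parallel.ForEach" idea mentioned at the top of this answer could look roughly like the sketch below. It is not the code I ended up using; `ProcessLargestFirst` and its parameters are hypothetical names, and the `NoBuffering` partitioner is my assumption for handing the items out one at a time in sorted order.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class LargestFirstSketch
{
    // Processes already-buffered items largest-first with bounded parallelism.
    public static List<TResult> ProcessLargestFirst<TItem, TResult>(
        IEnumerable<TItem> items, Func<TItem, int> sizeSelector,
        Func<TItem, IEnumerable<TResult>> process, int maxDegreeOfParallelism)
    {
        var ordered = items.OrderByDescending(sizeSelector).ToList();

        // NoBuffering makes each worker take one item at a time, in order,
        // so the large items are started first.
        var partitioner = Partitioner.Create(ordered, EnumerablePartitionerOptions.NoBuffering);

        var results = new ConcurrentBag<TResult>();
        Parallel.ForEach(
            partitioner,
            new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism },
            item =>
            {
                foreach (var r in process(item)) results.Add(r);
            });

        // The order of the results is not deterministic.
        return results.ToList();
    }
}
```

In my scenario the size selector would be something like the estimated line count of a file, and the processing function would return the matching lines of that file.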

mark
  • Using the Rx library as a pipeline engine can work well in some simple scenarios, but there are limitations: 1. No backpressure. As you pointed out, you must accept that all the input, intermediate and output data could be stored in-memory during the processing. In many scenarios this is undesirable or unrealistic. 2. No propagation of all errors. If multiple actions fail concurrently, you'll get a single error and that's it. 3. No graceful termination. If an action fails the sequence will terminate immediately, while other concurrent actions may continue running in a fire-and-forget fashion. – Theodor Zoulias Oct 08 '21 at 12:32
  • Also from my experience the Rx is currently not completely perfect. There are still bugs lurking in there, like [this](https://stackoverflow.com/questions/51714221/async-create-hanging-while-publishing-observable) for example. Combining Rx and async may result in you becoming an unwilling beta tester of the library. I am currently investigating [an issue](https://stackoverflow.com/a/64880836/11178549) where a relatively simple Rx pipeline hangs when a specific action fails, and why it stops hanging if I do something irrelevant like attaching a `.Finally(() => { })` after the `FromAsync`. – Theodor Zoulias Oct 08 '21 at 12:47
  • @TheodorZoulias - Hmm, these are all valid points. So you do not think TPL DataFlow is an overkill for my scenario where I can definitely read all the source files into memory. True, my process occupies about 1GB of ram, but for this tool I do not really care. All I need is performance. And I investigate it in 2 streams - efficient pipeline and efficient algorithm, the latter is a separate topic and I will probably ask separate questions on it. – mark Oct 08 '21 at 15:12
  • Mark the TPL Dataflow is currently the best tool offered by Microsoft for building processing pipelines IMHO. It's not easily extensible, and it's a bit dated, verbose and clumsy, but still not outdated. It offers flexibility and rich functionality to compensate for its shortcomings. Regarding performance, I would expect it to outperform consistently an Rx-based implementation, when configured to do the same work in the same way. If you have benchmarks that prove otherwise, I would be interested to know. :-) – Theodor Zoulias Oct 08 '21 at 16:04

Regarding the first question (configuring the pipeline), I can't really offer any guidance. Optimizing the parameters of a dataflow pipeline seems like a black art to me!

Regarding the second question (how to batch a workload consisting of work items whose size is unknown at compile time), you could use the custom BatchBlock<T> below. It uses the DataflowBlock.Encapsulate method in order to combine two dataflow blocks into one. The first block is an ActionBlock<T> that receives the input and puts it into a buffer, and the second is a BufferBlock<T[]> that holds the batched items and propagates them downstream. The weightSelector is a lambda that returns the weight of each received item. A batch is emitted when adding the next item would push the accumulated weight over the batchWeight threshold, or as soon as the accumulated weight reaches the threshold. Any leftover items are emitted as a final batch when the block completes.

public static IPropagatorBlock<T, T[]> CreateDynamicBatchBlock<T>(
    int batchWeight, Func<T, int> weightSelector,
    DataflowBlockOptions options = null)
{
    // Arguments validation omitted
    options ??= new DataflowBlockOptions();
    var outputBlock = new BufferBlock<T[]>(options);
    List<T> buffer = new List<T>();
    int sumWeight = 0;

    var inputBlock = new ActionBlock<T>(async item =>
    {
        checked
        {
            int weight = weightSelector(item);
            if (weight + sumWeight > batchWeight && buffer.Count > 0)
                await SendBatchAsync();
            buffer.Add(item);
            sumWeight += weight;
            if (sumWeight >= batchWeight) await SendBatchAsync();
        }
    }, new()
    {
        BoundedCapacity = options.BoundedCapacity,
        CancellationToken = options.CancellationToken,
        TaskScheduler = options.TaskScheduler,
        MaxMessagesPerTask = options.MaxMessagesPerTask,
        NameFormat = options.NameFormat
    });

    PropagateCompletion(inputBlock, outputBlock, async () =>
    {
        if (buffer.Count > 0) await SendBatchAsync();
    });

    Task SendBatchAsync()
    {
        var batch = buffer.ToArray();
        buffer.Clear();
        sumWeight = 0;
        return outputBlock.SendAsync(batch);
    }

    static async void PropagateCompletion(IDataflowBlock source,
        IDataflowBlock target, Func<Task> postCompletionAction)
    {
        try { await source.Completion.ConfigureAwait(false); } catch { }
        Exception ex =
            source.Completion.IsFaulted ? source.Completion.Exception : null;
        try { await postCompletionAction(); }
        catch (Exception actionError) { ex = actionError; }
        if (ex != null) target.Fault(ex); else target.Complete();
    }

    return DataflowBlock.Encapsulate(inputBlock, outputBlock);
}

Usage example:

var batchBlock = CreateDynamicBatchBlock<WorkItem>(1_000_000, wi => wi.Size);

If the int type does not have enough range for the weight and overflows (the checked block turns an overflow into an OverflowException), you could switch to long or double.
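To see which batches this rule produces without setting up a pipeline, here is a minimal synchronous sketch of the same batching logic over an IEnumerable<T>. The `BatchByWeight` helper is hypothetical and is not part of TPL Dataflow; it accumulates the weight in a long, which is one of the alternatives mentioned above.

```csharp
using System;
using System.Collections.Generic;

public static class WeightBatchingSketch
{
    // Groups consecutive items into batches using the same rule as the dataflow block above.
    public static IEnumerable<T[]> BatchByWeight<T>(
        IEnumerable<T> source, int batchWeight, Func<T, int> weightSelector)
    {
        var buffer = new List<T>();
        long sumWeight = 0;

        foreach (var item in source)
        {
            int weight = weightSelector(item);

            // Emit the pending batch first if this item would not fit into it.
            if (sumWeight + weight > batchWeight && buffer.Count > 0)
            {
                yield return buffer.ToArray();
                buffer.Clear();
                sumWeight = 0;
            }

            buffer.Add(item);
            sumWeight += weight;

            // Emit immediately once the threshold is reached or exceeded.
            if (sumWeight >= batchWeight)
            {
                yield return buffer.ToArray();
                buffer.Clear();
                sumWeight = 0;
            }
        }

        // Emit the leftovers, as the dataflow block does on completion.
        if (buffer.Count > 0) yield return buffer.ToArray();
    }
}
```

With a threshold of 1,000,000, three items of weight 68,020 each (20 lines * 3401 literals) end up in a single batch, while the weights 600,000, 500,000, 1,200,000 and 100,000 end up in four batches of one item each. An item heavier than the threshold is emitted alone.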

Theodor Zoulias