I am having a problem with a TPL Dataflow mesh in my C#/WPF app. The first input item (called a "Job") always goes all the way through the chain to the final block (#4). But the remaining jobs never arrive at that final block, even though log statements clearly show them being successfully returned from block #3.

Here is the mesh. It is set up once and stored in a private member of my view-model class.

// 1. _meshStartBlock:  On UI thread.   This block always works fine.

_meshStartBlock = new TransformBlock<Job, Job>(job =>
{
    Jobs.Add(job);
    Fire(_scanCapturedTrigger, job);  // Notify state machine.
    Log.Debug("Started: " + job.Name);
    return job;
},
new ExecutionDataflowBlockOptions
{
    CancellationToken = TokenSource.Token,
    TaskScheduler = UiTaskScheduler   // Run on UI thread (because it edits
                                      // our ObservableCollection)
});

// 2. createBlock:  This block also always works fine.

var createBlock = new TransformBlock<Job, Job>(job =>
{
    job.CreateScan();          // Saves some disk files
    job.CreateThumbnail(true); // Creates and saves a thumbnail image.
    Log.Debug("Created: " + job.Name);
    return job;
},
new ExecutionDataflowBlockOptions
{ CancellationToken = TokenSource.Token, MaxDegreeOfParallelism = 1 });


// 3. processBlock - do heavy work in parallel
// This block succeeds for all 3 jobs, but the 2nd and 3rd returned jobs
// never reach the next block.

var processBlock = new TransformBlock<Job, Job>(job =>
{
    try
    {
        Log.Debug("Processing: " + job.Name);
        job.AlignImages();            // heavy image processing
        job.Generate3d();             // heavy 3d math
        job.FindShapes();             // more heavy math
        job.GetContext().Scan.Save(); // save disk files
        Log.Debug("Processing succeeded: " + job.Name
    }
    catch (Exception e)
    {
        Log.Error("Processing failed: " + job.Name);
    }

    // *** THIS LOG STATEMENT SHOWS UP FOR ALL 3 JOBS ***

    Log.Debug("Leaving process block: " + job.Name);

    return job;
},
new ExecutionDataflowBlockOptions
{ CancellationToken = TokenSource.Token, MaxDegreeOfParallelism = 3 });


// 4. doneBlock: Cleans up.
// Since we schedule this on the UI thread it should not be heavy.

var doneBlock = new ActionBlock<Job>(job =>
{
    // *** ONLY REACHED BY JOB 1 ***  

    Log.Debug("Done: " + job.Name);
    Fire(Trigger.ScanProcessed);    // Notify State Machine
},
new ExecutionDataflowBlockOptions
{ CancellationToken = TokenSource.Token,  TaskScheduler = UiTaskScheduler });

// Set up the mesh.  Link the blocks together to form a chain.

_meshStartBlock.LinkTo(createBlock);
createBlock.LinkTo(processBlock);
processBlock.LinkTo(doneBlock);

return _meshStartBlock;

This is the log output I get:

Started: Job1
Created: Job1
Started: Job2
Processing: Job1
Created: Job2
Processing: Job2
Started: Job3
Created: Job3
Processing: Job3
Processing succeeded: Job1
Leaving process block: Job1
Done: Job1
Processing succeeded: Job2
Leaving process block: Job2
Processing succeeded: Job3
Leaving process block: Job3

The Debug window does not report any exceptions during processing, nor does it dump error messages of any kind.

I should note that I am forced to run this as a Release build; if I run a Debug build, the process block takes hours. Also, the CancellationToken is never cancelled.

Can any TPL Dataflow gurus tell me how I can diagnose what might be happening to Job2 and Job3? Is there any way I can get TPL Dataflow to tell me what happened to my jobs?

  • Two points that are raising concerns: 1) `MaxDegreeOfParallelism = 3` <--- is everything running inside the `processBlock` independent from the parallel execution paths? 2) `// Notify state machine.` + `TaskScheduler = UiTaskScheduler` <--- have you ruled out the possibility of a deadlock caused by a blocked UI thread? – Theodor Zoulias May 18 '20 at 03:32
  • Hi. Thanks for the response. As far as I know, everything inside that block is independent. I invoke the same series of functions in another TPL mesh with no problems. Obviously that's not certainty. But if I set the `MaxDegreeOfParallelism` down to 1 the problem still happens. Regarding the bit about the UI Task scheduler. I don't think there is a deadlock and see no evidence of it if I break and look at threads in the debugger. But if there were a deadlocking problem, I would expect it to occur ***during*** a TPL block, not ***between*** them, yes? – Joe May 18 '20 at 14:25
  • The last block `doneBlock` uses the `UiTaskScheduler` too, so it is possible that it is blocked from running its action. You could get a better idea about what is happening by logging, at strategic points, the `InputCount` and `OutputCount` properties of the blocks. This would reveal where the propagation of the jobs (from the output buffer of the producer to the input buffer of the consumer) is obstructed. I would also try commenting out the two `Fire` commands temporarily, to see if they have anything to do with the problem. – Theodor Zoulias May 18 '20 at 14:54
  • It turned out to be the final Fire() command. It invokes a third-party state machine library that throws an InvalidOperationException due to my poorly setting up the states. But I never saw it because a) I had no exception frame and b) I had "Just My Code" turned on. This must have locked up the TPL pipeline. I guess I thought that an uncaught exception on a UI thread would hit my UnhandledExceptionFilter, but of course TPL just silently catches it and locks up the pipeline, even on UI threads. Lesson learned. Thanks. Learned something. – Joe May 19 '20 at 01:27
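
Regarding the suggestion above to log the `InputCount` and `OutputCount` properties: a minimal sketch of such diagnostic logging might look like the following (the method name is illustrative, and how it gets called, e.g. from a timer or at the end of each block's delegate, is an assumption):

void LogQueueCounts(
    TransformBlock<Job, Job> createBlock,
    TransformBlock<Job, Job> processBlock,
    ActionBlock<Job> doneBlock)
{
    // TransformBlock exposes both InputCount and OutputCount;
    // ActionBlock exposes only InputCount.
    Log.Debug($"createBlock:  In={createBlock.InputCount} Out={createBlock.OutputCount}");
    Log.Debug($"processBlock: In={processBlock.InputCount} Out={processBlock.OutputCount}");
    Log.Debug($"doneBlock:    In={doneBlock.InputCount}");
}

A job that stays stuck in `processBlock`'s output count (or never shows up in `doneBlock`'s input count) points at the link or the consuming block rather than at the processing delegate itself.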

1 Answer


It may help to attach error handlers to the blocks, to log the exceptions as soon as they happen. Here is an example of a simple generic error handler:

public static async void OnErrorLog(IDataflowBlock block)
{
    try
    {
        await block.Completion.ConfigureAwait(false);
    }
    catch (Exception ex)
    {
        Log.Error($"{block.GetType().Name} failed", ex);
    }
}

You can adapt it to your liking.

Usage example:

OnErrorLog(_meshStartBlock);
OnErrorLog(createBlock);
OnErrorLog(processBlock);
OnErrorLog(doneBlock);
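
As suggested in the comments below, the handler also works nicely as an extension method on `IDataflowBlock`. A minimal sketch (the class and method names here are illustrative, not part of the original handler):

public static class DataflowDiagnosticsExtensions
{
    // Logs the block's exception (if any) once the block completes.
    // Fire-and-forget by design: an exception that escapes a block
    // faults the block permanently, so this logs at most once per block.
    public static async void LogOnError(this IDataflowBlock block)
    {
        try
        {
            await block.Completion.ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            Log.Error($"{block.GetType().Name} failed", ex);
        }
    }
}

Usage would then read `processBlock.LogOnError();` and so on, right after the mesh is set up.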
  • I admit I'm a little confused by this. When/where exactly would I call OnErrorLog? As soon as I finish setting up the mesh? I'm not using async code there. – Joe May 19 '20 at 03:54
  • @Joe yes, after setting up the mesh. You don't need to `await` the calls. Actually it's not even possible to `await` them, since the handler is [async void](https://learn.microsoft.com/en-us/archive/msdn-magazine/2013/march/async-await-best-practices-in-asynchronous-programming#avoid-async-void). – Theodor Zoulias May 19 '20 at 04:21
  • Oh yes, of course! (smacks own head) And since any exception escaping a block like that is basically game over for the block anyway, you get exactly what you need: one-time, never-missed error reporting. That's brilliant! (partly because now that I understand it, it seems obvious) – Joe May 19 '20 at 17:51
  • In fact, that function seems to be a perfect candidate for an extension method on IDataflowBlock – Joe May 19 '20 at 18:13
  • @Joe it is practically equivalent to having an `Error` event on the blocks, like the [`FileSystemWatcher.Error`](https://learn.microsoft.com/en-us/dotnet/api/system.io.filesystemwatcher.error) event. – Theodor Zoulias May 19 '20 at 18:17
  • @Joe you may find this question interesting too: [TPL DataFlow proper way to handle exceptions](https://stackoverflow.com/questions/58593202/tpl-dataflow-proper-way-to-handle-exceptions). It has some ideas about propagating exceptions in the reverse direction from the `LinkTo` links. – Theodor Zoulias May 19 '20 at 18:22
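
For the "reverse direction" idea in the last comment, a minimal sketch could look like this (the helper name is an assumption; it simply faults the upstream block when the downstream block faults, so pending jobs don't sit in its buffers forever):

public static void PropagateFault(IDataflowBlock downstream, IDataflowBlock upstream)
{
    // When the downstream block faults, fault the upstream block as well.
    downstream.Completion.ContinueWith(
        t => upstream.Fault(t.Exception),
        CancellationToken.None,
        TaskContinuationOptions.OnlyOnFaulted,
        TaskScheduler.Default);
}

For example, `PropagateFault(doneBlock, processBlock);` would stop `processBlock` from quietly accumulating jobs after `doneBlock` has failed.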