I am using TPL Dataflow to build a pipeline. Everything works so far, with the pipeline defined as follows (my issue only concerns broadcaster, submissionSucceeded and submissionFailed):

// Define tasks
var productListingBatchBuffer = new BufferBlock<PostSubmissionState>();
var splitFile = new TransformBlock<PostSubmissionState, PostSubmissionState>(s => SplitFile(s));
var saveFile = new TransformBlock<PostSubmissionState, PostSubmissionState>(s => SaveFile(s));
var postSubmission = new TransformBlock<PostSubmissionState, PostSubmissionState>(s => PostSubmission(s));
var broadcaster = new BroadcastBlock<PostSubmissionState>(state => state);
var submissionSucceeded = new ActionBlock<PostSubmissionState>(s => SubmissionSucceeded(s));
var submissionFailed = new ActionBlock<PostSubmissionState>(s => SubmissionFailed(s));

// Link em up
productListingBatchBuffer.LinkTo(splitFile, new DataflowLinkOptions() { PropagateCompletion = true });
splitFile.LinkTo(saveFile, new DataflowLinkOptions() { PropagateCompletion = true });
saveFile.LinkTo(postSubmission, new DataflowLinkOptions() { PropagateCompletion = true });
postSubmission.LinkTo(broadcaster, new DataflowLinkOptions() { PropagateCompletion = true });
broadcaster.LinkTo(submissionSucceeded, new DataflowLinkOptions() { PropagateCompletion = true }, state => state.PostSucceeded);
broadcaster.LinkTo(submissionFailed, new DataflowLinkOptions() { PropagateCompletion = true }, state => !state.PostSucceeded);

The issue I have is with the propagation of Exceptions. Because my BroadcastBlock propagates its completion (and therefore any Fault) to two blocks, if an exception does occur, it gets propagated to both blocks. Thus when I do

Task.WaitAll(submissionSucceeded.Completion, submissionFailed.Completion);

I end up with an aggregate exception containing two exceptions. Right now the best I can do is to filter these, i.e.:

try
{
    Task.WaitAll(submissionSucceeded.Completion, submissionFailed.Completion);
}
catch (AggregateException ex)
{
    var uniqueExceptions = new AggregateException(ex.Flatten().InnerExceptions.Distinct());
    Console.WriteLine("An exception was thrown.\n{0}", uniqueExceptions.Flatten());
}

but I'm wondering if there's a better way to do this. That is, if only one exception occurs, I want only one exception raised. I'm new to Dataflow, so I'm still discovering the conventions.
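For reference, the filtering above can be wrapped in a small reusable helper. This is only a sketch: `CompletionHelpers` and `WhenAllDistinctAsync` are hypothetical names, not part of TPL Dataflow, and it assumes that both downstream blocks end up holding the same exception instance, so that `Distinct()` (reference equality) collapses the duplicates.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class CompletionHelpers
{
    // Hypothetical helper (not a Dataflow API): awaits every completion task
    // and returns each distinct exception instance exactly once.
    public static async Task<IReadOnlyList<Exception>> WhenAllDistinctAsync(
        params Task[] completions)
    {
        var all = Task.WhenAll(completions);
        try
        {
            await all.ConfigureAwait(false);
            return Array.Empty<Exception>();
        }
        catch
        {
            // Exception is null when the tasks were cancelled rather than
            // faulted; let the cancellation propagate in that case.
            if (all.Exception == null)
                throw;

            // Flatten unwraps nested AggregateExceptions; Distinct removes
            // repeats of the same exception object.
            return all.Exception.Flatten().InnerExceptions.Distinct().ToList();
        }
    }
}
```

It would be called as `var errors = await CompletionHelpers.WhenAllDistinctAsync(submissionSucceeded.Completion, submissionFailed.Completion);`. This still filters after the fact; it does not stop the fault from being propagated to both blocks.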

bornfromanegg
  • Should `submissionSucceeded` and `submissionFailed` actually be separate blocks? If you combined them into one that checks `state.PostSucceeded` inside, that would solve the issue. – svick Feb 03 '14 at 20:54
  • Yes, very possibly, in this situation. I might actually do that. But, more generally, this would happen any time a flow splits. For example, in the link below - if an exception is thrown in broadcaster, you'll end up with it duplicated in storage processor: http://taskmatics.com/blog/simplifying-producer-consumer-processing-with-tpl-dataflow-structures/ – bornfromanegg Feb 04 '14 at 11:48
  • There is a good [article about exceptions from MSDN](http://blogs.msdn.com/b/pfxteam/archive/2011/11/10/10235570.aspx). – Alex Feb 17 '16 at 15:17
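svick's suggestion of a single terminal block could look roughly like the sketch below. It assumes the System.Threading.Tasks.Dataflow package is referenced; `SingleSinkExample`, `CreateSink` and the minimal `PostSubmissionState` class are stand-ins invented for illustration, and the two callbacks take the place of the question's `SubmissionSucceeded` and `SubmissionFailed` methods.

```csharp
using System;
using System.Threading.Tasks.Dataflow;

// Stand-in for the question's state type; only the flag used here is modelled.
public class PostSubmissionState
{
    public bool PostSucceeded { get; set; }
}

public static class SingleSinkExample
{
    // Hypothetical factory: one ActionBlock that branches on PostSucceeded
    // instead of two blocks fed by a BroadcastBlock.
    public static ActionBlock<PostSubmissionState> CreateSink(
        Action<PostSubmissionState> onSucceeded,
        Action<PostSubmissionState> onFailed)
    {
        return new ActionBlock<PostSubmissionState>(state =>
        {
            if (state.PostSucceeded)
                onSucceeded(state);
            else
                onFailed(state);
        });
    }
}
```

With this shape, `postSubmission` would link straight to the sink with `PropagateCompletion = true`, and only `sink.Completion` needs to be awaited, so a fault arrives on a single task. As noted in the comments, this does not help in pipelines where the flow genuinely has to split.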

1 Answer

I've written a TPL Dataflow example (https://github.com/squideyes/PodFetch) that takes a slightly different approach to completion and error handling. Here's the relevant code from lines 171 to 201 of Program.cs:

    scraper.LinkTo(fetcher, link => link != null);
    scraper.LinkTo(DataflowBlock.NullTarget<Link>());

    scraper.HandleCompletion(fetcher);

    Status.Info.Log("Fetching APOD's archive list");

    links.ForEach(link => scraper.Post(link));

    scraper.Complete();

    try
    {
        await fetcher.Completion;

        Status.Finished.Log("Fetched: {0:N0}, Skipped: {1:N0}, Errors: {2:N0}, Seconds: {3:N2}",
            fetched, skipped, errored, (DateTime.UtcNow - startedOn).TotalMilliseconds / 1000.0);
    }
    catch (AggregateException errors)
    {
        foreach (var error in errors.InnerExceptions)
            Status.Failure.Log(error.Message);
    }
    catch (TaskCanceledException)
    {
        Status.Cancelled.Log("The process was manually cancelled!");
    }
    catch (Exception error)
    {
        Status.Failure.Log(error.Message);
    }

As you can see, I link a couple of TPL blocks together, then set up completion handling with a HandleCompletion extension method:

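    public static void HandleCompletion(
        this IDataflowBlock source, params IDataflowBlock[] targets)
    {
        // Runs once the source block has finished, whatever the outcome.
        source.Completion.ContinueWith(
            task =>
            {
                foreach (var target in targets)
                {
                    // Every target receives the same fault, so with several
                    // targets the exception shows up once per target.
                    if (task.IsFaulted)
                        target.Fault(task.Exception);
                    else
                        target.Complete();
                }
            });
    }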

Very importantly, I call scraper.Complete() when I'm done passing objects to the first block in the chain. The HandleCompletion extension method then deals with the continuation. And, since I'm waiting on fetcher (the last block in the chain) to complete, it's easy to catch any resulting errors within a try/catch.

Louis S. Berman
  • I believe that your HandleCompletion method does the same thing as PropagateCompletion does under the hood. Is there a particular reason you chose to do it this way? I don't think it solves my problem, although interestingly, it does highlight it, since it's clear to see how the exception can be duplicated by calling with multiple targets. – bornfromanegg Sep 19 '17 at 06:58