2

I have a TPL Dataflow Action Block that I'm using to receive trigger messages for a camera, then do some processing. If the processing task throws an exception, the ActionBlock enters the faulted state. I would like to send a faulted message to my UI and send a reset message to the ActionBlock so it can continue processing incoming trigger messages. Is there a way to return the ActionBlock to a ready state (clear the fault)?

Code for the curious:

using System.Threading.Tasks.Dataflow;

namespace Anonymous
{
    /// <summary>
    /// Provides a messaging system between objects that inherit from Actor
    /// </summary>
    public abstract class Actor
    {
        //The Actor uses an ActionBlock from the DataFlow library.  An ActionBlock has an input queue you can 
        // post messages to and an action that will be invoked for each received message.

        //The ActionBlock handles all of the threading issues internally so that we don't need to deal with 
        // threads or tasks. Thread-safety comes from the fact that ActionBlocks are serialized by default. 
        // If you send two messages to it at the same time it will buffer the second message until the first 
        // has been processed.
        private ActionBlock<Message> _action;

        ...Properties omitted for brevity...

        public Actor(string name, int id)
        {
            _name = name;
            _id = id;
            CreateActionBlock();
        }

        private void CreateActionBlock()
        {
            // We create an action that will convert the actor and the message to dynamic objects 
            // and then call the HandleMessage method.  This means that the runtime will look up 
            // a method called ‘HandleMessage’ with a parameter of the message type and call it.

            // in TPL Dataflow if an exception goes unhandled during the processing of a message, 
            // (HandleMessage) the exception will fault the block’s Completion task.

            //Dynamic objects expose members such as properties and methods at run time, instead 
            // of at compile time. This enables you to create objects to work with structures that 
            // do not match a static type or format. 
            _action = new ActionBlock<Message>(message =>
            {
                dynamic self = this;
                dynamic msg = message;
                self.HandleMessage(msg); //implement HandleMessage in the derived class
            }, new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 1  // This specifies a maximum degree of parallelism of 1.
                                            // This causes the dataflow block to process messages serially.
            });
        }

        /// <summary>
        /// Send a message to an internal ActionBlock for processing
        /// </summary>
        /// <param name="message"></param>
        public async void SendMessage(Message message)
        {
            if (message.Source == null)
                throw new Exception("Message source cannot be null.");
            try
            {
                _action.Post(message);
                await _action.Completion;
                message = null;
                //in TPL Dataflow if an exception goes unhandled during the processing of a message, 
                // the exception will fault the block’s Completion task.
            }
            catch(Exception ex)
            {
                _action.Completion.Dispose();
                //throw new Exception("ActionBlock for " + _name + " failed.", ex);
                Trace.WriteLine("ActionBlock for " + _name + " failed." + ExceptionExtensions.GetFullMessage(ex));

                if (_action.Completion.IsFaulted)
                {
                    _isFaulted = true;
                    _faultReason = _name + " ActionBlock encountered an exception while processing task: " + ex.ToString();
                    FaultMessage msg = new FaultMessage { Source = _name, FaultReason = _faultReason, IsFaulted = _isFaulted };
                    OnFaulted(msg);
                    CreateActionBlock();
                }
            }
        }

        public event EventHandler<FaultMessageEventArgs> Faulted;
        public void OnFaulted(FaultMessage message)
        {
            Faulted?.Invoke(this, new FaultMessageEventArgs { Message = message.Copy() });
            message = null;
        }

        /// <summary>
        /// Use to await the message processing result
        /// </summary>
        public Task Completion
        {
            get
            {
                _action.Complete();
                return _action.Completion;
            }
        }
    }
}
HappyDude
  • 53
  • 9
  • Short answer: No. An `ActionBlock` cannot be restarted or reset. – JSteward Apr 04 '18 at 14:12
  • Possible duplicate of [Task Dataflow, can a data block be changed from completion state?](https://stackoverflow.com/questions/15967903/task-dataflow-can-a-data-block-be-changed-from-completion-state) – JSteward Apr 04 '18 at 14:37
  • I looked at the other question. My "Device" object has an ActionBlock. Does this mean I need to kill the ActionBlock and create a new one? Is it as simple as saying _action = new ActionBlock() or will that have side-effects with respect to memory? I put some example code with a CreateActionBlock() method in my post above. – HappyDude Apr 04 '18 at 18:58
  • Yes you will need to recreate the block, dispose of the old link into your pipeline and then link the new block with your pipeline. Often it's easier just to bring the whole pipeline down and back up. – JSteward Apr 04 '18 at 19:00
  • @HappyDude *don't* allow unhandled faults in an ActionBlock, just as you wouldn't allow unhanlded exceptions in an application. What do you want to do once a fault occurs? In the simplest case you could log it. Or you could convert your ActionBlock to a *TransformBlock* that sends Success/Failure messages downstream. You can add a predicate to `LinkTo` that routes failure messages to logging/error handling blocks for example, Success messages to other blocks – Panagiotis Kanavos Apr 05 '18 at 09:51
  • @HappyDude an ActionBlock isn't an Actor btw. They may look similar but refer to *different* paradigms. You can use one to build the other but in the end, it's easier to implement retrying in actors than it is in dataflows. It's easier to build meshes with throttling in DataFlow than it is with actors. – Panagiotis Kanavos Apr 05 '18 at 09:53
  • @HappyDude besides the code you wrote essentially emulates the ActionBlock's own API with extra serious bugs - `async void` is only meant for *event handlers*. It can't be awaited. `ActionBlock.SendAsync` does what you want already *and* handles throttling. Your code can only handle *one* message because it kills the Actionblock. Why? You could replace *all* of this code with a `var block=new ActionBlock(msg=>DoSomething(msg)); /*SendAsync 1000 messages;*/ block.Complete(); await block.Completion;` – Panagiotis Kanavos Apr 05 '18 at 09:57

1 Answers1

2

An unhandled exception in an ActionBlock is like an unhandled exception in an application. Don't do this. Handle the exception appropriately.

In the simplest case, log it or do something inside the block's delegate. In more complex scenarios you can use a TransformBlock instead of an ActionBlock and send Succes or Failure messages to downstream blocks.

The code you posted though has some critical issues. Dataflow blocks aren't agents and agents aren't dataflow blocks. You can use the one to build the other of course, but they represent different paradigms. In this case your Actor emulates ActionBlock's own API with several bugs.

For example, you don't need to create a SendAsync, blocks already have one. You should not complete the block once you send a message. You won't be able to handle any other messages. Only call Complete() when you really don't want to use the ActionBlock any more. You don't need to set a DOP of 1, that's the default value.

You can set bounds to a DataflowBlock so that it accepts only eg 10 messages at a time. Otherwise all messages would be buffered until the block found the chance to process them

You could replace all of this code with the following :

void MyMethod(MyMessage message)
{
    try
    {
    //...
    }
    catch(Exception exc)
    {
        //ToString logs the *complete exception, no need for anything more
        _log.Error(exc.ToString());
    }
}

var blockOptions new ExecutionDataflowBlockOptions {
                                 BoundedCapacity=10,
                                 NameFormat="Block for MyMessage {0} {1}"
};
var block=new ActionBlock<MyMessage>(MyMethod,blockOptions);

for(int i=0;i<10000;i++)
{
    //Will await if more than 10 messages are waiting
    await block.SendAsync(new MyMessage(i);
}
block.Complete();
//Await until all leftover messages are processed
await block.Completion;

Notice the call to Exception.ToString(). This will generate a string containing all exception information, including the call stack.

NameFormat allows you to specify a name template for a block that can be filled by the runtime with the block's internal name and task ID.

Panagiotis Kanavos
  • 120,703
  • 13
  • 188
  • 236
  • My understanding of bounded capacity is that any further messages that get sent to the actionblock will be dropped. Will this fault the block? – HappyDude Apr 05 '18 at 18:41
  • @HappyDude no, they won't be dropped if you use `SendAsync`. SendAsync will await until the input buffer frees up a slot. Blocks themselves use `SendAsync` so you can have a pipeline with eg 4 TransformBlocks ending with 1 ActionBlock and ensure that you don't flood one of the buffers if one of the blocks is too slow by setting BoundedCapacity on all blocks. – Panagiotis Kanavos Apr 16 '18 at 08:04