2

In a scenario where await may be called on an 'empty' list of tasks.

How do I await a list of Task<T>, and then add new tasks to the awaiting list until one fails or completes.

I am sure there is must be an Awaiter or CancellationTokenSource solution for this problem.

public class LinkerThingBob
{
    private List<Task> ofmyactions = new List<Task>();

    public void LinkTo<T>(BufferBlock<T> messages) where T : class
    {
        var action = new ActionBlock<IMsg>(_ => this.Tx(messages, _));

        // this would not actually work, because the WhenAny 
        // will not include subsequent actions.
        ofmyactions.Add(action.Completion);

        // link the new action block.
        this._inboundMessageBuffer.LinkTo(block);
    }

    // used to catch exceptions since these blocks typically don't end.
    public async Task CompletionAsync()
    {
        // how do i make the awaiting thread add a new action
        // to the list of waiting tasks without interrupting it
        // or graciously interrupting it to let it know there's one more

        // more importantly, this CompletionAsync might actually be called
        // before the first action is added to the list, so I actually need
        // WhenAny(INFINITE + ofmyactions)
        await Task.WhenAny(ofmyactions);
    }
}

My problem is that I need a mechanism where I can add each of the action instances created above to a Task<T> that will complete when there is an exception.

I am not sure how best to explain this but:

  • The task must not complete until at least one call to LinkTo<T> has been made, so I need to start with an infinite task

  • each time LinkTo<T> is called, the new action must be added to the list of tasks, which may already be awaited on in another thread.

Community
  • 1
  • 1
Jim
  • 14,952
  • 15
  • 80
  • 167
  • 1
    Any reason why you dont just add your messages to a ConcurrentQueue and have a number of worker tasks popping from them? This way you have complete control over all the tasks in whatever happens. Syncing ActionsBlocks seems uneccessary clutterly to me. – srandppl Dec 21 '15 at 14:02
  • I am using action blocks elsewhere and want to keep the pattern. Besides that there should be a way of coordinating multiple tasks in such a way that more can be added if necessary. Perhaps I have to build an add task callback / continuation. The problem is interesting nevertheless. – Jim Dec 21 '15 at 15:09
  • Can you clarify your question? – shay__ Dec 21 '15 at 15:32
  • I have edited the question with a more complete reproduction of the essentials of the problem. I hope it makes it clearer. – Jim Dec 21 '15 at 16:02
  • Creating an "infinite" task is very simple - just initialize a new task (for example `var t = new Task(()=> {})`) and don't start it... Why not initializing `ofmyactions` with such a task in it? – shay__ Dec 21 '15 at 16:09
  • @Jim: Did you mean `ofmyactions.Add(action.Completion);`? And do you want `CompletionAsync` to only complete if there's an exception? – Stephen Cleary Dec 21 '15 at 17:14
  • @shay Thanks for that it certainly helps I had not considered that simple solution to the infinite task - it doesn't solve the rest of the problem though. I need add a new action to the list of awaiting tasks which the other thread responds to. That's the essence of the problem. A flexi-continuation for lack of a better term – Jim Dec 21 '15 at 17:16
  • @stephen, yes you're correct. The problem is that WhenAny takes an array. It's not mutable and I can't think of how to change it correctly. It doesn't have to end only on exception but that is how it typically ends, either that or the service terminates and cancels a token (left out for brevity) – Jim Dec 21 '15 at 17:19

2 Answers2

2

There isn't anything built-in for this, but it's not too hard to build one using TaskCompletionSource<T>. TCS is the type to use when you want to await something and there isn't already a construct for it. (Custom awaiters are for more advanced scenarios).

In this case, something like this should suffice:

public class LinkerThingBob
{
  private readonly TaskCompletionSource<object> _tcs = new TaskCompletionSource<object>();

  private async Task ObserveAsync(Task task)
  {
    try
    {
      await task;
      _tcs.TrySetResult(null);
    }
    catch (Exception ex)
    {
      _tcs.TrySetException(ex);
    }
  }

  public void LinkTo<T>(BufferBlock<T> messages) where T : class
  {
    var action = new ActionBlock<IMsg>(_ => this.Tx(messages, _));

    var _ = ObserveAsync(action.Completion);

    this._inboundMessageBuffer.LinkTo(block);
  }

  public Task Completion { get { return _tcs.Task; } }
}

Completion starts in a non-completed state. Any number of blocks can be linked to it using ObserveAsync. As soon as one of the blocks completes, Completion also completes. I wrote ObserveAsync here in a way so that if the first completed block completes without error, then so will Completion; and if the first completed block completes with an exception, then Completion will complete with that same exception. Feel free to tweak for your specific needs. :)

Stephen Cleary
  • 437,863
  • 77
  • 675
  • 810
  • Thanks this looks hundreds just one comment, you are discarding the result of ObserveAsync? That doesn't weaken the solution? – Jim Dec 22 '15 at 04:07
  • @Jim: That would usually be a mistake, but in this case any exceptions/results get reported through `Completion`, so we don't need the task returned from `ObserveAsync`. Another valid option is to make `ObserveAsync` be `async void`, but I prefer this pattern. – Stephen Cleary Dec 22 '15 at 14:55
0

This is a solution that uses exclusively tools of the TPL Dataflow library itself. You can create a TransformBlock that will "process" the ActionBlocks you want to observe. Processing a block means simply awaiting for its completion. So the TransformBlock takes incomplete blocks, and outputs the same blocks as completed. The TransformBlock must be configured with unlimited parallelism and capacity, and with ordering disabled, so that all blocks are observed concurrently, and each one that completes is returned instantly.

var allBlocks = new TransformBlock<ActionBlock<IMsg>, ActionBlock<IMsg>>(async block =>
{
    try { await block.Completion; }
    catch { }
    return block;
}, new ExecutionDataflowBlockOptions()
{
    MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
    EnsureOrdered = false
});

Then inside the LinkerThingBob.LinkTo method, send the created ActionBlocks to the TransformBlock.

var actionBlock = new ActionBlock<IMsg>(_ => this.Tx(messages, _));
allBlocks.Post(actionBlock);

Now you need a target to receive the first faulted block. A WriteOnceBlock is quite suitable for this role, since it ensures that will receive at most one faulted block.

var firstFaulted = new WriteOnceBlock<ActionBlock<IMsg>>(x => x);

allBlocks.LinkTo(firstFaulted, block => block.Completion.IsFaulted);

Finally you can await at any place for the completion of the WriteOnceBlock. It will complete immediately after receiving a faulted block, or it may never complete if it never receives a faulted block.

await firstFaulted.Completion;

After the awaiting you can also get the faulted block if you want.

ActionBlock<IMsg> faultedBlock = firstFaulted.Receive();

The WriteOnceBlock is special on how it behaves when it forwards messages. Unlike most other blocks, you can call multiple times its Receive method, and you'll always get the same single item it contains (it is not removed from its buffer after the first Receive).

Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104