As Stephen Toub explained in this post, when you submit a message to an ActionBlock you can call ExecutionContext.Capture before calling ActionBlock.Post, pass a DTO holding both the message and the ExecutionContext into the block, and then, inside the message-handling delegate, use ExecutionContext.Run to run the delegate on the captured context:

public sealed class ContextFlowProcessor<T> {
    private struct MessageState {
        internal ExecutionContext Context;
        internal T Value;
    }

    private readonly ITargetBlock<MessageState> m_block;

    public ContextFlowProcessor(Action<T> action) {
        m_block = new ActionBlock<MessageState>(ms =>
        {
            if (ms.Context != null)
                using (ms.Context) ExecutionContext.Run(ms.Context, s => action((T)s), ms.Value);
            else 
                action(ms.Value);
        });
    }

    public bool Post(T item) {
        var ec = ExecutionContext.Capture();
        var rv = m_block.Post(new MessageState { Context = ec, Value = item });
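        // Capture() returns null when context flow is suppressed, so guard the Dispose call.
        if (!rv && ec != null) ec.Dispose();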
        return rv;
    }

    public void Done() { m_block.DeclinePermanently(); }

    public Task CompletionTask { get { return m_block.CompletionTask; } }
}

This works well when the logic inside the message handler is synchronous. But how can I run a piece of async logic on the captured ExecutionContext? I need something like this:

m_block = new ActionBlock<MessageState>(async ms =>
{
      // omitting the null context situation for brevity
      using (ms.Context)
      {
         // desired, but invalid: ExecutionContext.Run returns void and cannot be awaited
         await ExecutionContext.Run(ms.Context, async _ => { await callSomethingAsync(ms.Value); }, null);
      }
});

Obviously, this doesn't compile because ExecutionContext.Run does not support asynchronous delegates (while ActionBlock does) - so how can I do this?

Andriy Volkov
  • The [link](https://qa.social.msdn.microsoft.com/Forums/en-US/5e88aa47-72b1-4d85-a1e8-7a403be8795a/best-way-to-pass-callcontext-in-dataflow-blocks?forum=async) provided refers to a pre-release version of TPL Dataflow, which is not compatible with the current API (for example, the method `DeclinePermanently` and the property `CompletionTask`). So the information provided may not be accurate. My experiments indicate that the `ExecutionContext` is captured by default, and the `ContextFlowProcessor` behaves the same as a simple `ActionBlock`. Do you have an example that shows a difference? – Theodor Zoulias Aug 15 '19 at 06:28
  • Not really, no. I looked at the current source code and it only captures the ExecutionContext (implicitly) when it starts a new task, which happens when you post the first message, or the first after a long gap, but not for subsequent messages queued to the running task-loop. – Andriy Volkov Aug 26 '19 at 19:28
  • As explained [here](https://devblogs.microsoft.com/pfxteam/executioncontext-vs-synchronizationcontext/), the `async/await` keywords use `ExecutionContext` behind the scenes. `async/await` is just infrastructure that helps simulate synchronous semantics in asynchronous programming, so using `ExecutionContext` directly means you are going to handle things manually. I think it doesn't make sense for `ExecutionContext.Run` to support `async/await` when they are themselves built on `ExecutionContext.Run`. – Ali Zeinali Aug 26 '19 at 21:49
  • What about using Task.Run? – Victor P Sep 02 '19 at 00:53

1 Answer

If you can provide a self-contained example so we could try to repro the problem, we might be able to provide a better answer. That said, it's possible to manually control the flow of ExecutionContext (or rather, a copy of it) across await continuations, using a simple custom synchronization context. Here is an example (warning - almost untested!):

// using EcFlowingSynchronizationContext:

m_block = new ActionBlock<MessageState>(async ms =>
{
      using (ms.Context)
      using (var sc = new EcFlowingSynchronizationContext(ms.Context))
      {
         // Run takes a parameterless Func<Task>, so the lambda has no arguments
         await sc.Run(async () => { await callSomethingAsync(ms.Value); });
      }
});

// EcFlowingSynchronizationContext: flow execution context manually 

public class EcFlowingSynchronizationContext : SynchronizationContext, IDisposable
{
    private readonly ExecutionContext _ec;
    private readonly TaskScheduler _taskScheduler;

    public EcFlowingSynchronizationContext(ExecutionContext sourceEc) 
    {
        TaskScheduler ts = null;
        ExecutionContext ec = null;

        ExecutionContext.Run(sourceEc, _ =>
        {
            var sc = SynchronizationContext.Current;
            SynchronizationContext.SetSynchronizationContext(this);
            try
            {
                ts = TaskScheduler.FromCurrentSynchronizationContext();
                // this will also capture SynchronizationContext.Current,
                // and it will be flown by subsequent ExecutionContext.Run
                ec = ExecutionContext.Capture();
            }
            finally
            {
                SynchronizationContext.SetSynchronizationContext(sc);
            }
        }, null);

        _ec = ec;
        _taskScheduler = ts;
    }

    private void Execute(SendOrPostCallback d, object state)
    {
        using (var ec = _ec.CreateCopy())
        {
            ExecutionContext.Run(ec, new ContextCallback(d), state);
        }
    }

    public Task Run(Func<Task> action, CancellationToken token = default(CancellationToken))
    {
        return Task.Factory.StartNew(action, token, TaskCreationOptions.None, _taskScheduler).Unwrap();
    }

    public Task<TResult> Run<TResult>(Func<Task<TResult>> action, CancellationToken token = default(CancellationToken))
    {
        return Task.Factory.StartNew(action, token, TaskCreationOptions.None, _taskScheduler).Unwrap();
    }

    public override void Post(SendOrPostCallback d, object state)
    {
        ThreadPool.UnsafeQueueUserWorkItem(s => Execute(d, s), state);
    }

    public override void Send(SendOrPostCallback d, object state)
    {
        Execute(d, state);
    }

    public override SynchronizationContext CreateCopy()
    {
        return this;
    }

    public void Dispose()
    {
        _ec.Dispose();
    }
}

Note that you should only store immutable values using CallContext.LogicalSetData (or AsyncLocal<T>). That is, if you need to store something that may change during an asynchronous flow from a callee to the caller, and you need to track that change in the caller, make it a property of a class and then store an instance of that class. Make sure that class is thread-safe as well, because eventually you can have many concurrent forks of the original execution context.
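The holder pattern described above might look like the following minimal sketch. `Counter`, `AsyncLocalDemo`, and both field names are hypothetical, introduced only for illustration; the sketch assumes a runtime where `AsyncLocal<T>` is available (.NET Framework 4.6 or later, or .NET Core).

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical thread-safe holder: the instance is shared, only its state changes.
public sealed class Counter
{
    private int _value;
    public int Value { get { return Volatile.Read(ref _value); } }
    public void Increment() { Interlocked.Increment(ref _value); }
}

public static class AsyncLocalDemo
{
    private static readonly AsyncLocal<int> Plain = new AsyncLocal<int>();
    private static readonly AsyncLocal<Counter> Holder = new AsyncLocal<Counter>();

    private static async Task CalleeAsync()
    {
        // Replaces the value in the callee's copy of the context only;
        // the caller's context is restored when this method returns.
        Plain.Value = 42;

        // Mutates the instance that both caller and callee reference.
        Holder.Value.Increment();

        await Task.Yield();
    }

    public static async Task<string> DemoAsync()
    {
        Plain.Value = 1;
        Holder.Value = new Counter();

        await CalleeAsync();

        // The plain value is unchanged in the caller; the holder's state is not.
        return Plain.Value + "/" + Holder.Value.Value;
    }
}
```

Assigning `Plain.Value` inside the callee does not leak back to the caller, whereas the increment on the shared `Counter` is visible because only the reference is stored in the context.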

For more details, refer to Stephen Cleary's excellent "Implicit Async Context ("AsyncLocal")" and "Eliding Async and Await".
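Separately from the class above, and not part of what was tested for this answer: because the async infrastructure captures the current ExecutionContext at each `await`, it may be enough to start the async delegate synchronously inside ExecutionContext.Run and await the returned task outside it. The sketch below assumes that behavior; `EcFlowDemo`, `RunAsync`, and `Ambient` are hypothetical names used only for illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class EcFlowDemo
{
    // Hypothetical ambient value, used only to observe which context is active.
    private static readonly AsyncLocal<string> Ambient = new AsyncLocal<string>();

    // Starts the async delegate while the captured context is current,
    // then hands the resulting task back to the caller to await.
    public static Task RunAsync(ExecutionContext context, Func<Task> asyncAction)
    {
        Task task = null;
        ExecutionContext.Run(context, _ => { task = asyncAction(); }, null);
        return task;
    }

    public static async Task<string> DemoAsync()
    {
        Ambient.Value = "captured";
        ExecutionContext ec = ExecutionContext.Capture();

        // Changing the value afterwards does not affect the captured context.
        Ambient.Value = "changed-later";

        string beforeAwait = null, afterAwait = null;
        await RunAsync(ec, async () =>
        {
            beforeAwait = Ambient.Value;
            await Task.Yield();          // the continuation resumes on the flowed context
            afterAwait = Ambient.Value;
        });

        return beforeAwait + "/" + afterAwait;
    }
}
```

Inside an ActionBlock handler this would read `await EcFlowDemo.RunAsync(ms.Context, () => callSomethingAsync(ms.Value));`. Unlike the synchronization-context approach, this relies only on the ExecutionContext flow that `await` performs by default.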

noseratio
  • so basically, Task.Run with some extra details? – Andriy Volkov Sep 03 '19 at 21:12
  • @zvolkov, no, it doesn't use `Task.Run` at all. I just made `EcFlowingSynchronizationContext.Run` look similar for convenience. – noseratio Sep 03 '19 at 21:30
  • I mean, your sc.Run uses Task.Factory.StartNew to do the part I was asking about (run the async code), and the way it runs it on the target EC, is by getting a taskscheduler FromCurrentSynchronizationContext from inside my target EC.Run - but I wonder if I could just directly use Task.Factory.StartNew inside my target EC.Run, without creating this extra class? – Andriy Volkov Sep 03 '19 at 21:35
  • @zvolkov what `TaskScheduler` are you going to provide if you call `Task.Factory.StartNew` like that? If you give it `TaskScheduler.Default`, you'll get the default EC propagation behavior, which you were not happy with in the first place. The whole deal with custom sync context and custom task scheduler was to alter how it gets propagated. Does this implementation actually work as you wanted it? Or am I missing something? – noseratio Sep 03 '19 at 21:47
  • No, just trying to understand the underlying principle. I will need to test this. – Andriy Volkov Sep 03 '19 at 21:51
  • @zvolkov, actually I believe the default EC propagation behavior should be suitable enough, I don't think it needs customization. Check the last paragraph in my answer, maybe that's where you experience a problem in your code. – noseratio Sep 03 '19 at 21:57
  • I don't see how the last paragraph is relevant and I did read both of those posts before. I'm trying to flow the context alongside the message I post to the ActionBlock and as I said, AB only flows EC implicitly, when starting new task for the message loop, which doesn't happen on every message. – Andriy Volkov Sep 03 '19 at 22:50
  • Anyway, I think the key point of your answer is running the task on that special scheduler, which will somehow get the target EC flow to my task, correct? – Andriy Volkov Sep 03 '19 at 22:56
  • @zvolkov correct, but there's one thing. If you use a custom task scheduler *without* a custom synchronization context, the `await` continuations will only be scheduled by your custom scheduler if it's recognized as the currently active, "ambient" scheduler, see [here](https://referencesource.microsoft.com/#mscorlib/system/threading/Tasks/Task.cs,2991). It's very easy for the inner calls to set their own task scheduler, e.g., `Task.Run` uses `TaskScheduler.Default`. So you can't rely on the scheduler staying the same throughout. – noseratio Sep 04 '19 at 01:19
  • Then again, `await task.ConfigureAwait(false)` will get you off the current synchronization context, too. So really, there is no guarantee that this EC propagation logic will apply to all `await` continuations inside your async lambda, unless perhaps you wrap each of them with `EcFlowingSynchronizationContext.Run`, too. – noseratio Sep 04 '19 at 01:19