I want to use AsyncLocal to pass information through async workflows for tracing purposes. I have now run into a problem with Rx.
This is my test code:

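using System;
using System.Reactive.Concurrency; // needed for Scheduler.Default in the commented-out ObserveOn calls
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;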

public class RxTest
{
    private readonly Subject<int> test = new Subject<int>();

    private readonly AsyncLocal<int> asyncContext = new AsyncLocal<int>();

    public void Test()
    {
        this.test
             // .ObserveOn(Scheduler.Default)
            .Subscribe(this.OnNextNormal);
        this.test
             // .ObserveOn(Scheduler.Default)
            .Delay(TimeSpan.FromMilliseconds(1))
            .Subscribe(this.OnNextDelayed);

        for (var i = 0; i < 2; i++)
        {
            var index = i;
            Task.Run(() =>
            {
                this.asyncContext.Value = index;
                Console.WriteLine(
                    $"Main\t\t{index} (Thread: {Thread.CurrentThread.ManagedThreadId}): AsyncLocal.Value => {this.asyncContext.Value}");
                this.test.OnNext(index);
            });
        }

        Console.ReadKey();
    }

    private void OnNextNormal(int obj)
    {
        Console.WriteLine(
            $"OnNextNormal\t{obj} (Thread: {Thread.CurrentThread.ManagedThreadId}): AsyncLocal.Value => {this.asyncContext.Value}");
    }

    private void OnNextDelayed(int obj)
    {
        Console.WriteLine(
            $"OnNextDelayed\t{obj} (Thread: {Thread.CurrentThread.ManagedThreadId}): AsyncLocal.Value => {this.asyncContext.Value}");
    }
}

Output is:

Main 0 (Thread: 5): AsyncLocal.Value => 0
Main 1 (Thread: 6): AsyncLocal.Value => 1
OnNextNormal 0 (Thread: 5): AsyncLocal.Value => 0
OnNextNormal 1 (Thread: 6): AsyncLocal.Value => 1
OnNextDelayed 0 (Thread: 4): AsyncLocal.Value => 0
OnNextDelayed 1 (Thread: 4): AsyncLocal.Value => 0

As you can see, the AsyncLocal.Value does not flow to the delayed subscriber.
=> The AsyncLocal value gets lost on the delayed track

As far as I understand, a normal Subscribe() uses no scheduler, whereas Delay() uses one.
When I use ObserveOn() for both calls, the output for both is as follows:

Main 0 (Thread: 5): AsyncLocal.Value => 0
Main 1 (Thread: 7): AsyncLocal.Value => 1
OnNextNormal 0 (Thread: 9): AsyncLocal.Value => 0
OnNextNormal 1 (Thread: 9): AsyncLocal.Value => 0
OnNextDelayed 0 (Thread: 4): AsyncLocal.Value => 0
OnNextDelayed 1 (Thread: 4): AsyncLocal.Value => 0

=> The AsyncLocal value gets lost on every track
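
One plausible reading of this output, which is an assumption and not something confirmed in this thread, is that a scheduler-based operator hands items to a shared worker, and that worker keeps the ExecutionContext it was started with. The sketch below reproduces that effect without Rx, using only the base class library. The names DrainLoopDemo and Ambient are hypothetical, and the values 10 and 20 are used so they cannot be confused with the default of 0.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class DrainLoopDemo
{
    private static readonly AsyncLocal<int> Ambient = new AsyncLocal<int>();

    // Returns the AsyncLocal value the shared worker observed for each item.
    public static int[] Run()
    {
        var queue = new BlockingCollection<int>();
        var seen = new ConcurrentQueue<int>();
        Task worker = null;

        // The first publisher sets its value and starts the worker, so the
        // worker's ExecutionContext is a snapshot of the first publisher's.
        Task.Run(() =>
        {
            Ambient.Value = 10;
            queue.Add(0);
            worker = Task.Run(() =>
            {
                foreach (var item in queue.GetConsumingEnumerable())
                {
                    seen.Enqueue(Ambient.Value);
                }
            });
        }).Wait();

        // The second publisher only enqueues; the already running worker
        // never sees the value it set.
        Task.Run(() =>
        {
            Ambient.Value = 20;
            queue.Add(1);
        }).Wait();

        queue.CompleteAdding();
        worker.Wait();
        return seen.ToArray();
    }
}
```

Here the worker records the first publisher's value for both items, which mirrors the pattern in the output above where the second item is reported with the first item's value.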

Is there a way to let the ExecutionContext flow with Rx?
I only found this, but there the problem is the other way around: they solved how the observer's context flows, whereas I want to flow the publisher's context.

What I want to achieve is this:

  1. Message from "outside" comes to my service
  2. Distribute message within service (RX)
  3. When logging a message, format the log message with MessageId
  4. I do not want to pass the message everywhere
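
The goal in steps 1, 3 and 4 can be sketched without Rx as follows. This is only an illustration under assumptions: MessageLog, HandleAsync and Format are hypothetical names, and the message id is modelled as a string. It shows the id being set once at the entry point and picked up by the log formatter after an await, without being passed as a parameter.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class MessageLog
{
    // Ambient message id for the current async flow (hypothetical holder).
    private static readonly AsyncLocal<string> CurrentMessageId = new AsyncLocal<string>();

    // Step 1: the entry point sets the id once for everything that follows.
    public static async Task<string> HandleAsync(string messageId, string body)
    {
        CurrentMessageId.Value = messageId;

        // Stands in for asynchronous work; the AsyncLocal value flows across the await.
        await Task.Yield();

        // Step 3: the formatter reads the id from the ambient context.
        return Format($"processed '{body}'");
    }

    // Formats a log line with the ambient id, or "-" when none is set.
    public static string Format(string text)
        => $"[{CurrentMessageId.Value ?? "-"}] {text}";
}
```

This works for plain async/await because the continuation runs in the captured ExecutionContext. Step 2, distributing through Rx operators that use a scheduler, is where the value is lost in the test above.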

Thanks in advance for your answers.

JayJay424
  • The default scheduler used for `this.test.Subscribe(this.OnNextNormal)` is `Scheduler.Immediate`. As soon as you introduce any form of concurrency you immediately change scheduler. I suspect that's what's causing `AsyncLocal<>` not to work. You need to find another solution unless you just deal with the simplest Rx queries. – Enigmativity Jul 11 '18 at 12:48

1 Answer

The free-flowing execution context in Rx is what makes it great for most multi-threaded scenarios. You could preserve the publisher's context by bypassing the scheduler-based operators, like so:

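using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class Extensions
{
    // Delays each value with Task.Delay instead of an Rx scheduler. ContinueWith
    // captures the ExecutionContext of the thread that called OnNext, so AsyncLocal
    // values set by the publisher are visible in the observer.
    // Caveat: continuations run on the thread pool, so ordering is not guaranteed,
    // and errors and completion are forwarded without delay.
    public static IObservable<T> TaskPoolDelay<T>(this IObservable<T> observable, TimeSpan delay)
    {
        return Observable.Create<T>(
            observer => observable.Subscribe(
                onNext: value => Task.Delay(delay).ContinueWith(_ => observer.OnNext(value)),
                onError: observer.OnError,
                onCompleted: observer.OnCompleted
            )
        );
    }
}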

Output:

OnNextDelayed   2 (Thread: 6): AsyncLocal.Value => 2
OnNextDelayed   3 (Thread: 10): AsyncLocal.Value => 3
OnNextDelayed   1 (Thread: 7): AsyncLocal.Value => 1
OnNextDelayed   0 (Thread: 5): AsyncLocal.Value => 0

This does carry the context forward, but it quickly gets complicated for larger queries. I'm not sure whether implementing an IScheduler that preserves the context when it notifies would work well. If copying the information into each message isn't too much overhead, that might be the best fit for Rx.
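
A minimal sketch of the message-copying idea, assuming the traced value is a single int id as in the question. TraceContext, Traced<T>, Wrap and Restore are hypothetical names introduced for illustration and are not part of Rx. The block itself uses only the base class library, so it is independent of the Rx version.

```csharp
using System;
using System.Threading;

// Hypothetical ambient holder for the id used in log messages.
public static class TraceContext
{
    private static readonly AsyncLocal<int> Current = new AsyncLocal<int>();

    public static int MessageId
    {
        get => Current.Value;
        set => Current.Value = value;
    }
}

// Envelope that carries the publisher's id alongside the payload.
public readonly struct Traced<T>
{
    public Traced(int messageId, T payload)
    {
        MessageId = messageId;
        Payload = payload;
    }

    public int MessageId { get; }
    public T Payload { get; }
}

public static class Traced
{
    // Publishing side: snapshots the ambient id into the envelope.
    public static Traced<T> Wrap<T>(T payload)
        => new Traced<T>(TraceContext.MessageId, payload);

    // Subscribing side: restores the id around the handler call, then
    // puts back whatever value the calling thread had before.
    public static Action<Traced<T>> Restore<T>(Action<T> handler) => envelope =>
    {
        var previous = TraceContext.MessageId;
        TraceContext.MessageId = envelope.MessageId;
        try
        {
            handler(envelope.Payload);
        }
        finally
        {
            TraceContext.MessageId = previous;
        }
    };
}
```

With the subject from the question, usage would look roughly like `this.test.Select(v => Traced.Wrap(v)).Delay(TimeSpan.FromMilliseconds(1)).Subscribe(Traced.Restore<int>(this.OnNextDelayed))`. The Select has to sit before any operator that introduces a scheduler, so that Wrap still runs on the publishing thread.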

Asti