22

I'm writing a complex Reactive Extensions query with lots of operators. How can I see what's going on?

I'm asking and answering this as it comes up a fair bit and is probably of good general use.

Andrii Litvinov
  • 12,402
  • 3
  • 52
  • 59
James World
  • 29,019
  • 9
  • 86
  • 120

2 Answers2

46

You can append this function liberally to your Rx operators while you are developing them to see what's happening:

    public static IObservable<T> Spy<T>(this IObservable<T> source, string opName = null)
    {
        opName = opName ?? "IObservable";
        Console.WriteLine("{0}: Observable obtained on Thread: {1}",
                          opName,
                          Thread.CurrentThread.ManagedThreadId);

        return Observable.Create<T>(obs =>
        {
            Console.WriteLine("{0}: Subscribed to on Thread: {1}",
                              opName,
                              Thread.CurrentThread.ManagedThreadId);

            try
            {
                var subscription = source
                    .Do(x => Console.WriteLine("{0}: OnNext({1}) on Thread: {2}",
                                                opName,
                                                x,
                                                Thread.CurrentThread.ManagedThreadId),
                        ex => Console.WriteLine("{0}: OnError({1}) on Thread: {2}",
                                                 opName,
                                                 ex,
                                                 Thread.CurrentThread.ManagedThreadId),
                        () => Console.WriteLine("{0}: OnCompleted() on Thread: {1}",
                                                 opName,
                                                 Thread.CurrentThread.ManagedThreadId)
                    )
                    .Subscribe(obs);
                return new CompositeDisposable(
                    subscription,
                    Disposable.Create(() => Console.WriteLine(
                          "{0}: Cleaned up on Thread: {1}",
                          opName,
                          Thread.CurrentThread.ManagedThreadId)));
            }
            finally
            {
                Console.WriteLine("{0}: Subscription completed.", opName);
            }
        });
    }

Here's an example usage, shows a subtle behaviour difference of Range:

Observable.Range(0, 1).Spy("Range").Subscribe();

Gives the output:

Range: Observable obtained on Thread: 7
Range: Subscribed to on Thread: 7
Range: Subscription completed.
Range: OnNext(0) on Thread: 7
Range: OnCompleted() on Thread: 7
Range: Cleaned up on Thread: 7

But this:

Observable.Range(0, 1, Scheduler.Immediate).Spy("Range").Subscribe();

Gives the output:

Range: Observable obtained on Thread: 7
Range: Subscribed to on Thread: 7
Range: OnNext(0) on Thread: 7
Range: OnCompleted() on Thread: 7
Range: Subscription completed.
Range: Cleaned up on Thread: 7

Spot the difference?

Obviously you can alter this to write to logs or to Debug, or use preprocessor directives to do a lean pass-through subscription on a Release build etc...

You can apply Spy throughout a chain of operators. e.g.:

Observable.Range(0,3).Spy("Range")
          .Scan((acc, i) => acc + i).Spy("Scan").Subscribe();

Gives the output:

Range: Observable obtained on Thread: 7
Scan: Observable obtained on Thread: 7
Scan: Subscribed to on Thread: 7
Range: Subscribed to on Thread: 7
Range: Subscription completed.
Scan: Subscription completed.
Range: OnNext(1) on Thread: 7
Scan: OnNext(1) on Thread: 7
Range: OnNext(2) on Thread: 7
Scan: OnNext(3) on Thread: 7
Range: OnCompleted() on Thread: 7
Scan: OnCompleted() on Thread: 7
Range: Cleaned up on Thread: 7
Scan: Cleaned up on Thread: 7

I'm sure you can find ways of enriching this to suit your purposes.

James World
  • 29,019
  • 9
  • 86
  • 120
  • 1
    This is a pretty nice solution if `Do(x => Console.WriteLine(...))` isn't enough. – Bryan Anderson Nov 26 '13 at 21:35
  • Thanks Bryan. I completely agree that Do(...) *is* enough most of the time. Quite a few of the stickier problems revolve around evaluation and subscription issues, which this can help with. It makes the full lifecycle of an Rx operator more apparent. – James World Nov 26 '13 at 22:00
  • 1
    I 'enriched' by putting the subscription in a field and then returning it wrapped in a CompositeDisposable with a Console.WriteLine either sad for logging unsubscribe. Nice. – Benjol Nov 27 '13 at 07:24
  • 1
    Nice! One thing to watch out for is that `Create` helpfully calls `Dispose()` on the `IDisposable` you return, *even if the the subscriber does not*. `Dispose` will be called when either the subscriber disposes or the observable terminates - whichever happens soonest. It does this so you can clean up resources as soon as possible - so just bear in mind it might not be the consequence of an unsubscribe. – James World Nov 27 '13 at 07:39
  • 1
    Nice answer. I also find that the Subscription and disposal (or lack of) are key elements in debugging an Rx query. This is especially important when you have selectmany clauses that you may assume have yeilded/completed. This kind of logging/spying removes guess work and assumptions being made. – Lee Campbell Nov 27 '13 at 21:05
  • What would be really great is a Visualizer of queries. Some how produce WebSequenceDiagrams from log output. Anyone up for the challenge? – Lee Campbell Nov 27 '13 at 21:06
  • I'll be in touch... :) – James World Nov 28 '13 at 07:43
  • 1
    FYI [Rxx](http://rxx.codeplex.com/) has a number of `Trace` extensions that do this sort of thing – Brandon Jan 22 '14 at 13:57
  • 3
    Came back again to say this extension method is still regularly 'saving my life' :) – Benjol Sep 13 '16 at 07:15
  • 1
    Yet another case for micropayments. – Sentinel Jan 25 '18 at 10:25
8

Another three years have passed, and I'm still using your idea. My version has now evolved as follows:

  • Overload for choice of logging destination
  • Log number of subscriptions
  • Log 'downstream' exceptions from bad subscribers.

The code:

public static IObservable<T> Spy<T>(this IObservable<T> source, string opName = null)
{
    return Spy(source, opName, Console.WriteLine);
}

public static IObservable<T> Spy<T>(this IObservable<T> source, string opName, 
                                                              Action<string> logger)
{
    opName = opName ?? "IObservable";
    logger($"{opName}: Observable obtained on Thread: {Thread.CurrentThread.ManagedThreadId}");

    var count = 0;
    return Observable.Create<T>(obs =>
    {
        logger($"{opName}: Subscribed to on Thread: {Thread.CurrentThread.ManagedThreadId}");
        try
        {
            var subscription = source
                .Do(x => logger($"{opName}: OnNext({x}) on Thread: {Thread.CurrentThread.ManagedThreadId}"),
                    ex => logger($"{opName}: OnError({ex}) on Thread: {Thread.CurrentThread.ManagedThreadId}"),
                    () => logger($"{opName}: OnCompleted() on Thread: {Thread.CurrentThread.ManagedThreadId}")
                )
                .Subscribe(t =>
                {
                    try
                    {
                        obs.OnNext(t);
                    }
                    catch(Exception ex)
                    {
                        logger($"{opName}: Downstream exception ({ex}) on Thread: {Thread.CurrentThread.ManagedThreadId}");
                        throw;
                    }
                }, obs.OnError, obs.OnCompleted);

            return new CompositeDisposable(
                    Disposable.Create(() => logger($"{opName}: Dispose (Unsubscribe or Observable finished) on Thread: {Thread.CurrentThread.ManagedThreadId}")),
                    subscription,
                    Disposable.Create(() => Interlocked.Decrement(ref count)),
                    Disposable.Create(() => logger($"{opName}: Dispose (Unsubscribe or Observable finished) completed, {count} subscriptions"))
                );
        }
        finally
        {
            Interlocked.Increment(ref count);
            logger($"{opName}: Subscription completed, {count} subscriptions.");
        }
    });
}
Benjol
  • 63,995
  • 54
  • 186
  • 268