1

I have a design where we inject a RX Subject into a class that will call OnNext on the injected Subject (which is bad I know, lets not go there). This subject will need to call async/await code in the Subscribe method that will be called when the OnNext triggers. I know this is not cool so we need to do something like this Rx: subscribe with async function and ignore errors where we use Observable.FromAsync

But what I need to trap is if any exceptions occur, they should bubble up to the outer class that called OnNext (where the subject class was injected), and I need to deal with that and start something again, but wish to reuse the same original subject passed in. Essentially I DO NOT wish to to ever raise OnError/OnCompleted on the subject that is injected into the class that calls OnNext

If I use code like this, I get the semmantics I want

using System;
using System.Collections;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Runtime.InteropServices.ComTypes;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp3
{
    class Program
    {
        static CompositeDisposable disp = new CompositeDisposable();
        public static Subject<string> _subject = new Subject<string>();

        static void Main(string[] args)
        {

            //https://stackoverflow.com/questions/29427680/rx-subscribe-with-async-function-and-ignore-errors
            _subject
                 .Subscribe(x =>
                {
                    Console.WriteLine(x.ToString());
                    throw new Exception("bad juju");
                });


            var consumer = new Consumer(_subject);
            Task.Run(() =>
            {
                consumer.Consume(CancellationToken.None);

            });
     
            Console.ReadKey();
            disp.Dispose();

        }
    }

    public class Consumer
    {
        private readonly Subject<string> _subject;

        public Consumer(Subject<string> subject)
        {
            _subject = subject;
        }

        public void Consume(CancellationToken token)
        {
            try
            {
                while (!token.IsCancellationRequested)
                {
                    _subject.OnNext(Guid.NewGuid().ToString("N"));
                }
            }
            catch (Exception e)
            {
                Console.WriteLine($"consumer saw {e.Message}");
                Consume(token);
            }
        }
    }
}

Where I get this sort of output (which is what I want)

consumer saw bad juju
7f5ef1971c6a421ea416110fa3473f5c
consumer saw bad juju
cdb63e6ef84144f7aac570e067566ad3
consumer saw bad juju
3947946907cc4be996adc2a7060a8962
consumer saw bad juju
bab899eb59ce4490918f1115eb0bde76
consumer saw bad juju
473e041924424906be0ccd9c624d90c4
consumer saw bad juju
8459956d2d5f494181a07526a2b9b9e2

But when I attempt to use the Observable.FromAsync where I introduce a random Exception, I only ever see my OnNext called once.

using System;
using System.Collections;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Runtime.InteropServices.ComTypes;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp3
{
    
    class Program
    {

        static CompositeDisposable disp = new CompositeDisposable();
        public static Subject<string> _subject = new Subject<string>();

        private static int counter = 0;
        private static Random rnd = new Random();

        private static System.Threading.Tasks.Task<string> RunAsync()
        {
            return System.Threading.Tasks.Task.Factory.StartNew(() =>
            {
                System.Threading.Interlocked.Increment(ref counter);
                if (rnd.NextDouble() < 0.3)
                {
                    throw new Exception("bad juju");
                }
                return counter.ToString();
            });
        }


        static void Main(string[] args)
        {

            //https://stackoverflow.com/questions/29427680/rx-subscribe-with-async-function-and-ignore-errors
            _subject
                .SelectMany(x =>
                    Observable
                        .FromAsync(async () => await RunAsync())
                        )
                 .Subscribe(x =>
                {

                   //no op
                });


            var consumer = new Consumer(_subject);
            Task.Run(() =>
            {
                consumer.Consume(CancellationToken.None);

            });
            Console.ReadKey();
            disp.Dispose();

        }
    }



    public class Consumer
    {
        private readonly Subject<string> _subject;

        public Consumer(Subject<string> subject)
        {
            _subject = subject;
        }

        public void Consume(CancellationToken token)
        {
            try
            {
                while (!token.IsCancellationRequested)
                {
                    _subject.OnNext(Guid.NewGuid().ToString("N"));
                }
            }
            catch (Exception e)
            {
                Console.WriteLine($"consumer saw {e.Message}");
                Consume(token);
            }
        }
    }
}

 

This is the result

1
consumer saw bad juju

Where the Exception is propagated up to the top level class that originally called the OnNext, but this only happens 1 time, no other calls are done, despite the fact that OnNext is called in a while loop.

However if I change my Subscribe, and jut make the Async/Await code synchronous I get the semmantics I want, what am I doing wrong, why for the Observable.FromAsync does it not carry on, and just stops responding to OnNext

I can get the semmantics I need by doing this in my Subscribe but this seems extremely whack, and is forcing me to make async code sync.

Its like the Exception is not being observed, but I can not see where

.Subscribe(x =>
{
    try
    {
        Console.WriteLine(x.ToString());
        RunAsync().ConfigureAwait(false).GetAwaiter().GetResult();

    }
    catch (Exception e)
    {
        throw;
    }
}

Any guidance on this would be appreciated

Progman
  • 16,827
  • 6
  • 33
  • 48
sacha barber
  • 2,214
  • 1
  • 24
  • 37
  • In the second case, where you use the `Observable.FromAsync` method, does the program crashes after the first exception? Or it continues running, but without outputting anything in the console? Btw be aware that the `Task.Run(() => {consumer...` in both of the examples creates a fire-and-forget task. You won't get any notification if the task fails. – Theodor Zoulias May 18 '21 at 19:43
  • When using the `Observable.FromAsync` it doesn't crash but simple stops outputting to the console, and it is like the Task is completed already or something, as it never raises the exception again. For the Fire-Forget Task.Run that you mention, yeah that is just demo grade code, real code doesn't do that – sacha barber May 19 '21 at 08:26
  • Your `.FromAsync(async () => await RunAsync())` call should be `.FromAsync(() => RunAsync())`. Does that help? – Enigmativity May 20 '21 at 03:08
  • Nah same issue, I think the real issue is that Exception is raised "in" the stream, which obviously causes the `OnError` to get called, which is not what I want. I need the exception from "inside" the stream to suddenly surface itself "outside" the stream, which is only really possible if the Exception gets raised in the Subscribe. Remember I don't want this Subject to EVER get `OnError` or `OnComplete` so based on that, all work has to be done in the Subscribe. But I know `async await` code in subscribe is anti pattern, and recommended approach is Select->Onservable.Async->Concat->Subscribe – sacha barber May 20 '21 at 08:19

0 Answers0