I have a design where we inject a RX Subject into a class that will call OnNext on the injected Subject (which is bad I know, lets not go there). This subject will need to call async/await code in the Subscribe method that will be called when the OnNext triggers. I know this is not cool so we need to do something like this Rx: subscribe with async function and ignore errors where we use Observable.FromAsync
But what I need to trap is if any exceptions occur, they should bubble up to the outer class that called OnNext (where the subject class was injected), and I need to deal with that and start something again, but wish to reuse the same original subject passed in. Essentially I DO NOT wish to to ever raise OnError/OnCompleted on the subject that is injected into the class that calls OnNext
If I use code like this, I get the semmantics I want
using System;
using System.Collections;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Runtime.InteropServices.ComTypes;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApp3
{
class Program
{
static CompositeDisposable disp = new CompositeDisposable();
public static Subject<string> _subject = new Subject<string>();
static void Main(string[] args)
{
//https://stackoverflow.com/questions/29427680/rx-subscribe-with-async-function-and-ignore-errors
_subject
.Subscribe(x =>
{
Console.WriteLine(x.ToString());
throw new Exception("bad juju");
});
var consumer = new Consumer(_subject);
Task.Run(() =>
{
consumer.Consume(CancellationToken.None);
});
Console.ReadKey();
disp.Dispose();
}
}
public class Consumer
{
private readonly Subject<string> _subject;
public Consumer(Subject<string> subject)
{
_subject = subject;
}
public void Consume(CancellationToken token)
{
try
{
while (!token.IsCancellationRequested)
{
_subject.OnNext(Guid.NewGuid().ToString("N"));
}
}
catch (Exception e)
{
Console.WriteLine($"consumer saw {e.Message}");
Consume(token);
}
}
}
}
Where I get this sort of output (which is what I want)
consumer saw bad juju
7f5ef1971c6a421ea416110fa3473f5c
consumer saw bad juju
cdb63e6ef84144f7aac570e067566ad3
consumer saw bad juju
3947946907cc4be996adc2a7060a8962
consumer saw bad juju
bab899eb59ce4490918f1115eb0bde76
consumer saw bad juju
473e041924424906be0ccd9c624d90c4
consumer saw bad juju
8459956d2d5f494181a07526a2b9b9e2
But when I attempt to use the Observable.FromAsync
where I introduce a random Exception, I only ever see my OnNext
called once.
using System;
using System.Collections;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Runtime.InteropServices.ComTypes;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApp3
{
class Program
{
static CompositeDisposable disp = new CompositeDisposable();
public static Subject<string> _subject = new Subject<string>();
private static int counter = 0;
private static Random rnd = new Random();
private static System.Threading.Tasks.Task<string> RunAsync()
{
return System.Threading.Tasks.Task.Factory.StartNew(() =>
{
System.Threading.Interlocked.Increment(ref counter);
if (rnd.NextDouble() < 0.3)
{
throw new Exception("bad juju");
}
return counter.ToString();
});
}
static void Main(string[] args)
{
//https://stackoverflow.com/questions/29427680/rx-subscribe-with-async-function-and-ignore-errors
_subject
.SelectMany(x =>
Observable
.FromAsync(async () => await RunAsync())
)
.Subscribe(x =>
{
//no op
});
var consumer = new Consumer(_subject);
Task.Run(() =>
{
consumer.Consume(CancellationToken.None);
});
Console.ReadKey();
disp.Dispose();
}
}
public class Consumer
{
private readonly Subject<string> _subject;
public Consumer(Subject<string> subject)
{
_subject = subject;
}
public void Consume(CancellationToken token)
{
try
{
while (!token.IsCancellationRequested)
{
_subject.OnNext(Guid.NewGuid().ToString("N"));
}
}
catch (Exception e)
{
Console.WriteLine($"consumer saw {e.Message}");
Consume(token);
}
}
}
}
This is the result
1
consumer saw bad juju
Where the Exception is propagated up to the top level class that originally called the OnNext
, but this only happens 1 time, no other calls are done, despite the fact that OnNext
is called in a while loop.
However if I change my Subscribe, and jut make the Async/Await code synchronous I get the semmantics I want, what am I doing wrong, why for the Observable.FromAsync
does it not carry on, and just stops responding to OnNext
I can get the semmantics I need by doing this in my Subscribe
but this seems extremely whack, and is forcing me to make async code sync.
Its like the Exception is not being observed, but I can not see where
.Subscribe(x =>
{
try
{
Console.WriteLine(x.ToString());
RunAsync().ConfigureAwait(false).GetAwaiter().GetResult();
}
catch (Exception e)
{
throw;
}
}
Any guidance on this would be appreciated