16

How to compare the following two? Is Rx more powerful?

Reactive extension:

var observable = Observable.Create<string>(async (observer, cancel) =>
{
    while (true)
    {
        string line = await sr.ReadLineAsync();
        if (line == null)
            break;
        observer.OnNext(line);
    }
});

observable.Subscribe(
    c => Console.WriteLine(c.ToString()),
    () => end.Dispose());

Async streams:

public async Task Run(string path)
{
    await foreach (var line in TestAsync())
    {
        Console.WriteLine(line);
    }
}

private async IAsyncEnumerable<string> TestAsync()
{
    while (true)
    {
        string line = await sr.ReadLineAsync();
        if (line == null)
            break;
        yield return line;
    }
}
Panagiotis Kanavos
ca9163d9
  • 7
    The most notable difference is that async enumerable is still pull based, while Rx provides push based notifications. In other words, Rx adds a dimension: time. If your data is already there to be fetched, pull is sufficient. If not, declarative push based programming usually results in cleaner code. Here's a [blog](https://www.infoq.com/articles/Async-Streams/) that goes into more detail. – Funk Oct 20 '19 at 09:45
  • Rx is starting to use Async streams. Async streams work at a lower level – Panagiotis Kanavos Oct 21 '19 at 07:58
  • I've blogged on a related topic: [C# events as asynchronous streams with ReactiveX or Channels](https://dev.to/noseratio/c-events-as-asynchronous-streams-with-reactivex-or-channels-82k). – noseratio Jul 17 '20 at 12:17

1 Answer

15

The two features work together. PS: Forget about async streams, think about await foreach.

Async streams

Async streams are a (relatively) low level feature that allows asynchronous iteration. By itself, it doesn't offer any other capabilities like filtering, aggregation etc. It's pull based while Rx is push based.

You can use LINQ operators on an async stream through the System.Linq.Async library found in ... the ReactiveX.NET GitHub repo. It's fast, but doesn't offer the event processing functionality of Rx.

There's no notion of time, for example, much less a way to use a custom scheduler. There are no subscriptions, no error events. GroupBy will consume the entire source and emit group items as separate IAsyncEnumerable instances, while Rx's GroupBy will emit separate Observables for each group.
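As a rough illustration of LINQ operators over an async stream, here is a minimal sketch. It assumes the System.Linq.Async NuGet package is referenced; `ReadLinesAsync` and `NonEmptyUpperAsync` are hypothetical names invented for the example, and the in-memory source only stands in for the question's reader.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;            // Where/Select over IAsyncEnumerable<T> (assumes System.Linq.Async)
using System.Threading.Tasks;

public static class AsyncLinqSketch
{
    // Hypothetical source standing in for the question's line reader.
    public static async IAsyncEnumerable<string> ReadLinesAsync()
    {
        foreach (var line in new[] { "alpha", "", "beta", "gamma" })
        {
            await Task.Yield(); // simulate an asynchronous read
            yield return line;
        }
    }

    // Filters and projects the stream; items are still pulled one at a time by the consumer.
    public static async Task<List<string>> NonEmptyUpperAsync()
    {
        var result = new List<string>();
        await foreach (var line in ReadLinesAsync()
                           .Where(l => l.Length > 0)
                           .Select(l => l.ToUpperInvariant()))
        {
            result.Add(line);
        }
        return result;
    }
}
```

Nothing here involves time or subscriptions: the operators only wrap the pull-based iteration.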

In the question's example, IAsyncEnumerable is a natural fit since there's no event logic involved, just iterating over an asynchronous iterator.

If the example tried to poll e.g. a remote service and detect failure spikes (i.e. more failures per interval than a threshold), IAsyncEnumerable would be inappropriate, as it would block waiting for all responses. In fact, we couldn't aggregate events per time interval at all.

Threading

None really - neither an IAsyncEnumerable nor an await foreach call specifies how events are produced or consumed. If we want to use a separate task to process an item, we have to create it ourselves, e.g.:

public async Task Run(string path)
{
    await foreach (var line in LoadStockTrades())
    {
        var result = await Task.Run(()=>AnalyzeTrade(line));
        Console.WriteLine($"{result} : {line}");
    }
}

Reactive Extensions

Reactive Extensions is a high level library that deals with event streams. It's push based, it understands time, but it's also slower than lower-level constructs like Async Streams or Channels.

In the question's example, Rx would be overkill. Polling and detecting spikes though is easy, with multiple windowing options.
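One possible way to express the spike detection, sketched with time-based windows. It assumes the System.Reactive package; `FailureSpikes`, the `bool` success flag, and the threshold parameter are hypothetical choices for the example, not something the question defines.

```csharp
using System;
using System.Reactive.Linq;

public static class SpikeDetectionSketch
{
    // results: true = call succeeded, false = call failed (an assumed encoding).
    // Emits the failure count of every window whose count exceeds the threshold.
    public static IObservable<int> FailureSpikes(
        IObservable<bool> results, TimeSpan window, int threshold) =>
        results
            .Where(succeeded => !succeeded)          // keep failures only
            .Window(window)                          // split into consecutive time windows
            .SelectMany(failures => failures.Count()) // one count per window, emitted when it closes
            .Where(count => count > threshold);
}
```

A caller would subscribe to `FailureSpikes(source, TimeSpan.FromSeconds(10), 5)` and react to each emitted count. `Buffer` with a time span would be another option if the failed items themselves are needed rather than just their count.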

System.Linq.Async can create an Observable from an IAsyncEnumerable with ToObservable, which means an IAsyncEnumerable can be used as a source for Rx.
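A minimal sketch of that bridge, assuming both the System.Linq.Async and System.Reactive packages are referenced. `ReadLinesAsync` and `LineLengths` are hypothetical names used only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;            // ToObservable() for IAsyncEnumerable<T> (assumes System.Linq.Async)
using System.Reactive.Linq;   // Rx operators on IObservable<T>
using System.Threading.Tasks;

public static class BridgeSketch
{
    // Hypothetical async stream standing in for the question's reader.
    public static async IAsyncEnumerable<string> ReadLinesAsync()
    {
        foreach (var line in new[] { "one", "three", "seven" })
        {
            await Task.Yield(); // simulate an asynchronous read
            yield return line;
        }
    }

    // The pull-based stream becomes a push-based source, so Rx operators apply from here on.
    public static IObservable<int> LineLengths() =>
        ReadLinesAsync()
            .ToObservable()
            .Select(line => line.Length);
}
```

From here the observable can be combined with the time-based Rx operators that async streams lack.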

Threading

By default, Rx is single threaded, which makes perfect sense for its main scenario - event stream processing.

On the other hand, Rx allows the publisher, subscriber and operators to run on the same or separate threads. In languages that don't have async/await or Dataflow (e.g. Java, JavaScript), Rx is used to emulate concurrent processing pipelines by running the publisher and subscribers on different threads.
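As a sketch of how threads can be assigned explicitly, the following uses `SubscribeOn` and `ObserveOn` from System.Reactive. The `Pipeline` method and the choice of schedulers are illustrative assumptions, not a recommendation.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class SchedulerSketch
{
    public static IObservable<string> Pipeline(IObservable<int> source) =>
        source
            // Subscribe to the source (and, for a cold source, produce values) on the task pool.
            .SubscribeOn(TaskPoolScheduler.Default)
            .Select(n => n * 2)
            // Deliver everything downstream of this point through a new-thread scheduler.
            .ObserveOn(NewThreadScheduler.Default)
            .Select(n => $"value {n}");
}
```

Without these two calls, the whole pipeline simply runs on whichever thread the source raises its notifications on.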

Panagiotis Kanavos
  • I've wanted to use Rx for a while, but these problems can easily be solved without Rx too. There don't seem to be many use cases in typical business applications where Rx isn't overkill. – ca9163d9 Oct 23 '19 at 15:01
  • 2
    @ca9163d9 Rx is extremely useful in its core scenario - event stream processing. In other languages like Java, JavaScript, it's used to emulate dataflow and async processing, but C# already has better constructs for this. Do you want to count the number of requests or failures? You can use `Window` to collect events in a window, then `Count()` and `Where()` to retrieve notifications only when the count of failures exceeds a limit. That takes a *lot* of coding without Rx. – Panagiotis Kanavos Oct 24 '19 at 06:58
  • 1
    @ca9163d9 Mobile fraud detection? One example from Azure Event Analytics is detecting calls from too distant base stations in an interval - Group by SIM, Window to batch entries, Aggregate to calculate distances between call events, Where to filter for calls outside a threshold – Panagiotis Kanavos Oct 24 '19 at 07:00
  • Million extra points for the PS "await foreach". The main gripe I have with 'async streams' is the confusing terminology that builds on the confusion of "Enumerable". Observables are enumerable too, just that Enumerable in .NET should actually be called "Iterable". In other words, get something one by one. In my opinion though the idea of an asynchronous 'get me the next thing that appears' is pretty much the same thing as 'do something whenever something appears on my observable'. As ever the architects propagate their own conceptual confusion into the language model and then to developers. – Frank Jul 21 '21 at 10:55
  • 1
    Oh, and Rx is absolutely awesome stuff. I'd say that most developer work would be best done using Observables, if only the rest of the developer community and literature felt the same way. – Frank Jul 21 '21 at 10:56
  • @Frank there's a reason they don't. Rx is a very specialized tool, meant to address a very specific problem - event stream processing. While it can be used for other concurrency problems, it's a *lot* harder to use and requires a lot of fiddling to avoid deadlocks. It's *definitely* unsuitable for data parallelism. While you can use it for async operations, you have to manually configure threading because by default a single thread is used for the entire pipeline - you don't need more to process a single stream. And even for CSP/pipeline architectures, it's too specialized – Panagiotis Kanavos Jul 21 '21 at 11:44
  • 1
    @Frank in [this job queue benchmark](https://michaelscodingspot.com/performance-of-producer-consumer/) Rx was at *least* two times slower to execute and almost 5 times slower to initialize than Channels, Dataflow or a simple threadpool thread. Which, to be fair, is a slight abuse of *all* those classes. And Rx can't be used for complex meshes or agent-like architectures, where blocks receive and post messages to multiple blocks, not just the next block down the line. – Panagiotis Kanavos Jul 21 '21 at 11:47
  • 1
    @Frank there are a *lot* of different parallel/async/concurrent computing paradigms. Some problems are easier to solve using one paradigm, some are easier using another. Rx is a very specialized tool for a very specific category of problems. It's like a Torx screwdriver - great for high-torque screwing, useless for common screws or hammering – Panagiotis Kanavos Jul 21 '21 at 11:51
  • I am aware of all of that. Can't say I agree with you much though. – Frank Jul 22 '21 at 11:15