1

This sample console application has 2 observables. The first one pushes numbers from 1 to 100. This observable is subscribed by the AsyncClass which runs a long running process for each number it gets. Upon completion of this new async process I want to be able to 'push' to 2 subscribers which would be doing something with this new value.

My attempts are commented in the source code below.

AsyncClass:

class AsyncClass
    {
        private readonly IConnectableObservable<int> _source;
        private readonly IDisposable _sourceDisposeObj;
        public IObservable<string> _asyncOpObservable;

        public AsyncClass(IConnectableObservable<int> source)
        {
            _source           = source;
            _sourceDisposeObj = _source.Subscribe(
                                    ProcessArguments,
                                    ExceptionHandler,
                                    Completed
                                );
            _source.Connect();


        }

        private void Completed()
        {
            Console.WriteLine("Completed");
            Console.ReadKey();
        }

        private void ExceptionHandler(Exception exp)
        {
            throw exp;
        }

        private void ProcessArguments(int evtArgs)
        {
            Console.WriteLine("Argument being processed with value: " + evtArgs);

            //_asyncOpObservable = LongRunningOperationAsync("hello").Publish(); 
            // not going to work either since this creates a new observable for each value from main observer

        }
        // http://rxwiki.wikidot.com/101samples
        public IObservable<string> LongRunningOperationAsync(string param)
        {
            // should not be creating an observable here, rather 'pushing' values?
            return Observable.Create<string>(
                o => Observable.ToAsync<string, string>(DoLongRunningOperation)(param).Subscribe(o)
            );
        }

        private string DoLongRunningOperation(string arg)
        {
            return "Hello";
        }
    }

Main:

static void Main(string[] args)
        {
            var source = Observable
                            .Range(1, 100)
                            .Publish();

            var asyncObj = new AsyncClass(source);
            var _asyncTaskSource = asyncObj._asyncOpObservable;

            var ui1 = new UI1(_asyncTaskSource);
            var ui2 = new UI2(_asyncTaskSource);
        }

UI1 (and UI2, they're basically the same):

 class UI1
    {
        private IConnectableObservable<string> _asyncTaskSource;
        private IDisposable _taskSourceDisposable;

        public UI1(IConnectableObservable<string> asyncTaskSource)
        {
            _asyncTaskSource = asyncTaskSource;
            _asyncTaskSource.Connect();
            _taskSourceDisposable = _asyncTaskSource.Subscribe(RefreshUI, HandleException, Completed);
        }

        private void Completed()
        {
            Console.WriteLine("UI1: Stream completed");
        }

        private void HandleException(Exception obj)
        {
            Console.WriteLine("Exception! "+obj.Message);
        }

        private void RefreshUI(string obj)
        {
            Console.WriteLine("UI1: UI refreshing with value "+obj);
        }
    }

This is my first project with Rx so let me know if I should be thinking differently. Any help would be highly appreciated!

javaCity
  • 4,288
  • 2
  • 25
  • 37

1 Answers1

1

I'm going to let you know you should be thinking differently... :) Flippancy aside, this looks like a case of bad collision between object-oriented and functional-reactive styles.

It's not clear what the requirements are around timing of the data flow and caching of results here - the use of Publish and IConnectableObservable is a little confused. I'm going to guess you want to avoid the 2 downstream subscriptions causing the processing of a value being duplicated? I'm basing some of my answer on that premise. The use of Publish() can achieve this by allowing multiple subscribers to share a subscription to a single source.

Idiomatic Rx wants you to try and keep to a functional style. In order to do this, you want to present the long running work as a function. So let's say, instead of trying to wire your AsyncClass logic directly into the Rx chain as a class, you could present it as a function like this contrived example:

async Task<int> ProcessArgument(int argument)
{
    // perform your lengthy calculation - maybe in an OO style,
    // maybe creating class instances and invoking methods etc.
    await Task.Delay(TimeSpan.FromSeconds(1));
    return argument + 1;
}

Now, you can construct a complete Rx observable chain calling this function, and through the use of Publish().RefCount() you can avoid multiple subscribers causing duplicate effort. Note how this separates concerns too - the code processing the value is simpler because the reuse is handled elsewhere.

var query = source.SelectMany(x => ProcessArgument(x).ToObservable())
                  .Publish().RefCount();

By creating a single chain for subscribers, the work is only started when necessary on subscription. I've used Publish().RefCount() - but if you want to ensure values aren't missed by the second and subsequent subscribers, you could use Replay (easy) or use Publish() and then Connect - but you'll want the Connect logic outside the individual subscriber's code because you just need to call it once when all subscribers have subscribed.

James World
  • 29,019
  • 9
  • 86
  • 120
  • Thanks for your answer James, it helped me see what I was doing wrong. Incase you are still wondering, the `Observable.Range` is just a dummy observable for partially continuous event like 'file save'. I am trying to minify some test files when I get a file saved event for any test files. After that it should notify 2 UI classes that the file has been saved along with the contents of the file. The code I showed was a contrived example. – javaCity Aug 22 '14 at 14:51
  • Just noticed that `SelectMany` accumulates all the values before sending it down the pipeline. I would like to send the data as it arrives. Any suggestion? – javaCity Aug 22 '14 at 15:13
  • `SelectMany` certainly doesn't do this, something else must be responsible... hard to say what without seeing your code, but check your assumptions. – James World Aug 22 '14 at 15:16
  • 1
    This might help with debugging! http://stackoverflow.com/questions/20220755/how-can-i-see-what-my-reactive-extensions-query-is-doing – James World Aug 22 '14 at 15:18
  • I feel stupid now, you're correct. The debugging script is awesome. Thank you for posting it here :) – javaCity Aug 22 '14 at 16:03