-1

Given the following method:

If I leave the hack in place, my unit test completes immediately with "observable has no data".

If I take the hack out, there are multiple threads all attempting to login at the same time.
The host service does not allow this.

How do I ensure that only one thread is producing observables at any given point in time.

    private static object obj = new object();
    private static bool here = true;
    public IObservable<Party> LoadAllParties(CancellationToken token)
    {
        var parties = Observable.Create<Party>(
            async (observer, cancel) =>
            {
                // this is just a hack to test behavior
                lock (obj)
                {
                    if (!here)
                        return;
                    here = false;
                }
                // end of hack.
                try
                {
                    if (!await this.RequestLogin(observer, cancel))
                        return;

                    // request list.
                    await this._request.GetAsync(this._configuration.Url.RequestList);
                    if (this.IsCancelled(observer, cancel))
                        return;

                    while (!cancel.IsCancellationRequested)
                    {
                        var entities = await this._request.GetAsync(this._configuration.Url.ProcessList);
                        if (this.IsCancelled(observer, cancel))
                            return;

                        var tranche = this.ExtractParties(entities);

                        // break out if it's the last page.
                        if (!tranche.Any())
                            break;

                        Array.ForEach(tranche, observer.OnNext);

                        await this._request.GetAsync(this._configuration.Url.ProceedList);
                        if (this.IsCancelled(observer, cancel))
                            return;
                    }

                    observer.OnCompleted();
                }
                catch (Exception ex)
                {
                    observer.OnError(ex);
                }
            });
        return parties;
    }

My Unit Test:

var sut = container.Resolve<SyncDataManager>();
var count = 0;
var token = new CancellationTokenSource();
var observable = sut.LoadAllParties(token.Token);
observable.Subscribe(party => count++);
await observable.ToTask(token.Token);
count.Should().BeGreaterThan(0);
Jim
  • 14,952
  • 15
  • 80
  • 167
  • 1
    I see an `Observable.Create` call, a lock, and a whole bunch of unexplained, uncompilable custom code. If you could produce a [mcve], that may be helpful. – Shlomo Sep 14 '17 at 13:31
  • I can try and make a smaller simpler example - but the problem remains that the the lock is being hit by several threads. Why? It's clearly how observable works... I need to make it non reentrant. I'll work on simplifying the example. You can just replace each Async call with await Task.Delay(1000) – Jim Sep 14 '17 at 13:44
  • Your client code is probably causing that. What are you doing with the created observable? – Shlomo Sep 14 '17 at 15:19
  • It seems to me that creating an `EventLoopScheduler` might be the way to go here. Then you can ensure that every interaction with the service is on the one thread. Just use the appropriate `.ObserveOn`, `.SubscribeOn`, and `.NotifyOn` calls and you should be golden. It'd simplify things down quite a bit. – Enigmativity Sep 19 '17 at 02:27

1 Answers1

4

I do think your question is suffering from the XY Problem - the code contains several calls to methods not included which may contain important side effects and I feel that going on the information available won't lead to the best advice.

That said, I suspect you did not intend to subscribe to observable twice - once with the explicit Subscribe call, and once with the ToTask() call. This would certainly explain the concurrent calls, which are occurring in two different subscriptions.

EDIT:

How about asserting on the length instead (tweak the timeout to suit):

var length = await observable.Count().Timeout(TimeSpan.FromSeconds(3));

Better would be to look into Rx-Testing and mock your dependencies. That's a big topic, but this long blog post from the Rx team explains it very well and this answer regarding TPL-Rx interplay may help: Executing TPL code in a reactive pipeline and controlling execution via test scheduler

James World
  • 29,019
  • 9
  • 86
  • 120
  • Hah - James! I am pretty sure you've got it. I didn't realize that ToTask would do that. Any suggestion on how to wait for it to finish? – Jim Sep 14 '17 at 15:23
  • Incidentally, you can `await` your `observable` directly (no Subscribe call) - as long as you're expecting a single result it will do the necessary subscription and conversion implicitly. No cancellation token support there though. – James World Sep 14 '17 at 15:30
  • Thanks James, I guess ultimately it might be the wrong pattern - it feels right though... it will all ultimately be coordinated though a single non reentrant hangfire job. I guess to ensure non reentrant I would have to surround it all with a lock - but locks and async don't make a happy couple... maybe an async mutex - which there are examples of. I think Stephen Cleary has posted examples. – Jim Sep 14 '17 at 15:48
  • I will need to re-examine the test carefully - shot for the great answer. Don't think it's XY though - I clearly just want to execute the problem once in a test and see a result. Was the question incorrect? – Jim Sep 14 '17 at 16:18
  • Added some commentary on testing. – James World Sep 14 '17 at 18:09