FWIW, I'm scrapping the previous version of this question in favor of a different one along the same lines, after asking for advice on meta.

I have a webservice that contains configuration data. I would like to call it at regular intervals Tok in order to refresh the configuration data in the application that uses it. If the service is in error (timeout, down, etc) I want to keep the data from the previous call and call the service again after a different time interval Tnotok. Finally I want the behavior to be testable.

Since managing time sequences and testability seem like strong points of the Reactive Extensions, I started using an Observable that will be fed by a generated sequence. Here is how I create the sequence:

Observable.Generate<DataProviderResult, DataProviderResult>(
    // we start with some empty data
    new DataProviderResult() { 
            Failures = 0
            , Informations = new List<Information>()},
    // never stop
    (r) => true,
    // there is no iteration
    (r) => r,
    // we get the next value from a call to the webservice
    (r) => FetchNextResults(r),
    // we select time for next msg depending on the current failures
    (r) => r.Failures > 0 ? tnotok : tok,
    // we pass a TestScheduler
    scheduler)
.Subscribe(r => HandleResults(r));

I have two problems currently:


It looks like I am creating a hot observable. Even when I try to use Publish/Connect, the subscribed action misses the first event. How can I create it as a cold observable?

myObservable = myObservable.Publish();
myObservable.Subscribe(r => HandleResults(r));
myObservable.Connect(); // doesn't call onNext for first element in sequence

When I subscribe, the order of subscription and generation seems off: for any frame, the subscription method is fired before the FetchNextResults method. Is that normal? I would expect the sequence to call the method for frame f, not f+1. Here is the code that I'm using for fetching and subscription:

private DataProviderResult FetchNextResults(DataProviderResult previousResult)
{
    Console.WriteLine(string.Format("Fetching at {0:hh:mm:ss:fff}", scheduler.Now));
    try
    {
        return new DataProviderResult() { Informations = dataProvider.GetInformation().ToList(), Failures = 0};
    }
    catch (Exception)
    {}
    previousResult.Failures++;

    return previousResult;
}

private void HandleResults(DataProviderResult result)
{
    Console.WriteLine(string.Format("Managing at {0:hh:mm:ss:fff}", scheduler.Now));
    dataResult = result;
}

Here is the output that prompted me to ask these questions:

Starting at 12:00:00:000
Fetching at 12:00:00:000 < no managing the result that has been fetched here
Managing at 12:00:01:000 < managing before fetching for frame f
Fetching at 12:00:01:000
Managing at 12:00:02:000
Fetching at 12:00:02:000

EDIT: Here is a bare bones copy-pastable program that illustrates the problem.

/*using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;*/

private static int fetchData(int i, IScheduler scheduler)
{
  writeTime("fetching " + (i+1).ToString(), scheduler);
  return i+1;
}
private static void manageData(int i, IScheduler scheduler)
{
  writeTime("managing " + i.ToString(), scheduler);
}
private static void writeTime(string msg, IScheduler scheduler)
{
  Console.WriteLine(string.Format("{0:mm:ss:fff} {1}", scheduler.Now, msg));
}

private static void Main(string[] args)
{
    var scheduler = new TestScheduler();
    writeTime("start", scheduler);
    var datas = Observable.Generate<int, int>(fetchData(0, scheduler),
                                                (d) => true,
                                                (d) => fetchData(d, scheduler),
                                                 (d) => d,
                                                 (d) => TimeSpan.FromMilliseconds(1000),
                                                 scheduler)
                                                 .Subscribe(i => manageData(i, scheduler));

    scheduler.AdvanceBy(TimeSpan.FromMilliseconds(3000).Ticks);
}

This outputs the following:

00:00:000 start
00:00:000 fetching 1
00:01:000 managing 1
00:01:000 fetching 2
00:02:000 managing 2
00:02:000 fetching 3

I don't understand why the managing of the first element is not picked up immediately after its fetching. There is one second between the sequence effectively pulling the data and the data being handed to the observer. Am I missing something here or is it expected behavior? If so is there a way to have the observer react immediately to the new value?

samy

2 Answers

You are misunderstanding the purpose of the timeSelector parameter. It is called each time a value is generated and it returns a time which indicates how long to delay before delivering that value to observers and then generating the next value.
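To make the timing concrete, here is a minimal sketch (not part of the original answer) that runs `Generate` on a `TestScheduler` and records when each value reaches the observer. The class name `GenerateTimingDemo`, the `Run` method and the one-second period are illustrative assumptions, and the code assumes the System.Reactive and Microsoft.Reactive.Testing packages are referenced. It also shows one possible way to avoid the initial wait: have the time selector return `TimeSpan.Zero` for the initial state.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;

public static class GenerateTimingDemo
{
    // Hypothetical helper: returns one log line per value seen by the observer.
    public static List<string> Run()
    {
        var scheduler = new TestScheduler();
        var log = new List<string>();
        var period = TimeSpan.FromSeconds(1); // assumed interval

        Observable.Generate<int, int>(
                0,                 // initial state
                i => true,         // never stop
                i => i + 1,        // iterate: compute the next state
                i => i,            // resultSelector: identity
                // timeSelector: how long to hold THIS value before delivering it.
                // Zero for the initial state, so the first value is not delayed.
                i => i == 0 ? TimeSpan.Zero : period,
                scheduler)
            .Subscribe(i => log.Add($"{scheduler.Now:mm:ss:fff} value {i}"));

        // Nothing is delivered until virtual time moves forward.
        scheduler.AdvanceBy(TimeSpan.FromSeconds(2.5).Ticks);
        return log;
    }
}
```

With a 2.5-second advance this should record three values (0, 1 and 2), the first one at virtual time zero instead of one second in. It only removes the initial wait: each later value is still computed by `iterate` right after the previous delivery and then held for the selected delay, so this alone does not make a freshly fetched result reach the observer immediately.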

Here's a non-Generate way to tackle your problem.

private DataProviderResult FetchNextResult()
{
    // let exceptions throw; they are handled further down by Catch
    return new DataProviderResult
    {
        Informations = dataProvider.GetInformation().ToList(),
        Failures = 0
    };
}

private IObservable<DataProviderResult> CreateObservable(IScheduler scheduler)
{
    // an observable that produces a single result then completes
    var fetch = Observable.Defer(
        () => Observable.Return(FetchNextResult()));

    // concatenate this observable with one that will pause
    // for "tok" time before completing.
    // This observable will send the result
    // then pause before completing.
    var fetchThenPause = fetch.Concat(Observable
        .Empty<DataProviderResult>()
        .Delay(tok, scheduler));

    // Now, if fetchThenPause fails, we want to consume/ignore the exception
    // and then pause for tnotok time before completing with no results
    var fetchPauseOnErrors = fetchThenPause.Catch(Observable
        .Empty<DataProviderResult>()
        .Delay(tnotok, scheduler));

    // Now, whenever our observable completes (after its pause), start it again.
    var fetchLoop = fetchPauseOnErrors.Repeat();

    // Now use Publish(initialValue) so that we remember the most recent value
    var fetchLoopWithMemory = fetchLoop.Publish(null);

    // YMMV from here on.  Let's use RefCount() to start the
    // connection the first time someone subscribes
    var fetchLoopAuto = fetchLoopWithMemory.RefCount();

    // And let's filter out that first null that will arrive before
    // we ever get the first result from the data provider
    return fetchLoopAuto.Where(t => t != null);
}

public MyClass(IScheduler scheduler)
{
    // CreateObservable needs a scheduler; pass a TestScheduler in tests
    Information = CreateObservable(scheduler);
}

public IObservable<DataProviderResult> Information { get; private set; }
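
The fetch / pause / repeat loop above can be exercised under virtual time. The following is a minimal sketch under stated assumptions: `FetchLoopDemo`, `CreateLoop` and the fake `fetch` delegate (whose second call throws) are hypothetical stand-ins for the real data provider, and the intervals of 10 s and 2 s are arbitrary. It leaves out the `Publish`/`RefCount` part to focus on the timing, and assumes the System.Reactive and Microsoft.Reactive.Testing packages are referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;

public static class FetchLoopDemo
{
    // Hypothetical helper: wraps any fetch function in the loop from the answer.
    public static IObservable<T> CreateLoop<T>(
        Func<T> fetch, TimeSpan tok, TimeSpan tnotok, IScheduler scheduler)
    {
        var pauseOk = Observable.Empty<T>().Delay(tok, scheduler);
        var pauseError = Observable.Empty<T>().Delay(tnotok, scheduler);

        return Observable
            .Defer(() => Observable.Return(fetch())) // fetch on every (re)subscription
            .Concat(pauseOk)    // success: emit the result, then wait tok
            .Catch(pauseError)  // failure: swallow the error, wait tnotok
            .Repeat();          // start over once the pause completes
    }

    public static List<string> Run()
    {
        var scheduler = new TestScheduler();
        var log = new List<string>();
        var calls = 0;

        // Fake web service for illustration only: the second call fails.
        Func<int> fetch = () =>
        {
            calls++;
            if (calls == 2) throw new InvalidOperationException("service down");
            return calls;
        };

        var loop = CreateLoop(
            fetch, TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(2), scheduler);

        using (loop.Subscribe(v => log.Add($"{scheduler.Now:mm:ss} result {v}")))
        {
            scheduler.AdvanceBy(TimeSpan.FromSeconds(15).Ticks);
        }
        return log;
    }
}
```

Under these assumptions the first result is delivered as soon as the observer subscribes, the failed second call at about 10 s produces nothing, and the retry about 2 s later delivers the next result, so `Run()` should return two entries. Because the observer sees each result at the moment it is fetched, this shape avoids the one-interval lag seen with `Generate`.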
Brandon
  • Thank you! The time offset really tripped me up; the documentation only describes it as "The time selector function to control the speed of values being produced each iteration.", but it is really the delay between generation and publication to the observers. My misunderstanding was reinforced by the parallel drawn between Generate and Interval/Timer (http://www.introtorx.com/content/v1.0.10621.0/04_CreatingObservableSequences.html#ObservableTimer) – samy Jul 03 '14 at 08:24
  • As a side note, both Timer and Interval show the same behavior, i.e. the value that is generated at t is only passed to the subscribers at t+1. This really bugs me – samy Jul 03 '14 at 08:25
  • @samy - not true. `Observable.Timer(someTime).Subscribe(...)` calls my callback after `someTime`, and `Interval` works similarly. It is really just a quirk of `Generate` that it behaves the way it does. – Brandon Jul 03 '14 at 15:29
  • That's what I meant by "same behavior"; the observer is notified after `someTime`, while I keep expecting the first event to occur right away. I did make a mistake though, since there is no customizable event generation in the `Timer` and `Interval` observables; I cannot say when the value is generated, but I can see that it is processed one step later than *what I expect*. I'll wrap my head around it eventually :) – samy Jul 03 '14 at 15:36

Generate produces cold observable sequences, so that is my first alarm bell.

I tried to pull your code into LINQPad* and run it, and changed it a bit to focus on the problem. It seems to me that you have the Iterator and ResultSelector functions confused; they are back to front. When you iterate, you should take the value from your last iteration and use it to produce your next value. The result selector is used to pick off (Select) the value from the instance you are iterating on.

So in your case, the type you are iterating on is the type you want to produce values of. Therefore keep your ResultSelector function as just the identity function x=>x, and make your Iterator function the one that makes the web service call.

Observable.Generate<DataProviderResult, DataProviderResult>(
    // we start with some empty data
    new DataProviderResult() { 
            Failures = 0
            , Informations = new List<Information>()},
    // never stop
    (r) => true,
    // we get the next value(iterate) by making a call to the webservice
    (r) => FetchNextResults(r),
    // there is no projection
    (r) => r,
    // we select time for next msg depending on the current failures
    (r) => r.Failures > 0 ? tnotok : tok,
    // we pass a TestScheduler
    scheduler)
.Subscribe(r => HandleResults(r));

As a side note, try to prefer immutable types instead of mutating values as you iterate.

*Please provide an autonomous working snippet of code so people can better answer your question. :-)

Lee Campbell
  • Duh! I really have problems wrapping my head around which is which, and the order they get called in. Thanks for the heads up, the sequence is indeed cold. However, I edited my question with copy-pastable code; there is a time discrepancy between the sequence being updated and the observer picking it up, which I don't understand – samy Jul 02 '14 at 08:02