0

Despite the patient assistance of the community here, here, here and here - I fear I am no closer to true understanding. I am trying to hammer the concepts from IntroToRx into my brain, but to paraphrase King Arthur: "this new learning befuddles me. Explain again about the sheep's bladders and earthquakes." Which, to my mind, is what we might as well be talking about.

I'll dumb it down to the requirements (which have changed - .NET 4.0 being the kicker) rather than possibly going down the wrong path with my own (likely flawed) implementation:

  1. I need to use .NET Framework 4.0 to write a Windows Service.
  2. I need my Windows service to call a web service (WS), and pass it one of a number of URLs, but just one URL per call.
  3. I need to be able to define individual intervals for each URL (1 to n) to call the WS. For example: I might call the WS every hour for URL1, every four hours for URL2, etc.
  4. I need to be able to stop calling the web service for a particular URL if the web service returns an error for that URL.

So that's it. From what little I understand, this is a perfect Rx scenario. I'm pretty sure I want to create Observable.Timers for the sequences, but how do I disable a timer on an error? This is probably a CancellationToken waiting to happen (or would I just Complete the timer), but I just can't seem to take the examples and mush them into a solid, understandable solution.

Whaddaya say, S.O.: anyone want to take this old man by the hand and explain in small words the right way to do such nonsense? If I haven't explained it well, or if I've left something out, I'll be happy to edit.

Thanks.

Community
  • 1
  • 1
Scott Baker
  • 10,013
  • 17
  • 56
  • 102

1 Answers1

2

1) I think you want to look at Observable.Interval. Observable.Timer is a single value sequence, in contrast Observable.Interval will keep producing values at the given interval. If for each item/url this polling period is constant, then this is probably what you want.

2) You can mix IEnumerable (URLs) and IObservable (polling event) sequences together.

3) An error will terminate a sequence. You can leverage this by projecting (Select) a value from a sequence to the output of a WebRequest. If the WebRequest fails, the sequence will OnError with the exception being propagated as the OnError payload.

Moving through this slowly then;

  • we want to take a list of URL/Period pairs,
  • poll the URL at its given period,
  • stop polling a URL if any call to that URL fails (and probably log it)
  • stop polling if the windows service is stopped

Lets first just deal with a single URL to poll

protected override void OnStart(string [] args)
{
    var resource = new {Url="http://www.bing.com", Period=TimeSpan.FromMinutes(3)};

    var pollingSequence =  Observable.Interval(resource.Period)
                                     .Select(_=>Request(resource.Url));
    var subscription = pollingSequence.Subscribe(
            response=>Log(response),
            ex=>Log(ex)
        ));
    _subscriptions = new CompositeDisposable(subscription);
}

protected override void OnStop()
{
    Dispose();
}

private bool Request(string url)
{
    //TODO:
}

public void Dispose()
{
    _subscriptions.Dispose();
}

To visualize this, we can use a "Marble diagram". Here each char space represents 1 minute. OnNext are represented as an 'o' (marble).

    bing    --o--o--o--o-       (every 3rd minute)

To be more accurate we are actually getting values through on the event (even though we ignore the value)

    bing    --0--1--2--3-       

We then take each event and project it to a request so the sequence now goes to this (where 'r' represents a Response from a request)

    bing    --r--r--r--r-       

If any request was to fail, the sequence would teminate e.g here the 3 request fails and we show an OnError with an 'X'

    bing    --r--r--X

Now we can extend the example to a list of resource i.e. IEnumerable<IObservable<T>>

protected override void OnStart(string [] args)
{
    var resources = new[] { 
        new {Url="http://www.bing.com", Period=TimeSpan.FromMinutes(3)},
        new {Url="http://www.google.com", Period=TimeSpan.FromMinutes(2)},
        new {Url="http://www.yahoo.com", Period=TimeSpan.FromMinutes(5)},
        new {Url="http://www.stackoverflow.com", Period=TimeSpan.FromMinutes(2)}
    };

    //The same as before, but now we create a polling sequence per resource.
    var pollingSequences = from resource in resources
                        select Observable.Interval(resource.Period)
                                            .Select(_=>Request(resource.Url));

    //Now we cant subscribe to an `IEnumerable<IObservable<T>>` *, 
    //  but we can subscribe to each sequence in it.
    // This turns our queries (IE<IO<T>>) into subscriptions (IE<IDisposable>).
    var subscriptions = pollingSequences.Select(x => 
        x.Subscribe(
            response=>Log(response),
            ex=>Log(ex)
        ));
    _subscriptions = new CompositeDisposable(subscriptions);
}

Here we have a service that will start calling the Request method (for you to implement), passing the Url at a period specified for the Url. If the Request method throws, then the Url will no longer get polled. If you stop the service, then the subscriptions are disposed and no more polling happens.

To visualize this, we can use a "Marble diagram". So we start out with a "row" for each "resource" (data in space) ie. var resources = new[] { ...};

    bing
    google
    yahoo
    SO

Next we consider each row to also have a sequence of polling event (data in time). From the code above (using 1 char = 1 minute)

    bing    --1--2--3-      (every 3rd minute)
    google  -1-2-3-4-5      (every 2nd minute)
    yahoo   ----1----2      (every 5 minutes)
    SO      -1-2-3-4-5      (every 2nd minute)
  • You can't subscribe to IEnumerable<IObservable<T>>, but there are Rx methods like Concat and Merge that deal with this type of data. We dont want to use these however as they flatten the sequences into a single sequence, which would mean if any failed, all polling would stop.
Lee Campbell
  • 10,631
  • 1
  • 34
  • 29
  • Lee, you're too kind. Thank you for taking extra time to break this down for the slower kid in the class. ;-) Can you break down the LINQ query for pollingSequences? I'm having trouble conceptualizing the way the `select` and `.Select()` are working together. – Scott Baker Sep 13 '13 at 05:50
  • I have updated the sample to show working with a single resource first, and then extend that sample working with a list of resources. HTH – Lee Campbell Sep 13 '13 at 08:51
  • you're an amazing teacher. This is fantastic, and I hope it helps the rest of the community as much as it has me. Thanks again for this. – Scott Baker Sep 13 '13 at 15:45
  • this code is still "falling through" and the program stops almost immediately. shouldn't the subscription to `new CompositeDisposable(subscriptions)` prevent that from happening? – Scott Baker Jan 08 '14 at 16:41