1) I think you want to look at Observable.Interval. Observable.Timer (when given only a due time) is a single-value sequence; in contrast, Observable.Interval will keep producing values at the given interval. If the polling period for each item/URL is constant, then this is probably what you want.
2) You can mix IEnumerable (URLs) and IObservable (polling event) sequences together.
3) An error will terminate a sequence. You can leverage this by projecting (Select) each value from a sequence to the output of a WebRequest. If the WebRequest fails, the sequence will OnError, with the exception propagated as the OnError payload.
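To make the difference in point 1 concrete, here is a minimal console sketch. It assumes the System.Reactive package is referenced; the class name and the 100 ms period are arbitrary choices for illustration, not part of the original question.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;

class IntervalVersusTimer
{
    static void Main()
    {
        // Timer with only a due time: produces one value, then completes.
        using (Observable.Timer(TimeSpan.FromMilliseconds(100))
                         .Subscribe(i => Console.WriteLine("Timer: " + i),
                                    () => Console.WriteLine("Timer completed")))
        // Interval: keeps producing 0, 1, 2, ... until the subscription is disposed.
        using (Observable.Interval(TimeSpan.FromMilliseconds(100))
                         .Subscribe(i => Console.WriteLine("Interval: " + i)))
        {
            // Keep the process alive long enough to see several Interval values.
            Thread.Sleep(550);
        }
    }
}
```

Leaving the using blocks disposes both subscriptions, which is the same mechanism the service relies on in OnStop.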
Moving through this slowly, then:
- we want to take a list of URL/Period pairs,
- poll the URL at its given period,
- stop polling a URL if any call to that URL fails (and probably log it),
- stop polling if the Windows service is stopped.
Let's first just deal with a single URL to poll:
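    // Holds the active subscriptions so that they can be disposed when the service stops.
    private CompositeDisposable _subscriptions;

    protected override void OnStart(string[] args)
    {
        var resource = new { Url = "http://www.bing.com", Period = TimeSpan.FromMinutes(3) };
        // Each tick of the interval is projected to the result of a request.
        var pollingSequence = Observable.Interval(resource.Period)
                                        .Select(_ => Request(resource.Url));
        var subscription = pollingSequence.Subscribe(
            response => Log(response),
            ex => Log(ex));
        _subscriptions = new CompositeDisposable(subscription);
    }

    protected override void OnStop()
    {
        Dispose();
    }

    private bool Request(string url)
    {
        //TODO: make the request; throw on failure so that the sequence terminates with OnError.
        throw new NotImplementedException();
    }

    public void Dispose()
    {
        _subscriptions.Dispose();
    }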
To visualize this, we can use a "marble diagram". Here each character represents 1 minute, and each OnNext is represented as an 'o' (a marble).
bing --o--o--o--o- (every 3rd minute)
To be more accurate, each event actually carries a value (a counter starting at 0), even though we ignore it:
bing --0--1--2--3-
We then take each event and project it to a request, so the sequence becomes this (where 'r' represents a response from a request):
bing --r--r--r--r-
If any request were to fail, the sequence would terminate. For example, here the 3rd request fails and we show the OnError as an 'X':
bing --r--r--X
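The last diagram can be reproduced with a small console sketch. FakeRequest is a hypothetical stand-in for the real Request method, rigged so that the third call throws. The 50 ms period is arbitrary, and System.Reactive is assumed to be referenced.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;

class FailingPollDemo
{
    // Hypothetical stand-in for Request: the third call (tick 2) fails.
    static bool FakeRequest(long tick)
    {
        if (tick == 2) throw new InvalidOperationException("request failed");
        return true;
    }

    static void Main()
    {
        var done = new ManualResetEventSlim();
        using (Observable.Interval(TimeSpan.FromMilliseconds(50))
                         .Select(tick => FakeRequest(tick))
                         .Subscribe(
                             response => Console.WriteLine("r"),
                             ex => { Console.WriteLine("X: " + ex.Message); done.Set(); }))
        {
            // Wait for the OnError notification (or give up after 5 seconds).
            done.Wait(TimeSpan.FromSeconds(5));
        }
    }
}
```

The exception thrown inside Select is caught by the operator and delivered to the OnError handler, after which the interval stops producing values for this subscription.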
Now we can extend the example to a list of resources, i.e. an IEnumerable<IObservable<T>>:
    protected override void OnStart(string[] args)
    {
        var resources = new[] {
            new { Url = "http://www.bing.com", Period = TimeSpan.FromMinutes(3) },
            new { Url = "http://www.google.com", Period = TimeSpan.FromMinutes(2) },
            new { Url = "http://www.yahoo.com", Period = TimeSpan.FromMinutes(5) },
            new { Url = "http://www.stackoverflow.com", Period = TimeSpan.FromMinutes(2) }
        };
        // The same as before, but now we create a polling sequence per resource.
        var pollingSequences = from resource in resources
                               select Observable.Interval(resource.Period)
                                                .Select(_ => Request(resource.Url));
        // We can't subscribe to an IEnumerable<IObservable<T>> *,
        // but we can subscribe to each sequence in it.
        // This turns our queries (IE<IO<T>>) into subscriptions (IE<IDisposable>).
        var subscriptions = pollingSequences.Select(x =>
            x.Subscribe(
                response => Log(response),
                ex => Log(ex)));
        _subscriptions = new CompositeDisposable(subscriptions);
    }
Here we have a service that will start calling the Request method (for you to implement), passing the URL at the period specified for that URL. If the Request method throws, then the URL will no longer get polled. If you stop the service, then the subscriptions are disposed and no more polling happens.
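As a rough idea of what Request might look like, here is one possible sketch using HttpWebRequest. The class name, the 10-second timeout, and the choice to treat only HTTP 200 as success are all assumptions for illustration; the original leaves the method as a TODO. On newer .NET versions WebRequest is marked obsolete, so an HttpClient-based version may be preferable there.

```csharp
using System;
using System.Net;

static class Poller
{
    // Hypothetical implementation of the Request method.
    // GetResponse throws a WebException on connection failures and on
    // 4xx/5xx status codes, which is what terminates the polling sequence.
    public static bool Request(string url)
    {
        var request = (HttpWebRequest)WebRequest.Create(url);
        request.Timeout = 10000; // milliseconds; an arbitrary choice
        using (var response = (HttpWebResponse)request.GetResponse())
        {
            return response.StatusCode == HttpStatusCode.OK;
        }
    }

    static void Main()
    {
        try
        {
            Console.WriteLine(Request("http://www.bing.com"));
        }
        catch (WebException ex)
        {
            // Inside the polling sequence this exception would surface as OnError.
            Console.WriteLine("Failed: " + ex.Message);
        }
    }
}
```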
We can visualize this with a marble diagram too. We start out with a "row" for each "resource" (data in space), i.e. var resources = new[] { ... };
bing
google
yahoo
SO
Next we consider each row to also have a sequence of polling events (data in time). From the code above (using 1 char = 1 minute):
bing   --1--2--3-  (every 3rd minute)
google -1-2-3-4-5  (every 2nd minute)
yahoo  ----1----2  (every 5 minutes)
SO     -1-2-3-4-5  (every 2nd minute)
* You can't subscribe to IEnumerable<IObservable<T>>, but there are Rx methods like Concat and Merge that deal with this type of data. We don't want to use these, however, as they flatten the sequences into a single sequence, which would mean that if any one failed, all polling would stop.
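To illustrate the flattening problem, here is a small sketch in which one of two merged sequences fails. The sequence names, the Fail helper, and the timings are made up for the demonstration; System.Reactive is assumed to be referenced.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;

class MergeFailureDemo
{
    // Hypothetical stand-in for a request that always fails.
    static string Fail()
    {
        throw new InvalidOperationException("request failed");
    }

    static void Main()
    {
        // A sequence that would keep polling successfully on its own.
        var healthy = Observable.Interval(TimeSpan.FromMilliseconds(50))
                                .Select(_ => "healthy");
        // A sequence whose first poll fails after 120 ms.
        var failing = Observable.Timer(TimeSpan.FromMilliseconds(120))
                                .Select(_ => Fail());

        var sources = new[] { healthy, failing };
        using (sources.Merge().Subscribe(
                   value => Console.WriteLine(value),
                   ex => Console.WriteLine("Merged sequence terminated: " + ex.Message)))
        {
            // After the error is reported, no further "healthy" values are printed,
            // even though that source never failed.
            Thread.Sleep(400);
        }
    }
}
```

Subscribing to each sequence separately, as in the service code, keeps a failure in one URL from affecting the others.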