
I've got a very simple class that I am using to poll a directory for new files. It's got the location, a time to start monitoring that location, and an interval (in hours) for when to check again:

public class Thing
{
  public string         Name {get; set;}
  public Uri            Uri { get; set;}
  public DateTimeOffset StartTime {get; set;}
  public double         Interval {get; set;}
}

I am new to Reactive Extensions, but I think it is exactly the right tool for the job here. At the start time, and on every subsequent interval, I simply want to call a web service that does all the heavy lifting - we'll use the ever inventive public bool DoWork(Uri uri) to represent that.

edit: DoWork is a call to a web service that will check for new files and move them if necessary, so its execution should be async. It returns true if it completed, false if not.

Things get complicated if I have a whole collection of these Things. I can't wrap my head around how to create the Observable.Timer() for each one, and have them all call the same method.

edit2: Observable.Timer(DateTimeOffset, TimeSpan) seems perfect to create an IObservable for what I'm trying to do here. Thoughts?

Dave Clemmer
Scott Baker

3 Answers


Do you need there to be many timers? If you have a collection of 20 things, do you expect 20 timers, all firing at the same point in time, on the same thread/scheduler?

Or perhaps you want to call DoWork for each thing at every period?

i.e.

from thing in things.ToObservable()
from x in Observable.Interval(TimeSpan.FromHours(thing.Interval))
select DoWork(thing.Uri)

vs.

Observable.Interval(interval)
.Subscribe(_=>
    {
        foreach(var thing in things)
        {
            DoWork(thing.Uri);
        }
    })

There are many ways that you can do work in the future.

  • You can use a Scheduler directly to schedule work to be done in the future.
  • You can use Observable.Timer to have a sequence that produces one value in the specified time in the future.
  • You can use Observable.Interval to have a sequence that produces many values each the specified time period apart.

So this now introduces another question. If your polling period is 60 seconds and your DoWork function takes 5 seconds, should the next poll happen 55 seconds or 60 seconds after the work completes? One answer indicates that you want to use an Rx sequence; the other indicates that you probably want to use periodic scheduling.
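To make the two options concrete, here is a minimal arithmetic sketch that uses no Rx at all. The names PollTiming, NextFixedRate and NextFixedDelay are hypothetical, and it assumes the fixed-rate policy counts whole periods from the original start time.

```csharp
using System;

// Sketch only (no Rx): when is the next poll due under each policy?
public static class PollTiming
{
    // Fixed rate, no drift: polls stay on the grid start, start + period,
    // start + 2*period, ... regardless of how long DoWork took.
    // Returns the first grid point strictly after 'now' (or 'start' itself
    // if 'now' is still before it).
    public static DateTimeOffset NextFixedRate(DateTimeOffset start, TimeSpan period, DateTimeOffset now)
    {
        if (period <= TimeSpan.Zero)
            throw new ArgumentOutOfRangeException(nameof(period));
        if (now < start)
            return start;
        long elapsedPeriods = (now - start).Ticks / period.Ticks;
        return start + TimeSpan.FromTicks((elapsedPeriods + 1) * period.Ticks);
    }

    // Fixed delay, drifts: the next poll is a full period after the
    // previous DoWork call completed, so DoWork's duration accumulates.
    public static DateTimeOffset NextFixedDelay(DateTimeOffset completedAt, TimeSpan period)
    {
        if (period <= TimeSpan.Zero)
            throw new ArgumentOutOfRangeException(nameof(period));
        return completedAt + period;
    }
}
```

With a 60 second period and a DoWork call that starts on a period boundary and takes 5 seconds, NextFixedRate lands 55 seconds after completion and NextFixedDelay lands 60 seconds after completion.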

Next question: does DoWork return a value? Currently it looks like it does not*. In that case I think the most appropriate thing for you to do is to leverage the periodic schedulers (assuming Rx v2).

var things = new []{
    new Thing{Name="google", Uri = new Uri("http://google.com"), StartTime=DateTimeOffset.Now.AddSeconds(1), Interval=3},
    new Thing{Name="bing", Uri = new Uri("http://bing.com"), StartTime=DateTimeOffset.Now.AddSeconds(1), Interval=3}
};
var scheduler = Scheduler.Default;
var scheduledWork = new CompositeDisposable();

foreach (var thing in things)
{
    scheduledWork.Add( scheduler.SchedulePeriodic(thing, TimeSpan.FromSeconds(thing.Interval), t=>DoWork(t.Uri)));
}

//Just showing that I can cancel this i.e. clean up my resources.
scheduler.Schedule(TimeSpan.FromSeconds(10), ()=>scheduledWork.Dispose());

This will now schedule each thing to be processed periodically (without drift), on its own interval, and provide cancellation.

We can now upgrade this to a query, if you like:

var scheduledWork = from thing in things
                    select scheduler.SchedulePeriodic(thing, TimeSpan.FromSeconds(thing.Interval), t=>DoWork(t.Uri));

var work = new CompositeDisposable(scheduledWork);

The problem with these queries is that they don't fulfil the StartTime requirement. Annoyingly, the Scheduler.SchedulePeriodic method does not provide an overload that also takes a start offset.

The Observable.Timer operator does however provide this. It will also internally leverage the non-drifting scheduling features. To reconstruct the query with Observable.Timer we can do the following.

var urisToPoll = from thing in things.ToObservable()
                 from _ in Observable.Timer(thing.StartTime, TimeSpan.FromSeconds(thing.Interval))
                 select thing;

var subscription = urisToPoll.Subscribe(t=>DoWork(t.Uri));

So now you have a nice interface that should avoid drift. However, I think the DoWork calls here will run serially: if several of them are due at the same time, they will not run concurrently.

*Ideally I would try to avoid side-effecting statements like this, but I am not 100% sure what your requirements are.

EDIT: It appears that the calls to DoWork must run in parallel, so you need to do a bit more. Ideally you would make DoWork async, but if you can't, we can fake it till we make it.

var polling = from thing in things.ToObservable()
              from _ in Observable.Timer(thing.StartTime, TimeSpan.FromSeconds(thing.Interval))
              from result in Observable.Start(()=>DoWork(thing.Uri))
              select result;

var subscription = polling.Subscribe(); //Ignore the bool results?
Lee Campbell
  • This is a fantastic explanation - tho I'm still digesting it - DoWork is a call to a web service. It checks each folder (uri) for new files and does things to files it finds... horrible, terrible things. – Scott Baker Aug 08 '13 at 15:25
  • "Drift" - compensating for the next interval based on completion of the previous DoWork - is not necessary. The interval is expected to be every half hour at its most frequent. – Scott Baker Aug 08 '13 at 15:27
  • You can potentially ignore all of it and just use the last query/code block. The other answers are all pretty good too. – Lee Campbell Aug 08 '13 at 15:33
  • I don't understand how the "from _ in Observable.Timer" line does anything. You never use the result for anything... – Scott Baker Aug 13 '13 at 22:41
  • In functional programming (at least in C#) the underscore character is polite practice to indicate that the return value or input parameter is not used. In this case it is not used. Syntactically, a variable name is required there. We need the line because we want a timer for each 'thing'. The Timer sequence will yield a value (ignored) which will then allow the query chain to execute the Observable.Start(DoWork) clause – Lee Campbell Aug 17 '13 at 18:37
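The query-syntax translation described above can be seen with plain LINQ to Objects, where the compiler applies the same rules it uses for Rx queries. In this sketch Enumerable.Range stands in for Observable.Timer (an assumption for illustration: a real timer yields its values over time rather than all at once), and QueryDesugaring and its methods are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class QueryDesugaring
{
    // Query syntax: for each thing, the inner "from _ in ..." sequence
    // decides how many times 'thing' is emitted. Its values are ignored.
    public static List<string> QuerySyntax(IEnumerable<string> things, int ticks)
    {
        var query = from thing in things
                    from _ in Enumerable.Range(0, ticks) // value unused; only its occurrence matters
                    select thing;
        return query.ToList();
    }

    // The compiler translates the query above into this SelectMany call.
    public static List<string> MethodSyntax(IEnumerable<string> things, int ticks)
    {
        return things
            .SelectMany(thing => Enumerable.Range(0, ticks), (thing, _) => thing)
            .ToList();
    }
}
```

For QuerySyntax(new[] { "google", "bing" }, 2) both methods return google, google, bing, bing: the values 0 and 1 are never used, but each one causes thing to be emitted once, just as each Timer tick lets the rest of the query run.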

Hmmm. Does DoWork produce a result that you need to do something with? I'll assume so. You didn't say, but I'll also assume DoWork is synchronous.

things.ToObservable()
  .SelectMany(thing => Observable
    .Timer(thing.StartTime, TimeSpan.FromHours(thing.Interval))
    .Select(_ => new { thing, result = DoWork(thing.Uri) }))
  .Subscribe(x => Console.WriteLine("thing {0} produced result {1}",
                                    x.thing.Name, x.result));

Here's a version with a hypothetical Task<bool> DoWorkAsync(Uri):

things.ToObservable()
  .SelectMany(thing => Observable
    .Timer(thing.StartTime, TimeSpan.FromHours(thing.Interval))
    .SelectMany(Observable.FromAsync(async () =>
       new { thing, result = await DoWorkAsync(thing.Uri) })))
  .Subscribe(x => Console.WriteLine("thing {0} produced result {1}",
                                    x.thing.Name, x.result));

This version assumes that DoWorkAsync will finish long before the interval expires and a new call starts, so it does not guard against concurrent DoWorkAsync calls for the same Thing instance.
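If overlapping calls for the same Thing do become a problem, one option is a per-Thing guard that skips a tick while the previous call is still in flight. This is a minimal sketch using only System.Threading; NonOverlappingRunner and TryRunAsync are hypothetical names, and skipping (rather than queueing) the tick is an assumed policy.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: at most one call in flight per instance.
// A tick that arrives while a call is running is skipped, not queued.
public sealed class NonOverlappingRunner
{
    private int _running; // 0 = idle, 1 = a call is in flight

    // Returns false without invoking 'work' if a call is already running,
    // otherwise awaits 'work' and returns true.
    public async Task<bool> TryRunAsync(Func<Task> work)
    {
        if (Interlocked.CompareExchange(ref _running, 1, 0) != 0)
            return false;
        try
        {
            await work();
            return true;
        }
        finally
        {
            // Release the guard even if 'work' throws.
            Volatile.Write(ref _running, 0);
        }
    }
}
```

You would create one runner per Thing and have each timer tick call something like runner.TryRunAsync(() => DoWorkAsync(thing.Uri)) instead of calling DoWorkAsync directly; a false result means that tick was skipped.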

Brandon
  • DoWork calls a web service to check the folder for files. It returns true if it completes successfully, false if not. – Scott Baker Aug 08 '13 at 15:19
  • "I'll assume DoWork is synchronous" - DoWork should execute once for each Thing every Interval, but independently of other Things. Things should DoWork in parallel... DoWork calls a web service to check each folder for files. – Scott Baker Aug 08 '13 at 15:32
  • Then you may want to introduce some concurrency in your call to DoWork. Even better, make DoWork async by returning Task or IObservable, then you can chain these together really nicely. – Lee Campbell Aug 08 '13 at 15:36
  • Yes this version runs the things in parallel, so there could be concurrent `DoWork` calls in flight if multiple `Things` intervals coincide. I agree with @LeeCampbell's suggestion that if `DoWork` is calling some web service, you could improve resource utilization if it were an async method. – Brandon Aug 08 '13 at 15:48
  • @Brandon I get a red squiggly under `result = await DoWorkAsync(thing.Uri)` with the message "Cannot assign void to anonymous type property". While I'm sure this is telling me something useful, I'll be darned if I know what it is. – Scott Baker Aug 13 '13 at 23:23
  • If `DoWork` is declared to return a `bool`, then naturally `DoWorkAsync` should return a `Task<bool>`. You probably declared it as just returning a `Task`. If you want to keep it that way, then change your `SelectMany` to be: `SelectMany(Observable.FromAsync(async () => { await DoWorkAsync(thing.Uri); return thing; }))` – Brandon Aug 14 '13 at 14:35
  • @ScottSEA check out The SelectMany chapter from (my site) http://introtorx.com/Content/v1.0.10621.0/08_Transformation.html#SelectMany. Chaining is shown in my example above "From thing in things, from ignored in Timer, from result in DoWork" – Lee Campbell Aug 17 '13 at 18:43
  • @LeeCampbell Starting over at Chapter One, my friend. – Scott Baker Aug 24 '13 at 06:13

This is the clearest method I can think of:

foreach (var thing in things)
{
    // At StartTime: do the first check, then repeat every Interval hours.
    Scheduler.ThreadPool.Schedule(
        thing.StartTime,
        () => {
                DoWork(thing.Uri);
                Observable.Interval(TimeSpan.FromHours(thing.Interval))
                          .Subscribe(_ => DoWork(thing.Uri));
              });
}

The following one is more functional:

things.ToObservable()
    .SelectMany(t => Observable.Timer(t.StartTime).Select(_ => t))
    .SelectMany(t => Observable.Return(t).Concat(
        Observable.Interval(TimeSpan.FromHours(t.Interval)).Select(_ => t)))
    .Subscribe(t => DoWork(t.Uri));

The first SelectMany creates a stream of things at their scheduled start times. The second SelectMany takes this stream and creates a new one that repeats every interval. That stream has to be concatenated after an Observable.Return, which produces a value immediately, because Observable.Interval's first value only arrives after one full interval.
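The timing the second SelectMany relies on can be modelled with a little arithmetic. This sketch does not use Rx; it assumes Observable.Interval(period) first emits one full period after subscription, and it computes offsets from the moment each thing is emitted. TickSchedule and its methods are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class TickSchedule
{
    // Model of Observable.Interval(period): emits at period, 2*period, 3*period, ...
    public static List<TimeSpan> IntervalOffsets(TimeSpan period, int count)
    {
        return Enumerable.Range(1, count)
                         .Select(i => TimeSpan.FromTicks(period.Ticks * i))
                         .ToList();
    }

    // Model of Observable.Return(t).Concat(Observable.Interval(period)):
    // one value immediately, then the Interval ticks: 0, period, 2*period, ...
    public static List<TimeSpan> ReturnThenIntervalOffsets(TimeSpan period, int count)
    {
        if (count <= 0)
            return new List<TimeSpan>();
        return new[] { TimeSpan.Zero }
            .Concat(IntervalOffsets(period, count - 1))
            .ToList();
    }
}
```

For a 1 hour interval the first three offsets are 0h, 1h, 2h with the Return prefix and 1h, 2h, 3h without it, which is why the Return is needed for DoWork to run at StartTime itself.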

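*Note: the first solution requires C# 5. In earlier versions of C#, all iterations of a foreach loop share one loop variable, so every lambda would capture the same thing and see only the last one.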
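That pitfall can be reproduced without Rx. This sketch (hypothetical names) compares C# 5+ foreach capture, where each iteration gets a fresh variable, with a single variable shared across iterations, which mimics how foreach behaved before C# 5.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ClosureCapture
{
    // C# 5 and later: each foreach iteration declares a fresh 'item',
    // so every lambda sees its own value.
    public static List<string> ForeachCapture(string[] items)
    {
        var actions = new List<Func<string>>();
        foreach (var item in items)
            actions.Add(() => item);
        return actions.Select(a => a()).ToList();
    }

    // One variable shared by all lambdas (like pre-C# 5 foreach):
    // by the time the lambdas run, they all see the final value.
    public static List<string> SharedVariableCapture(string[] items)
    {
        var actions = new List<Func<string>>();
        string current = string.Empty;
        foreach (var item in items)
        {
            current = item;
            actions.Add(() => current);
        }
        return actions.Select(a => a()).ToList();
    }
}
```

With the items google and bing, ForeachCapture yields google, bing while SharedVariableCapture yields bing, bing, which is what would happen to the scheduled DoWork calls in the first solution on an older compiler.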

Community
Matthew Finlay