I would like to create an observable sequence using reactive extensions (RX) and NCrontab. The sequence would differ from something like Observable.Timer()
in that the period and due time are not fixed. After reading this article it seems that Observable.Generate()
is the way to go. I am thinking of two variants: one which runs within bounds and one that runs forever. Do these implementation make sense?
public static IObservable<DateTime> Cron(string cron)
{
var schedule = CrontabSchedule.Parse(cron);
return Observable.Generate(DateTime.Now, d=>true, d => DateTime.Now, d => d,
d => new DateTimeOffset(schedule.GetNextOccurrence(d)));
}
public static IObservable<DateTime> Cron(string cron, DateTime start, DateTime end)
{
var schedule = CrontabSchedule.Parse(cron);
return Observable.Generate(start, d => d < end, d => DateTime.Now, d => d,
d => new DateTimeOffset(schedule.GetNextOccurrence(d)));
}
update: These seem to work empirically, however I added an overload which takes an IScheduler
and cannot seem to get the sequence to trigger in a unit test. Am I using TestScheduler
wrong or is there an issue with the function implementation?
public static IObservable<int> Cron(string cron, IScheduler scheduler)
{
var schedule = CrontabSchedule.Parse(cron);
return Observable.Generate(0, d => true, d => d + 1, d => d,
d => new DateTimeOffset(schedule.GetNextOccurrence(scheduler.Now.DateTime)), scheduler);
}
[TestClass]
public class EngineTests
{
[TestMethod]
public void TestCron()
{
var scheduler = new TestScheduler();
var cron = "* * * * *";
var values = new List<int>();
var disp = ObservableCron.Cron(cron, scheduler).Subscribe(values.Add);
scheduler.AdvanceBy(TimeSpan.TicksPerMinute - 1);
scheduler.AdvanceBy(1);
scheduler.AdvanceBy(1);
Assert.IsTrue(values.Count> 0);
}
}