I would like to create an observable sequence using Reactive Extensions (Rx) and NCrontab. The sequence would differ from something like Observable.Timer() in that the period and due time are not fixed. After reading this article, it seems that Observable.Generate() is the way to go. I am thinking of two variants: one that runs within bounds and one that runs forever. Do these implementations make sense?

public static IObservable<DateTime> Cron(string cron)
{
    var schedule = CrontabSchedule.Parse(cron);
    return Observable.Generate(DateTime.Now, d=>true, d => DateTime.Now, d => d,
        d => new DateTimeOffset(schedule.GetNextOccurrence(d)));
}

public static IObservable<DateTime> Cron(string cron, DateTime start, DateTime end)
{
    var schedule = CrontabSchedule.Parse(cron);
    return Observable.Generate(start, d => d < end, d => DateTime.Now, d => d,
        d => new DateTimeOffset(schedule.GetNextOccurrence(d)));
}

Update: These seem to work empirically. However, I added an overload that takes an IScheduler and cannot get the sequence to trigger in a unit test. Am I using TestScheduler incorrectly, or is there an issue with the function implementation?

public static IObservable<int> Cron(string cron, IScheduler scheduler)
{
    var schedule = CrontabSchedule.Parse(cron);
    return Observable.Generate(0, d => true, d => d + 1, d => d,
        d => new DateTimeOffset(schedule.GetNextOccurrence(scheduler.Now.DateTime)), scheduler);
}

[TestClass]
public class EngineTests
{
    [TestMethod]
    public void TestCron()
    {
        var scheduler = new TestScheduler();
        var cron = "* * * * *";
        var values = new List<int>();
        var disp = ObservableCron.Cron(cron, scheduler).Subscribe(values.Add);
        scheduler.AdvanceBy(TimeSpan.TicksPerMinute - 1);
        scheduler.AdvanceBy(1);
        scheduler.AdvanceBy(1);
        Assert.IsTrue(values.Count > 0);
    }
}

3 Answers


It looks like a combination of issues. First, the Observable.Generate overload that I am using takes a Func<int, DateTimeOffset> parameter to determine the next time to trigger. I was passing in a new DateTimeOffset built from the scheduler's local time rather than its UTC time, which caused the new DateTimeOffset to shift. See this question for an explanation. The corrected function is below:

public static IObservable<int> Cron(string cron, IScheduler scheduler)
{
    var schedule = CrontabSchedule.Parse(cron);
    return Observable.Generate(0, d => true, d => d + 1, d => d,
        d => new DateTimeOffset(schedule.GetNextOccurrence(scheduler.Now.UtcDateTime)), scheduler);
}
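
To illustrate the kind of shift described above, here is a minimal sketch using only the base class library. It shows how the DateTimeOffset(DateTime) constructor picks its offset from the DateTime's Kind. The class and method names are hypothetical, and the sketch makes no claim about which Kind NCrontab returns from GetNextOccurrence; it only shows why a non-UTC Kind can move the resulting instant.

```csharp
using System;

// Hypothetical demo class, not part of the original answer.
public static class DateTimeOffsetKindDemo
{
    // Returns the offsets assigned by new DateTimeOffset(DateTime) for a UTC
    // value and for the same clock reading with Kind = Unspecified.
    public static (TimeSpan UtcOffset, TimeSpan UnspecifiedOffset) Offsets()
    {
        var utc = new DateTime(2020, 6, 1, 12, 0, 0, DateTimeKind.Utc);
        var unspecified = DateTime.SpecifyKind(utc, DateTimeKind.Unspecified);

        // Kind = Utc yields a zero offset. Kind = Unspecified (or Local) is
        // treated as local time and gets the machine's local offset.
        return (new DateTimeOffset(utc).Offset,
                new DateTimeOffset(unspecified).Offset);
    }

    public static void Main()
    {
        var (utcOffset, unspecifiedOffset) = Offsets();
        Console.WriteLine($"Utc kind:         {utcOffset}");
        Console.WriteLine($"Unspecified kind: {unspecifiedOffset}");
    }
}
```

On a machine whose local time zone is not UTC, the two offsets differ, so the same clock reading maps to two different instants.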

As far as testing goes, I came up with something that demonstrates the intent a little better:

[TestMethod]
public void TestCronInterval()
{
    var scheduler = new TestScheduler();
    var end = scheduler.Now.UtcDateTime.AddMinutes(10);
    const string cron = "*/5 * * * *";
    var i = 0;
    var seconds = 0;
    var sub = ObservableCron.Cron(cron, scheduler).Subscribe(x => i++);
    while (i < 2)
    {
        seconds++;
        scheduler.AdvanceBy(TimeSpan.TicksPerSecond);
    }
    Assert.AreEqual(600, seconds);
    Assert.AreEqual(end, scheduler.Now.UtcDateTime);
    sub.Dispose();
}
Community

I have used this solution, which does not take a scheduler, with Cronos:

public static IObservable<DateTimeOffset> ToObservable(this ICronScheduleObservableConfiguration configuration)
{
    Validate(configuration);
    var schedule = configuration.Expression;
    DateTimeOffset? next = null;
    return Observable.Generate(
            DateTimeOffset.Now,
            i => true,
            i => (next = schedule.GetNextOccurrence(i, configuration.TimeZone)) ?? DateTimeOffset.Now,
            i => next,
            i => i)
        .Where(i => i.HasValue)
        .Select(i => i.Value);
}

public interface ICronScheduleObservableConfiguration :
    IObservableConfiguration
{
    /// <summary>
    /// Cron schedule with format: https://github.com/HangfireIO/Cronos#cron-format
    /// </summary>
    /// <value>Non-empty</value>
    string Schedule { get; }
    /// <summary>
    /// <see cref="Schedule"/> format
    /// </summary>
    CronFormat Format { get; }
    /// <summary>
    /// Parsed <see cref="Schedule"/> using <see cref="Format"/>
    /// </summary>
    /// <value>non-null</value>
    /// <exception cref="CronFormatException">Parsing with <see cref="CronExpression.Parse(string, CronFormat)"/> failed</exception>
    CronExpression Expression { get; }
    /// <summary>
    /// Time zone used for computing schedule times with <see cref="CronExpression.GetNextOccurrence(DateTimeOffset, TimeZoneInfo, bool)"/>
    /// </summary>
    /// <value>non-null</value>
    TimeZoneInfo TimeZone { get; }
}

First, scheduler.Now.DateTime is not going to give you real times in your unit test with a TestScheduler. It is going to give you virtual times based upon some pre-defined start time. You should probably use AdvanceTo to initialize the clock to something that corresponds with your crontab test data.

For this example test, that is probably not your problem. Your problem is most likely that you are writing your test at the granularity of a tick, which rarely works. With the TestScheduler, when a scheduled action that runs on tick t schedules another action to execute "immediately", that next action won't actually execute until tick t+1. If that action schedules another action, it won't execute until tick t+2, and so on. So, unless you fully understand exactly how Generate schedules its work, you want to avoid writing tick-level tests.
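
The tick behavior described above can be observed with a small sketch like the following. It assumes the Microsoft.Reactive.Testing package is referenced; the class and method names are hypothetical. If the description holds, an action scheduled "immediately" at tick 100 should run at tick 101, and an action it schedules in turn should run at tick 102.

```csharp
using System;
using System.Reactive.Concurrency;
using Microsoft.Reactive.Testing;

// Hypothetical demo class, not part of the original answer.
public static class TestSchedulerTickDemo
{
    // Records the virtual clock value at which each nested action runs.
    public static (long First, long Second) Run()
    {
        var scheduler = new TestScheduler();
        scheduler.AdvanceTo(100); // move the virtual clock to tick 100

        long first = -1, second = -1;

        // Schedule an action "immediately"; it schedules another one in turn.
        scheduler.Schedule(() =>
        {
            first = scheduler.Clock;
            scheduler.Schedule(() => second = scheduler.Clock);
        });

        scheduler.Start(); // run everything that is queued
        return (first, second);
    }

    public static void Main()
    {
        var (first, second) = Run();
        Console.WriteLine($"first ran at tick {first}, second at tick {second}");
    }
}
```

This is why a chain of "immediate" steps inside an operator can land a few ticks later than a tick-exact test expects.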

Instead, try to test at the granularity that the code you are testing supports. In this case, I think that is minutes, so write your test to advance 59 seconds, see that nothing happened, then advance 2 more seconds and see if you got what you expected. Or, better yet, use the TestScheduler.CreateObserver method and just advance once:

var scheduler = new TestScheduler();

// set the virtual clock to something that corresponds with my test crontab data
scheduler.AdvanceTo(someDateTimeOffset.UtcTicks); // AdvanceTo takes an absolute time in ticks

// now your tests...
var cron = "* * * * *";
var observer = scheduler.CreateObserver<int>();

var disp = ObservableCron.Cron(cron, scheduler).Subscribe(observer);

// advance the clock by 61 seconds
scheduler.AdvanceBy(TimeSpan.FromSeconds(61).Ticks);

// check the contents of the observer
Assert.IsTrue(observer.Messages.Count > 0);
Brandon