I have a collection of items of the following class:

public class Event
{
    public DateTimeOffset Timestamp;
    public object Data;
}

I want to create an IObservable<Event> in which each item is published when its Timestamp, which lies in the future, is reached. Is this possible with Observable.Delay, or do I have to write my own IObservable<T> implementation?

I should mention that this structure is something like a log file. There can be tens of thousands of Event items, but only 1-2 of them are to be published per second.

ghord

3 Answers

It turns out this is very simple to do with the Observable.Delay overload that takes a per-element delay selector:

//given IEnumerable<Event> events:
var observable = events.ToObservable().Delay(ev => Observable.Timer(ev.Timestamp));
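To try the one-liner end to end, here is a minimal console sketch. It assumes the System.Reactive NuGet package is referenced; MakeSampleEvents is a hypothetical helper invented for this illustration, and the three sample events are made up.

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;

public class Event
{
    public DateTimeOffset Timestamp;
    public object Data;
}

public static class DelayDemo
{
    // Hypothetical helper: builds `count` events, one per second after `start`.
    public static Event[] MakeSampleEvents(DateTimeOffset start, int count) =>
        Enumerable.Range(1, count)
            .Select(i => new Event { Timestamp = start.AddSeconds(i), Data = $"event {i}" })
            .ToArray();

    public static void Main()
    {
        var events = MakeSampleEvents(DateTimeOffset.UtcNow, 3);

        // Each element is held back until a timer set to its own Timestamp fires.
        var observable = events
            .ToObservable()
            .Delay(ev => Observable.Timer(ev.Timestamp));

        // Wait() blocks until the sequence completes, which keeps the process alive.
        observable
            .Do(ev => Console.WriteLine($"{DateTimeOffset.UtcNow:HH:mm:ss.fff} {ev.Data}"))
            .Wait();
    }
}
```

Note that with this overload every element gets its own timer, so elements are emitted in the order their timers fire rather than in source order.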
ghord

While my first answer works as intended, creating the observable sequence performs poorly with hundreds of thousands of events: you pay a substantial initialization cost (on the order of 10 seconds on my machine).

To improve performance, I took advantage of the fact that my data is already sorted and implemented a custom IEnumerable<Event> that loops through the events, sleeping until each one is due and then yielding it. On this IEnumerable one can simply call ToObservable<T>, and it works as intended:

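IObservable<Event> CreateSimulation(IEnumerable<Event> events)
{
    // Lazily walks the (already sorted) events, blocking until each one is due.
    IEnumerable<Event> simulation()
    {
        foreach (var ev in events)
        {
            // Timestamp is a DateTimeOffset, so compare it with DateTimeOffset.UtcNow.
            var now = DateTimeOffset.UtcNow;

            if (ev.Timestamp > now)
            {
                // Blocks the enumerating thread until the event is due.
                Thread.Sleep(ev.Timestamp - now);
            }

            yield return ev;
        }
    }

    return simulation().ToObservable();
}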
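Because the iterator sleeps on whichever thread enumerates it, it can matter where the enumeration runs. The sketch below is a variant under stated assumptions, not the code from this answer: it passes NewThreadScheduler.Default to ToObservable so that the sleeping happens on a dedicated thread. It assumes the System.Reactive package; the class name SimulationDemo and the sample events are invented for illustration. As far as I know, ToObservable() without a scheduler enumerates on the subscribing thread, in which case Subscribe would not return until the last event has been published.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

public class Event
{
    public DateTimeOffset Timestamp;
    public object Data;
}

public static class SimulationDemo
{
    // Same idea as the iterator above: sleep until each event is due, then yield it.
    public static IEnumerable<Event> Simulate(IEnumerable<Event> events)
    {
        foreach (var ev in events)
        {
            var delay = ev.Timestamp - DateTimeOffset.UtcNow;
            if (delay > TimeSpan.Zero)
            {
                Thread.Sleep(delay);
            }

            yield return ev;
        }
    }

    public static void Main()
    {
        var start = DateTimeOffset.UtcNow;
        var events = new[]
        {
            new Event { Timestamp = start.AddSeconds(1), Data = "first" },
            new Event { Timestamp = start.AddSeconds(2), Data = "second" },
        };

        using var done = new ManualResetEventSlim();

        // The scheduler argument moves the blocking enumeration off the calling thread.
        using var subscription = Simulate(events)
            .ToObservable(NewThreadScheduler.Default)
            .Subscribe(ev => Console.WriteLine(ev.Data), () => done.Set());

        Console.WriteLine("Subscribed; waiting for events...");
        done.Wait();
    }
}
```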
ghord

It seems that the Rx library lacks a built-in mechanism for converting an IEnumerable<T> to an IObservable<T> by enumerating it lazily and time-shifting its elements. Below is a custom implementation. The idea is to Zip the source enumerable with a subject, and to control the enumeration by sending OnNext notifications to the subject at the right moments.

/// <summary>Converts an enumerable sequence to a time shifted observable sequence,
/// based on a time selector function for each element.</summary>
public static IObservable<T> ToObservable<T>(
    this IEnumerable<T> source,
    Func<T, DateTimeOffset> dueTimeSelector,
    IScheduler scheduler = null)
{
    scheduler ??= Scheduler.Default;
    return Observable.Defer(() =>
    {
        var subject = new BehaviorSubject<Unit>(default);
        return subject
            .Zip(source, (_, x) => x)
            .Delay(x => Observable.Timer(dueTimeSelector(x), scheduler))
            .Do(_ => subject.OnNext(default));
    });
}

The BehaviorSubject has been chosen because it has an initial value, and so it sets the wheels in motion naturally.

The Observable.Defer operator is used to prevent multiple subscriptions from sharing the same state (the subject in this case), and interfering with each other. More info about this here.

Usage example:

IEnumerable<Event> events = GetEvents();

IObservable<Event> observable = events.ToObservable(x => x.Timestamp);
Theodor Zoulias