It seems that the Rx library lacks a mechanism for converting an IEnumerable<T> to an IObservable<T> by enumerating it lazily and time-shifting its elements. Below is a custom implementation. The idea is to Zip the source enumerable with a subject, and to control the enumeration by sending OnNext notifications to the subject at the right moments.

    /// <summary>Converts an enumerable sequence to a time shifted observable sequence,
    /// based on a time selector function for each element.</summary>
    public static IObservable<T> ToObservable<T>(
        this IEnumerable<T> source,
        Func<T, DateTimeOffset> dueTimeSelector,
        IScheduler scheduler = null)
    {
        scheduler ??= Scheduler.Default;
        return Observable.Defer(() =>
        {
            var subject = new BehaviorSubject<Unit>(default);
            return subject
                .Zip(source, (_, x) => x)
                .Delay(x => Observable.Timer(dueTimeSelector(x), scheduler))
                .Do(_ => subject.OnNext(default));
        });
    }

The BehaviorSubject has been chosen because it has an initial value, and so it sets the wheels in motion naturally. The Observable.Defer operator is used to prevent multiple subscriptions from sharing the same state (the subject in this case) and interfering with each other. More info about this here.
Usage example:

    IEnumerable<Event> events = GetEvents();
    IObservable<Event> observable = events.ToObservable(x => x.Timestamp);
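
To see the lazy enumeration in action, here is a rough, self-contained sketch of a console program. It assumes the System.Reactive NuGet package and C# 9 or later. The Event record, the GetEvents iterator with its one-second spacing, and the EnumerableExtensions class name are hypothetical, introduced only for illustration; the ToObservable method is the one shown above.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical event type, used only for this illustration.
public record Event(string Name, DateTimeOffset Timestamp);

public static class EnumerableExtensions
{
    // The ToObservable method from the answer, repeated so the sketch compiles on its own.
    public static IObservable<T> ToObservable<T>(
        this IEnumerable<T> source,
        Func<T, DateTimeOffset> dueTimeSelector,
        IScheduler scheduler = null)
    {
        scheduler ??= Scheduler.Default;
        return Observable.Defer(() =>
        {
            var subject = new BehaviorSubject<Unit>(default);
            return subject
                .Zip(source, (_, x) => x)
                .Delay(x => Observable.Timer(dueTimeSelector(x), scheduler))
                .Do(_ => subject.OnNext(default));
        });
    }
}

public static class Program
{
    // Hypothetical lazy source: the message shows when each element is actually produced.
    private static IEnumerable<Event> GetEvents(DateTimeOffset start)
    {
        for (int i = 1; i <= 3; i++)
        {
            Console.WriteLine($"Enumerating event #{i}");
            yield return new Event($"Event #{i}", start.AddSeconds(i));
        }
    }

    public static void Main()
    {
        IObservable<Event> observable = GetEvents(DateTimeOffset.Now)
            .ToObservable(x => x.Timestamp);

        // Wait subscribes and blocks the calling thread until the sequence completes.
        observable
            .Do(e => Console.WriteLine($"{DateTimeOffset.Now:HH:mm:ss.fff} received {e.Name}"))
            .Wait();
    }
}
```

If the sketch behaves as intended, the "Enumerating" messages should appear interleaved with the "received" messages over roughly three seconds, rather than all at once on subscription, which is what enumerating the source lazily means here.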