I have the following Rx extension method for partitioning an IEnumerable<T>
and delaying the producing of each partitioned value. It uses an IEnumerable<T>
extension to partition the data, which is also shown with a unit test.
Is there a better way to do the 'delay' than using the Observable.Timer().Wait()
method call?
public static class RxExtensions
{
public static IObservable<IEnumerable<T>> PartitionWithInterval<T>(
this IObservable<IEnumerable<T>> source, int size, TimeSpan interval,
IScheduler scheduler = null)
{
if (scheduler == null)
{
scheduler = TaskPoolScheduler.Default;
}
var intervalEnabled = false;
return source.SelectMany(x => x.Partition(size).ToObservable())
.Window(1)
.SelectMany(x =>
{
if (!intervalEnabled)
{
intervalEnabled = true;
}
else
{
Observable.Timer(interval, TaskPoolScheduler.Default).Wait();
}
return x;
})
.ObserveOn(scheduler);
}
}
public static class EnumerableExtensions
{
public static IEnumerable<IEnumerable<T>> Partition<T>(
this IEnumerable<T> source, int size)
{
using (var enumerator = source.GetEnumerator())
{
var items = new List<T>();
while (enumerator.MoveNext())
{
items.Add(enumerator.Current);
if (items.Count == size)
{
yield return items.ToArray();
items.Clear();
}
}
if (items.Any())
{
yield return items.ToArray();
}
}
}
}
Test for the Rx extension method is shown below:
static void Main(string[] args)
{
try
{
var data = Enumerable.Range(0, 10);
var interval = TimeSpan.FromSeconds(1);
Observable.Return(data)
.PartitionWithInterval(2, interval)
.Timestamp()
.Subscribe(x =>
{
var message = $"{x.Timestamp} - count = {x.Value.Count()}" +
$", values - {x.Value.First()}, {x.Value.Last()}";
Console.WriteLine(message);
});
Console.ReadLine();
}
catch (Exception e)
{
Console.WriteLine(e);
}
}