
I have the following Rx extension method for partitioning an IEnumerable<T> and delaying the producing of each partitioned value. It uses an IEnumerable<T> extension method to partition the data. That method is also shown below, followed by a small console program that exercises both.

Is there a better way to implement the delay than calling `Observable.Timer().Wait()`?

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class RxExtensions
{
    public static IObservable<IEnumerable<T>> PartitionWithInterval<T>(
        this IObservable<IEnumerable<T>> source, int size, TimeSpan interval,
        IScheduler scheduler = null)
    {
        if (scheduler == null)
        {
            scheduler = TaskPoolScheduler.Default;
        }

        // Captured once per call, so it is shared by every subscription.
        var intervalEnabled = false;
        return source.SelectMany(x => x.Partition(size).ToObservable())
            .Window(1)
            .SelectMany(x =>
            {
                if (!intervalEnabled)
                {
                    // First partition: emit it without waiting.
                    intervalEnabled = true;
                }
                else
                {
                    // Blocks the calling thread until the timer fires.
                    Observable.Timer(interval, TaskPoolScheduler.Default).Wait();
                }

                return x;
            })
            .ObserveOn(scheduler);
    }
}

public static class EnumerableExtensions
{
    public static IEnumerable<IEnumerable<T>> Partition<T>(
        this IEnumerable<T> source, int size)
    {
        using (var enumerator = source.GetEnumerator())
        {
            var items = new List<T>();
            while (enumerator.MoveNext())
            {
                items.Add(enumerator.Current);
                if (items.Count == size)
                {
                    yield return items.ToArray();

                    items.Clear();
                }
            }

            // Emit the last, possibly shorter, partition.
            if (items.Any())
            {
                yield return items.ToArray();
            }
        }
    }
}
```

The Rx extension method is exercised by the following console program:

```csharp
static void Main(string[] args)
{
    try
    {
        var data = Enumerable.Range(0, 10);
        var interval = TimeSpan.FromSeconds(1);

        Observable.Return(data)
            .PartitionWithInterval(2, interval)
            .Timestamp()
            .Subscribe(x =>
            {
                var message = $"{x.Timestamp} - count = {x.Value.Count()}" +
                    $", values - {x.Value.First()}, {x.Value.Last()}";
                Console.WriteLine(message);
            });

        Console.ReadLine();
    }
    catch (Exception e)
    {
        Console.WriteLine(e);
    }
}
```
Theodor Zoulias
AwkwardCoder
  • *"I have the following Rx extension method for partitioning an IEnumerable<T>"* <== Does this literally mean that you have a single `IEnumerable<T>` to partition? I.e. is the `IObservable<IEnumerable<T>>` expected to emit a single `IEnumerable<T>`, and then complete? – Theodor Zoulias Jan 08 '21 at 12:56
  • Related: [Reactive Extensions: Process events in batches + add delay between every batch](https://stackoverflow.com/questions/10927298/reactive-extensions-process-events-in-batches-add-delay-between-every-batch) – Theodor Zoulias Jan 09 '21 at 06:04
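The related question linked above deals with the same pacing problem. As a hedged sketch (not taken from any answer here), one common non-blocking Rx idiom avoids `Observable.Timer(...).Wait()` by using `Concat`: each item becomes a small observable that emits the item and then stays silent for `interval`. The `Paced` name is hypothetical, and the block assumes the System.Reactive package.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class PacingSketch
{
    // Hypothetical helper: emits each item of `source` immediately, then waits
    // `interval` before the next one, without blocking any thread.
    public static IObservable<T> Paced<T>(
        this IObservable<T> source, TimeSpan interval, IScheduler scheduler) =>
        source
            .Select(item => Observable.Return(item)
                // Empty + Delay emits no values and completes after `interval`.
                .Concat(Observable.Empty<T>().Delay(interval, scheduler)))
            // Concat subscribes to one inner sequence at a time, so each item
            // waits until the previous item's delay has completed.
            .Concat();
}
```

Applied to the question, this could look like `Observable.Return(data).SelectMany(x => x.Partition(2)).Paced(interval, TaskPoolScheduler.Default)`. One trade-off: the resulting sequence completes one `interval` after the last item, because the last item's delay still runs.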

3 Answers


This should do it:

```csharp
public static IObservable<IEnumerable<T>> PartitionWithInterval<T>(
    this IObservable<IEnumerable<T>> source, int size, TimeSpan interval,
    IScheduler scheduler = null)
{
    if (scheduler == null)
    {
        scheduler = TaskPoolScheduler.Default;
    }

    return source
        // No .ToObservable() call needed: SelectMany has an overload whose selector returns an IEnumerable.
        .SelectMany(x => x.Partition(size))
        // Pair each partition with a tick; StartWith(0) supplies the first tick immediately.
        .Zip(Observable.Interval(interval, scheduler).StartWith(0), (x, _) => x)
        .ObserveOn(scheduler);
}
```

Funny how PartitionWithInterval actually calls Partition and Interval.

The `StartWith(0)` is there so that the first partition is emitted immediately instead of after the first `interval`, similar to what your `intervalEnabled` flag does.
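To check the `StartWith` claim without waiting in real time, here is a hedged sketch that replays the `Zip` + `Interval` + `StartWith` pacing on a virtual clock. It assumes the System.Reactive and Microsoft.Reactive.Testing (`TestScheduler`) packages; `ZipPacingCheck` is a hypothetical name, and the partitions are passed in directly rather than produced by `Partition`.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;

public static class ZipPacingCheck
{
    // Returns the virtual time at which each partition is emitted when paced
    // with Zip + Interval + StartWith, as in the answer above.
    public static List<TimeSpan> EmissionTimes(IReadOnlyList<int[]> partitions, TimeSpan interval)
    {
        // TestScheduler uses virtual time, so nothing actually waits.
        var scheduler = new TestScheduler();
        var times = new List<TimeSpan>();

        partitions.ToObservable()
            .Zip(Observable.Interval(interval, scheduler).StartWith(0L), (partition, _) => partition)
            .Subscribe(_ => times.Add(TimeSpan.FromTicks(scheduler.Clock)));

        // Advance virtual time far enough for every partition to meet a tick.
        scheduler.AdvanceBy(interval.Ticks * (partitions.Count + 1));
        return times;
    }
}
```

With three partitions and a one-second interval, the partitions should come out at 0 s, 1 s and 2 s; without the `StartWith(0L)`, the first partition would wait for the first tick at 1 s.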

Shlomo
  • The Rx `Zip` operator is scary. It buffers the values coming from the faster of the two observables, potentially resulting in massive memory allocation during a long-lived subscription. – Theodor Zoulias Jan 05 '21 at 23:09
  • In this case the faster observable is actually an enumerable, so the values are already in memory. – Shlomo Jan 06 '21 at 00:20
  • Hmm. Is the `IObservable<IEnumerable<T>> source` supposed to emit a single `IEnumerable<T>`, that should also be emitted synchronously during the subscription? If so, you are right about the `Zip`, but you should probably enforce this requirement somehow. The single emission requirement could be enforced by the `SingleAsync` operator, but how could the synchronous emission requirement be enforced? – Theodor Zoulias Jan 06 '21 at 03:12
  • It’s in the question. You start with an enumerable [0123456789], and get an observable that time-spaces out the enumerable in clumps: [01]...[23]...[45]... The faster source here is the enumerable (which again, is already in memory). – Shlomo Jan 06 '21 at 07:49
  • This seems to be just a test for validating the correctness of the `PartitionWithInterval` method. I don't think that this test indicates how the OP intends to use this method in practice. – Theodor Zoulias Jan 06 '21 at 13:11
  • First sentence: "I have the following Rx extension method for partitioning an IEnumerable<T> and delaying the producing of each partitioned value" – Shlomo Jan 08 '21 at 12:02
  • Hmm, fair enough. I asked the OP to clarify the question. – Theodor Zoulias Jan 08 '21 at 12:58
  • Even if the OP's intention is to partition a single `IEnumerable`, enumerating it eagerly and storing all its values in memory seems like a bad idea. An `IEnumerable` can potentially emit an infinite number of values. A more efficient approach would be to enumerate it lazily on every `interval`, just enough to produce a partition and emit it. – Theodor Zoulias Jan 09 '21 at 03:40
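A hedged sketch of the lazy approach described in the last comment, for the single-`IEnumerable<T>` case: it pulls at most `size` items per step and waits with a non-blocking `Task.Delay` between partitions, so it also works with an infinite source. It uses `IAsyncEnumerable<T>` (C# 8 / .NET Core 3.0 or later) instead of Rx; `LazyPartitioning` and `PartitionWithIntervalAsync` are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class LazyPartitioning
{
    // Hypothetical helper: pulls at most `size` items per step, and only after the
    // previous partition has been consumed and `interval` has elapsed.
    public static async IAsyncEnumerable<T[]> PartitionWithIntervalAsync<T>(
        IEnumerable<T> source, int size, TimeSpan interval,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        if (size <= 0) throw new ArgumentOutOfRangeException(nameof(size));
        using var enumerator = source.GetEnumerator();
        while (true)
        {
            var items = new List<T>(size);
            while (items.Count < size && enumerator.MoveNext())
                items.Add(enumerator.Current);

            if (items.Count == 0) yield break;   // source ended exactly at a boundary
            yield return items.ToArray();
            if (items.Count < size) yield break; // short partition: source is exhausted

            // Non-blocking wait before pulling the next partition.
            await Task.Delay(interval, cancellationToken);
        }
    }
}
```

It can be consumed with `await foreach`. One limitation: when the source length is an exact multiple of `size`, the iterator waits one extra `interval` before discovering that the source is exhausted.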

Here is an implementation of the `PartitionWithInterval` operator that is optimized for memory efficiency. The enumerables emitted by the `IObservable<IEnumerable<T>>` are enumerated lazily, just enough to produce the next one or two partitions; their enumeration is then suspended until the next interval. To achieve this laziness, the implementation uses `IAsyncEnumerable<T>`s instead of `IObservable<T>`s, and makes use of operators from the System.Linq.Async and System.Interactive.Async packages.

```csharp
public static IObservable<IList<T>> PartitionWithInterval<T>(
    this IObservable<IEnumerable<T>> source, int size,
    TimeSpan interval, IScheduler scheduler = null)
{
    scheduler ??= Scheduler.Default;
    return Observable.Defer(() =>
    {
        Task delayTask = Task.CompletedTask;
        return source
            .ToAsyncEnumerable()
            .SelectMany(x => x.ToAsyncEnumerable()).Buffer(size) /* Behavior A */
            //.SelectMany(x => x.ToAsyncEnumerable().Buffer(size)) /* Behavior B */
            .Do(async (_, cancellationToken) =>
            {
                await delayTask;
                var timer = Observable.Timer(interval, scheduler);
                delayTask = timer.ToTask(cancellationToken);
            })
            .ToObservable();
    });
}
```

Below is a marble diagram that shows the behavior of the PartitionWithInterval operator, configured with size: 2:

```
Source: +----[1,2,3,4,5]--------------------[6,7,8,9]---|
Output: +----[1,2]-------[3,4]--------------[5,6]-------[7,8]-------[9]|
```

As shown, an output partition may contain values from more than one enumerable (the partition [5,6] in the above diagram). If this is undesirable, just comment out the line marked "Behavior A" and uncomment the line marked "Behavior B". The marble diagram below shows the effect of this change:

```
Source: +----[1,2,3,4,5]--------------------[6,7,8,9]---|
Output: +----[1,2]-------[3,4]-------[5]-------[6,7]-------[8,9]|
```
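The difference between the two behaviors comes down to where the partitioning happens: after flattening all the enumerables (A), or inside each enumerable separately (B). The hedged, synchronous sketch below reproduces both marble diagrams with plain LINQ to Objects and leaves out the timing. `PartitionBehaviors` and its methods are hypothetical names, and `Partition` is a copy of the question's helper (on .NET 6 and later, `Enumerable.Chunk` does the same chunking).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class PartitionBehaviors
{
    // Same chunking rule as the question's Partition helper, copied so this compiles alone.
    public static IEnumerable<T[]> Partition<T>(IEnumerable<T> source, int size)
    {
        var items = new List<T>(size);
        foreach (var item in source)
        {
            items.Add(item);
            if (items.Count == size)
            {
                yield return items.ToArray();
                items.Clear();
            }
        }
        if (items.Count > 0) yield return items.ToArray();
    }

    // Behavior A: flatten first, then chunk; a partition may span two source enumerables.
    public static List<T[]> FlattenThenPartition<T>(IEnumerable<IEnumerable<T>> sources, int size) =>
        Partition(sources.SelectMany(x => x), size).ToList();

    // Behavior B: chunk each enumerable on its own; partitions never span two sources.
    public static List<T[]> PartitionEach<T>(IEnumerable<IEnumerable<T>> sources, int size) =>
        sources.SelectMany(x => Partition(x, size)).ToList();
}
```

Feeding `{1,2,3,4,5}` and `{6,7,8,9}` with `size` 2 gives `[1,2] [3,4] [5,6] [7,8] [9]` for A and `[1,2] [3,4] [5] [6,7] [8,9]` for B, matching the two diagrams.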

Note: The above solution does not fully achieve the goal of enumerating lazily the enumerables emitted by the source observable. Ideally each partition would be produced exactly when it is due to be emitted. Instead, the above implementation gathers the elements of the next partition immediately after emitting the previous partition. The alternative would be to enforce a delay after emitting each partition, including the last one. This would postpone the completion of the resulting `IObservable` by a time span equal to `interval`, which is not ideal either (this behavior is implemented in revision 3 of this answer). The ideal behavior could probably be achieved by re-implementing the operators `ToAsyncEnumerable`, `SelectMany`, `Buffer` and `Do`, so that they communicate whether the currently emitted element is the last one (an `IsLast` state). Even if this is possible, it would require a lot of effort for such a small improvement.

Theodor Zoulias

It feels like you should use the `Buffer` operator. Try this:

```csharp
data.ToObservable()
    .Buffer(2)
    .Zip(Observable.Interval(interval), (x, _) => x)
    .Timestamp()
    .Subscribe(x =>
    {
        var message = $"buffer {x.Timestamp} - count = {x.Value.Count()}, values - {x.Value.First()}, {x.Value.Last()}";
        Console.WriteLine(message);
    });
```
Dmitrii