I need to transform a stream of objects into a stream of batches of objects, grouping them by a property value, using Reactive Extensions:

class Record
{
    public string Group;
    public int Value;
}

IObservable<List<Record>> Process(IObservable<Record> source, TimeSpan timeout)
{
    // ...
}

A batch is finished and sent to the output stream when either of these happens:

  • A new object comes from source stream and its Group value differs from the previous value
  • There have been no new objects in the source stream for N seconds

For example, if a1 means new Record { Group = "a", Value = 1}:

input:   -a1-a2-a3-b1-b2-
output:  -[a1, a2, a3]-[b1, b2]-

input:   -a1-a2----------a3-
output:  -[a1, a2]-------[a3]-

I tried various combinations of GroupByUntil, Debounce, Buffer, and Timer, to no avail. How can this be done?

Impworks
  • Perhaps [this answer](https://stackoverflow.com/questions/29858974/rx-group-batch-bursts-of-elements-in-an-observable-sequence/29859685#29859685) can be helpful for you – Dmitry Stepanov Feb 22 '19 at 19:19

1 Answer

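The trick was to use GroupByUntil and pass a Throttle of the group itself as the duration selector, so that a group closes once it has received no new items for the timeout: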

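IObservable<List<Record>> Process(IObservable<Record> source, TimeSpan timeout)
{
    // Throttle only emits after a pause of `timeout`, so each group
    // stays open until it has been quiet for that long.
    return source.GroupByUntil(x => x.Group, g => g.Throttle(timeout))
                 // ToList() yields IList<Record>; copy it into a List<Record>
                 // so the result matches the declared return type.
                 .SelectMany(g => g.ToList().Select(items => new List<Record>(items)));
}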
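
To see the batching in action, here is a minimal console sketch that replays the second example from the question. It assumes the System.Reactive package is referenced. The `Demo` class, the `Format` helper and the one-second timeout are made up for illustration. `Process` here is a variant that copies the `IList<Record>` produced by `ToList()` into a `List<Record>` so that it matches the declared return type.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

class Record
{
    public string Group;
    public int Value;
}

static class Demo
{
    public static IObservable<List<Record>> Process(IObservable<Record> source, TimeSpan timeout)
    {
        return source
            // A group closes once it has been quiet for `timeout`.
            .GroupByUntil(x => x.Group, g => g.Throttle(timeout))
            // Collect each closed group and copy it into a List<Record>.
            .SelectMany(g => g.ToList().Select(items => new List<Record>(items)));
    }

    // Hypothetical helper: renders a batch as "[a1, a2]".
    public static string Format(IEnumerable<Record> batch)
    {
        return "[" + string.Join(", ", batch.Select(r => r.Group + r.Value)) + "]";
    }

    public static void Main()
    {
        var source = new Subject<Record>();
        using (Process(source, TimeSpan.FromSeconds(1))
                   .Subscribe(batch => Console.WriteLine(Format(batch))))
        {
            source.OnNext(new Record { Group = "a", Value = 1 });
            source.OnNext(new Record { Group = "a", Value = 2 });

            // A pause longer than the timeout closes the current "a" batch.
            Thread.Sleep(TimeSpan.FromSeconds(2));

            source.OnNext(new Record { Group = "a", Value = 3 });

            // Completing the source flushes whatever batch is still open.
            source.OnCompleted();
        }
    }
}
```

Two things to keep in mind with this approach. First, a batch is only emitted once its group has been quiet for the timeout (or the source completes), not at the moment a record with a different Group arrives. Second, groups are tracked per key, so interleaved records such as a1, b1, a2 arriving within the timeout come out as [a1, a2] and [b1] rather than three separate batches. For the inputs shown in the question the batch contents are the same either way.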
Impworks