I need to transform an stream of objects to a stream of batches of objects, grouping them by a property value using Reactive Extensions:
class Record
{
public string Group;
public int Value;
}
IObservable<List<Record>> Process(IObservable<Record> source, TimeSpan timeout)
{
// ...
}
A batch is finished and sent to the output stream when either of these happens:
- A new object comes from source stream and its
Group
value differs from the previous value - There have been no new objects in the source stream for
N
seconds
For example, if a1
means new Record { Group = "a", Value = 1}
:
input: -a1-a2-a3-b1-b2-
output: -[a1, a2, a3]-[b1, b2]-
input: -a1-a2----------a3-
output: -[a1, a2]-------[a3]-
Tried various combinations of GroupByUntil
, Debounce
, Buffer
, and Timer
to no avail. How is it done?