
I have a sequence of items, and want to group them by a key and calculate several aggregations for each key.

The number of items is large, but the number of distinct keys is small.

A toy example:

static List<(string Key, decimal Sum, int Count)> GroupStats(
    IEnumerable<(string Key, decimal Value)> items)
{
    return items
        .GroupBy(x => x.Key)
        .Select(g => (
            Key : g.Key,
            Sum : g.Sum(x => x.Value),
            Count : g.Count()
        ))
        .ToList();
}

Using LINQ's GroupBy has the unfortunate consequence that it'll need to load all the items into memory.

An imperative implementation would only consume memory proportional to the number of distinct keys, but I'm wondering if there is a nicer solution.
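For comparison, below is a minimal sketch of the imperative approach the question alludes to. The question does not show its own version, so this assumes a `Dictionary` keyed by the group key, and the class name is hypothetical. The source is enumerated once, and only one `(Sum, Count)` accumulator per distinct key is kept in memory.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical class name; a sketch of the dictionary-based approach, not the asker's code.
public static class ImperativeGroupStats
{
    // Memory use grows with the number of distinct keys, not with the number of items.
    public static List<(string Key, decimal Sum, int Count)> GroupStats(
        IEnumerable<(string Key, decimal Value)> items)
    {
        var acc = new Dictionary<string, (decimal Sum, int Count)>();
        foreach (var (key, value) in items)
        {
            // For a key seen for the first time, 'current' is default: (0m, 0).
            acc.TryGetValue(key, out var current);
            acc[key] = (current.Sum + value, current.Count + 1);
        }
        // Dictionary enumeration order is not guaranteed.
        return acc.Select(kv => (kv.Key, kv.Value.Sum, kv.Value.Count)).ToList();
    }
}
```

With the six-item sample source used in the last answer, this yields a → (1, 1), b → (7, 2) and c → (13, 3), in no guaranteed order.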

The "push" approach of Reactive Extensions (Rx) should, in theory, enable low-memory grouping as well, but I couldn't find a way to escape from IObservable to materialize the actual values. I'm also open to other elegant solutions (besides the obvious imperative implementation).

Theodor Zoulias
Nik
  • What is "large" in this context? Some people seem to think that 1k items are many. And why do you want to use GroupBy in a context where it is inefficient? Using a dictionary to sum items should be fairly simple, not much more code, and avoid your efficiency concerns. Keep in mind, LINQ is for *convenience*, it is not a silver bullet, especially not where things like performance are a concern. – JonasH Jun 09 '22 at 14:08
  • More than I'm willing to keep in RAM, let's say a billion. CPU performance isn't a big concern, since loading the data is slower than transforming it with LINQ/Rx. I know how to write this imperatively using a dictionary, but I'd like to explore alternative solutions before deciding to go that way. – Nik Jun 09 '22 at 14:36
  • This question might be slightly relevant: [How to check an IEnumerable for multiple conditions with a single enumeration without buffering?](https://stackoverflow.com/questions/58578480/how-to-check-an-ienumerable-for-multiple-conditions-with-a-single-enumeration-wi) – Theodor Zoulias Jun 09 '22 at 17:21

2 Answers


You could do this:

static IList<(string Key, decimal Sum, int Count)> GroupStats(
    IEnumerable<(string Key, decimal Value)> source)
{
    return source
        .ToObservable()
        .GroupBy(x => x.Key)
        .Select(g => (
            Key: g.Key,
            Sum: g.Sum(x => x.Value).PublishLast().AutoConnect(0),
            Count: g.Count().PublishLast().AutoConnect(0)
        ))
        .ToList()
        .Wait()
        .Select(e => (e.Key, e.Sum.Wait(), e.Count.Wait()))
        .ToArray();
}
  • With the ToObservable operator, the IEnumerable<T>¹ source is converted to an IObservable<T> sequence.

  • The GroupBy converts the IObservable<T> to an IObservable<IGroupedObservable<string, T>>.

  • The Select converts each IGroupedObservable<string, T> to a (string, IObservable<decimal>, IObservable<int>). The PublishLast is used in order to remember the last (and only) value emitted by the Sum and Count operators. The AutoConnect(0) subscribes to these subsequences immediately when they are emitted.

  • The ToList converts the IObservable<T> to an IObservable<IList<T>>. The outer observable will emit a single list when it is completed.

  • The Wait waits synchronously for the outer observable to complete, and to emit the single list. This is where all the work happens. Until this point the source sequence has not been enumerated. The Wait subscribes to the observable that has been constructed so far, which triggers subscriptions to the underlying observables, and eventually triggers the enumeration of the source. All the calculations are performed synchronously during the subscriptions, on the current thread. So the verb "wait" doesn't accurately describe what's happening here.

  • The next Select converts each (string, IObservable<decimal>, IObservable<int>) to a (string, decimal, int), by waiting on the subsequences. These subsequences have already completed at this point, and their single output is stored inside the PublishLast. So these inner Wait invocations are not triggering any serious work. All the heavy work has already been done in the previous step.

  • Finally the ToArray converts the IEnumerable<(string, decimal, int)> to an array of (string, decimal, int), which is the output of the GroupStats method.

¹ I am using the T as placeholder for a complex ValueTuple, so that the explanation is not overly verbose.


Update: The Rx ToObservable operator has quite a lot of overhead, because it has to support the Rx scheduling infrastructure. You can replace it with the ToObservableHypersonic below, and achieve a speed-up of around 5x:

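using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
public static class ObservableExtensions // any static class can host the extension method
{
    // Enumerates the whole source synchronously inside Subscribe, bypassing the Rx schedulers.
    // Subscribe returns only after the source is exhausted, so disposing cannot stop it early.
    public static IObservable<TSource> ToObservableHypersonic<TSource>(
        this IEnumerable<TSource> source)
    {
        return Observable.Create<TSource>(observer =>
        {
            foreach (var item in source) observer.OnNext(item);
            observer.OnCompleted();
            return Disposable.Empty;
        });
    }
}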

I should also mention an alternative to the PublishLast+AutoConnect(0) combination, which is to convert the subsequences to tasks with the ToTask method. It has the same effect: the subsequences are subscribed immediately and their last value is memorized.

    Sum: g.Sum(x => x.Value).ToTask(),
    Count: g.Count().ToTask()
//...
.Select(e => (e.Key, e.Sum.Result, e.Count.Result))
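Assembled from the snippets above, a complete `ToTask` variant of the method might look like the following. This is a sketch that assumes the System.Reactive NuGet package (`ToTask` lives in the `System.Reactive.Threading.Tasks` namespace); the wrapper class name is hypothetical.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;

// Hypothetical class name; same pipeline as the answer, with ToTask instead of PublishLast+AutoConnect(0).
public static class RxGroupStatsWithTasks
{
    public static IList<(string Key, decimal Sum, int Count)> GroupStats(
        IEnumerable<(string Key, decimal Value)> source)
    {
        return source
            .ToObservable()
            .GroupBy(x => x.Key)
            .Select(g => (
                Key: g.Key,
                // ToTask subscribes immediately and completes with the last (only) value.
                Sum: g.Sum(x => x.Value).ToTask(),
                Count: g.Count().ToTask()
            ))
            .ToList()
            .Wait() // the source is enumerated here, on the current thread
            // The tasks have already completed, so .Result does not block.
            .Select(e => (e.Key, e.Sum.Result, e.Count.Result))
            .ToArray();
    }
}
```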
Theodor Zoulias
  • That's definitely a bit more complex than I expected, especially the `.PublishLast().AutoConnect(0)` part. – Nik Jun 10 '22 at 12:20
  • I'll accept this answer, since it's what I asked for and has a great explanation. But I'll stay with the imperative implementation for now, since it's much easier to understand and doesn't require explanation. – Nik Jun 10 '22 at 12:45
  • @Nik yep, it's definitely not a walk in the park. If you don't know in depth how the Rx library works internally, it's highly unlikely that you'll be successful at doing synchronous processing with it, which is certainly not this library's [intended purpose](http://introtorx.com/Content/v1.0.10621.0/01_WhyRx.html#WhenRx)! – Theodor Zoulias Jun 10 '22 at 13:00
  • @Nik I updated the answer with an example of using the `.ToTask()` instead of the `.PublishLast().AutoConnect(0)`, and with a performance optimization for the `.ToObservable()`. – Theodor Zoulias Jun 11 '22 at 10:57

I wonder if this is a simpler implementation:

static IList<(string Key, decimal Sum, int Count)> GroupStats(
    IEnumerable<(string Key, decimal Value)> source)
{
    return source
        .ToObservable(Scheduler.Immediate)
        .GroupBy(x => x.Key)
        .SelectMany(
            g => g.Aggregate(
                (Sum: 0m, Count: 0),
                (a, x) => (a.Sum + x.Value, a.Count + 1)), 
            (x, y) => (Key: x.Key, Sum: y.Sum, Count: y.Count)) 
        .ToList()
        .Wait();
}

Or better, a non-blocking version:

static async Task<IList<(string Key, decimal Sum, int Count)>> GroupStats(
    IEnumerable<(string Key, decimal Value)> source)
{
    return await source
        .ToObservable(Scheduler.Immediate)
        .GroupBy(x => x.Key)
        .SelectMany(
            g => g.Aggregate(
                (Sum: 0m, Count: 0),
                (a, x) => (a.Sum + x.Value, a.Count + 1)), 
            (x, y) => (Key: x.Key, Sum: y.Sum, Count: y.Count)) 
        .ToList();
}

If I run the async version with this source:

var source = new[]
{
    (Key: "a", Value: 1m),
    (Key: "c", Value: 2m),
    (Key: "b", Value: 3m),
    (Key: "b", Value: 4m),
    (Key: "c", Value: 5m),
    (Key: "c", Value: 6m),
};

var output = await GroupStats(source);

I get this output:

[image: output]

Enigmativity
  • *`(a, x) => (a.Sum + x.Value, a.Count + 1))`* -- I think that the whole point of this exercise is to exploit the functionality of the Rx operators, instead of doing the math manually. Otherwise it would be even simpler to stay in the realm of the `IEnumerable` and the LINQ (maybe with the help of MoreLinq). Also both the `Wait` and the `await` versions are most likely doing the same thing. The calculations are performed on the current thread. Nothing really asynchronous is happening. – Theodor Zoulias Jun 11 '22 at 05:28
  • @TheodorZoulias - I thought the idea was to avoid having the entire enumerable in memory. This avoids that. Also the `SelectMany` should be doing things concurrently. And the choice of `Aggregate` was to avoid unnecessary subscriptions. – Enigmativity Jun 11 '22 at 05:31
  • @TheodorZoulias - It'd just need a `.ObserveOn(Scheduler.Default)` before the `Aggregate` to get this working concurrently. – Enigmativity Jun 11 '22 at 05:33
  • *"I thought the idea was to avoid having the entire enumerable in memory"* -- Oh, yes, you are right, I forgot that. The LINQ `GroupBy` operator stores everything in memory before handing the groupings for further processing. – Theodor Zoulias Jun 11 '22 at 05:37
  • If I wanted to offload the calculations to a `ThreadPool` thread, I would probably prefer to wrap the whole thing in an `await Task.Run(() => /* ... */)`. Messing with the Rx schedulers is really for the priesthood. – Theodor Zoulias Jun 11 '22 at 05:43
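Following the `Task.Run` suggestion in the last comment, here is a minimal sketch that offloads the `Aggregate`-based pipeline from the second answer to a `ThreadPool` thread, leaving the Rx schedulers alone. It assumes the System.Reactive package; the class and method names are hypothetical.

```csharp
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading.Tasks;

// Hypothetical names; a sketch of the Task.Run idea, not code from either answer.
public static class OffloadedGroupStats
{
    public static Task<IList<(string Key, decimal Sum, int Count)>> GroupStatsAsync(
        IEnumerable<(string Key, decimal Value)> source)
    {
        // The whole synchronous Rx pipeline runs on a ThreadPool thread;
        // the caller's thread is released while it runs.
        return Task.Run(() => source
            .ToObservable(Scheduler.Immediate)
            .GroupBy(x => x.Key)
            .SelectMany(
                g => g.Aggregate(
                    (Sum: 0m, Count: 0),
                    (a, x) => (a.Sum + x.Value, a.Count + 1)),
                (g, a) => (Key: g.Key, Sum: a.Sum, Count: a.Count))
            .ToList()
            .Wait());
    }
}
```

A caller would then write `var output = await OffloadedGroupStats.GroupStatsAsync(source);`.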