8

I am looking for a way to implement a non-terminal grouping operation, such that the memory overhead will be minimal.

For example, consider distinct(). In the general case, it has no choice but to collect all distinct items, and only then stream them forward. However, if we know that the input stream is already sorted, the operation could be done "on-the-fly", using minimal memory.

I know I can achieve this for iterators using an iterator wrapper and implementing the grouping logic myself. Is there a simpler way to implement this using streams API instead?

--EDIT--

I found a way to abuse Stream.flatMap(..) to achieve this:

  private static class DedupSeq implements IntFunction<IntStream> {
    private Integer prev;

    @Override
    public IntStream apply(int value) {
      IntStream res = (prev != null && value == prev)? IntStream.empty() : IntStream.of(value);
      prev = value;
      return res;
    }    
  }

And then:

IntStream.of(1,1,3,3,3,4,4,5).flatMap(new DedupSeq()).forEach(System.out::println);

Which prints:

1
3
4
5

With some changes, the same technique can be used for any kind of memory-efficient sequence grouping of streams. Anyway, I don't like much this solution, and I was looking for something more natural (like the way mapping or filtering work for example). Furthermore, I'm breaking the contract here because the function supplied to flatMap(..) is stateful.

Eyal Schneider
  • 22,166
  • 5
  • 47
  • 78
  • 2
    You can always `.filter(someSet::add)`, but have you tried and compared the performance of such a solution with a plain `distinct()`? Also, you say "in the general case", but it may be that there is an optimized implementation in the event that the `Stream` _is_ `ORDERED`, precisely (or more accurately, its underlying `Spliterator`) – fge Apr 12 '15 at 10:06
  • @fge: I'm not sure there's any optimization there. The code: IntStream.range(0, 100000000).distinct().forEach(x->{}); Runs out of memory, despite the fact that the underlying Spliterator reports itself to be ORDERED. – Eyal Schneider Apr 12 '15 at 11:01
  • 1
    Have you tried with `.forEachOrdered()`? – fge Apr 12 '15 at 12:23
  • There are both `DISTINCT` and `SORTED`. But - looking at the jdk8 code - the IntStream implementation does not make use of either for `.distinct()`. Reference-based streams otoh seem to. – the8472 Apr 12 '15 at 12:40
  • @the8472: I guess the important characteristic is SORTED, and the generated stream in my example has it. – Eyal Schneider Apr 12 '15 at 12:43
  • yeah, sorry, that's what i meant. the point remains. IntStream (rather, its implementation `IntPipeline`) does not make use of either. – the8472 Apr 12 '15 at 12:45
  • By "grouping" do you mean classifying via an function (like `Collector.groupingBy`) or aggregated with neighbors in encounter order? – Stuart Marks Apr 12 '15 at 16:46
  • @StuartMarks: I would like to aggregate chunks of consecutive items according to some logic, and then output one item per chunk. Using collect() with Collectors.groupingBy() can solve many such scenarios, but it's a terminal operation, so it computes everything and only then returns a result. This isn't memory efficient. – Eyal Schneider Apr 12 '15 at 17:35
  • Thanks, so the consecutive nature of grouping is what you're after. `groupingBy` is probably out because it will classify equivalent but non-consecutive elements together. (I'm trying to pin down semantics before considering efficiency.) Are elements of your incoming stream addressable by index, like an ArrayList? – Stuart Marks Apr 12 '15 at 20:11
  • Also, you're correct to suspect weaknesses in your `DedupSeq` approach. Statefulness per se is not the issue. It is however thread unsafe and order dependent, which are indeed issues. It may be possible to make an operation stateful but also thread-safe and order independent. Let me think.... – Stuart Marks Apr 12 '15 at 20:15
  • See also this (self-answered) question: http://stackoverflow.com/q/28363323/1441122 – Stuart Marks Apr 12 '15 at 20:49
  • @StuartMarks: Thanks for your insights. The linked you supplied deals exactly with my problem, and provides a generic solution that fits my requirements (Also - you have a really nice blog!). – Eyal Schneider Apr 13 '15 at 15:06

2 Answers2

4

If you want a solution that doesn’t add mutable state to a function that isn’t supposed to have it, you may resort to collect:

static void distinctForSorted(IntStream s, IntConsumer action) {
    s.collect(()->new long[]{Long.MIN_VALUE},
              (a, i)->{ if(a[0]!=i) { action.accept(i); assert i>a[0]; a[0]=i; }},
              (a, b)->{ throw new UnsupportedOperationException(); });
}

This works as it is the intended way of using mutable containers, however, it can’t work in parallel as splitting at arbitrary stream positions implies the possibility to encounter a value in two (or even more) threads.

If you want a general purpose IntStream rather than a forEach action, a Spliterator low level solution is preferred, despite the added complexity.

static IntStream distinctForSorted(IntStream s) {
    Spliterator.OfInt sp=s.spliterator();
    return StreamSupport.intStream(
      new Spliterators.AbstractIntSpliterator(sp.estimateSize(),
      Spliterator.DISTINCT|Spliterator.SORTED|Spliterator.NONNULL|Spliterator.ORDERED) {
        long last=Long.MIN_VALUE;
        @Override
        public boolean tryAdvance(IntConsumer action) {
            long prev=last;
            do if(!sp.tryAdvance(distinct(action))) return false; while(prev==last);
            return true;
        }
        @Override
        public void forEachRemaining(IntConsumer action) {
            sp.forEachRemaining(distinct(action));
        }
        @Override
        public Comparator<? super Integer> getComparator() {
            return null;
        }
        private IntConsumer distinct(IntConsumer c) {
            return i-> {
                if(i==last) return;
                assert i>last;
                last=i;
                c.accept(i);
            };
        }
    }, false);
}

It even inherits a parallel support though it works by prefetching some values before processing them in another thread so it won’t accelerate the distinct operation, but maybe follow-up operations, if there are computation intense ones.


For completion, here is a distinct operation for arbitrary, i.e. unsorted, IntStreams which doesn’t rely on “boxing plus HashMap” thus may have a much better memory footprint:

static IntStream distinct(IntStream s) {
    boolean parallel=s.isParallel();
    s=s.collect(BitSet::new, BitSet::set, BitSet::or).stream();
    if(parallel) s=s.parallel();
    return s;
}

It works for positive int values only; expanding it to the full 32 bit range would require two BitSets thus not look as concise, but often the use case allows limiting the storage to the 31 bit range or even lower…

Holger
  • 285,553
  • 42
  • 434
  • 765
  • Thanks. I see now that a custom Spliterator is the way to do it (just like in stackoverflow.com/q/28363323/1441122, suggested by **Stuart Marks**). The bitset solution at the end is elegant, by the way (though still O(n) in memory usage). – Eyal Schneider Apr 13 '15 at 15:15
1

The way to do this properly would be to turn the stream into a spliterator, then wrap it depending on the properties of the returned spliterator

  • performs naive deduplication using a concurrent set if the source is neither sorted nor distinct
  • performs optimized optimized dedpulication if the source spliterator is sorted.
    supporting trySplit operations will be tricky as it may have to advance the sub-spliterator a few steps until it can be sure it's not seeing the tail of a run of non-distinct elements.
  • just returns the spliterator as-is if the source is already distinct

Once you have that spliterator you can turn it back into a stream with the same properties and continue to do stream operations on it

Since we can't modify existing jdk-interfaces the helper API would have to look more like this: dedup(IntStream.of(...).map(...)).collect(...).


If you inspect the source of java.util.stream.DistinctOps.makeRef(AbstractPipeline<?, T, ?>) you will notice that the JDK more or less does that for reference-based streams.

It is just the IntStream implementation (java.util.stream.IntPipeline.distinct()) that takes an inefficient approach that does not take advantage of of DISTINCT or SORTED.

It just blindly converts an IntStream to a boxed Integer stream and uses the reference-based deduplication without passing along the appropriate flags that would make it memory-efficient.

If this isn't already fixed in jdk9 it might be worth a bug since it's essentially unnecessary memory consumption and wasted optimization potential for the stream ops if they needlessly discard stream-flags.

the8472
  • 40,999
  • 5
  • 70
  • 122