7

Is the following lambda possible somehow in Java? I'd like to count elements from my filtered stream but collaterally store the first 10

stream().filter(myFilter)  //Reduces input to forthcoming operations
        .limit(10)         //Limits to ten the amount of elements to finish stream 
        .peek(myList::add) //Stores the ten elements into a list
        .count();          //Here is the difficult one. Id like to count everything  the total of elements that pass the filter, beyond the 10 I am fetching

EDIT: It was too implicit from my side, but the idea is meant of course as a potential solution which would be the fastest (faster than calling twice the stream generator and do both operations separately at least):

List<Entity> entities = stream().filter(myFilter) 
                                .limit(10)
                                .collect(Collectors.toList());
long entitiesCount = stream().filter(myFilter) 
                             .count();

... taking profit of a single iteration, and without having to load the whole collection on memory. I'm doing tests with parallelization of the answers

Whimusical
  • 6,401
  • 11
  • 62
  • 105
  • You could do it using 2 separate streams. – Nicholas K Oct 09 '18 at 17:34
  • It's an stream because it is a very expensive (long) iteratation.... this is the sole point of such a crazy idea of mine – Whimusical Oct 09 '18 at 17:40
  • If the stream is large, the best way to process it is to see if it can be parallelized. If this has to be a sequential operation, as pointed here and [here](https://stackoverflow.com/questions/33635717/in-java-streams-is-peek-really-only-for-debugging), maybe it is better to avoid playing games with the API and go with a good old `.forEach()` – jrook Oct 09 '18 at 18:35
  • Maybe streams are not the best tool for your requirement. Which is the source of the elements? – fps Oct 09 '18 at 20:30
  • 1
    @FedericoPeraltaSchaffner I think a custom collector would be answer in such a case, not matter what the source is... – Eugene Oct 09 '18 at 21:25
  • @Eugene sure, but maybe just using for each and some longadder... – fps Oct 09 '18 at 21:51
  • 1
    @FedericoPeraltaSchaffner of course, but that bares lots of volatiles writes and reads in that count, yhis has to be measured for which would be a winner... Of course, I would go with a collector – Eugene Oct 09 '18 at 21:53
  • @jrook With foreach I am not sure Id benefit from internal optimizations, maybe I need to discard 20000 objects for mere counting – Whimusical Oct 10 '18 at 09:04
  • I need to test them all to know the most performant – Whimusical Oct 10 '18 at 11:00

4 Answers4

5

A custom collector is the answer here:

Entry<List<Integer>, Integer> result = list.stream()
            .collect(Collector.of(
                    () -> new SimpleEntry<>(new ArrayList<>(), 0),
                    (l, x) -> {
                        if (l.getKey().size() < 10) {
                            l.getKey().add(x);
                        }
                        l.setValue(l.getValue() + 1);
                    },
                    (left, right) -> {
                        List<Integer> leftList = left.getKey();
                        List<Integer> rightList = right.getKey();
                        while (leftList.size() < 10 && rightList.size() > 0) {
                            leftList.add(rightList.remove(0));
                        }
                        left.setValue(left.getValue() + right.getValue());
                        return left;
                    }));

Suppose you have this code:

Set.of(1, 2, 3, 4)
            .stream()
            .parallel()
            .collect(Collector.of(
                    ArrayList::new,
                    (list, ele) -> {
                        System.out.println("Called accumulator");
                        list.add(ele);
                    },
                    (left, right) -> {
                        System.out.println("Combiner called");
                        left.addAll(right);
                        return left;
                    },
                    new Characteristics[] { Characteristics.CONCURRENT }));

Before we start thinking about that code (it does matter how correct it is for the purpose of the example), we need to read the documentation a bit for CONCURRENT characteristic:

If a CONCURRENT collector is not also UNORDERED, then it should only be evaluated concurrently if applied to an unordered data source.

What this documentation basically says is that if your collector is CONCURRENT and the source of the stream is UNORDERED (like a Set) or we explicitly call unordered then the merger will never get called.

If you run the previous code, you will see that Combiner called is never present in the output.

If you change the Set.of(1, 2, 3, 4) to List.of(1, 2, 3, 4) you will see a different picture (ignore the correctness of the result you get - since ArrayList is not thread safe, but that is not the point). If you have the source of the stream to be a List and at the same time you call unordered you will again see that only the accumulator is called, that is:

 List.of(1, 2, 3, 4)
            .stream()
            .unordered()
            .parallel()
            .collect(Collector.of(
                    ArrayList::new,
                    (list, ele) -> {
                        System.out.println("Called accumulator");
                        list.add(ele);
                    },
                    (left, right) -> {
                        System.out.println("Combiner called");
                        left.addAll(right);
                        return left;
                    },
                    new Characteristics[] { Characteristics.CONCURRENT }));
Eugene
  • 117,005
  • 15
  • 201
  • 306
  • What's the point of leftList.add(rightList.remove(0));? Isn't this just adding the first element on te right list? Can this be safely parallelized? If not, what is the point of the complex accumulator part ? If I use a Collections.synchronizedList and add Collector.Characteristics CONCURRENT as the characteristics of the collector, will I benefit in someway when using parallelilzation (as I think that being ordered it cannot)? – Whimusical Oct 13 '18 at 12:48
  • 1
    @Whimusical you are asking quite a lot of questions into this comment, I'll try to address them. This collector , as is, does not add the `CONCURRENT` characteristic, and should not either - as it changes the way a collector works entirely, see [here](https://stackoverflow.com/questions/40888262/what-is-the-difference-between-collectors-toconcurrentmap-and-converting-a-map-t/40888456#40888456). *Isn't this just adding the first element on the right list* - it's adding to the *left* List and that is a quite important difference. – Eugene Oct 14 '18 at 13:26
  • @Whimusical *Can this be safely parallelized* - yes. *If I use a Collections.synchronizedList* - where exactly will you use it? As a source of your stream? if so, this does not matter, what matters what you do in the collector logic. For your last sentence read that link I have above – Eugene Oct 14 '18 at 13:30
  • Sorry, I missed the loop for the rightList add part. This is my favourite answer so far, but I find the combiner from next answer marginally more elegant. If we agreed that combiner is indeed not needed (can be provided as (a,b) -> a), ill accept it. The point is, if parallel is never gonna collect in a simultanious way, due to the stream being ordered and the collector missing the feature CONCURRENT, I understand these 7 combiner lines are just unneeded, are them? – Whimusical Oct 19 '18 at 18:15
  • As I understand it, if the collector is not designed for concurrency (and in my case stream is ordered and feature CONCURRENT is not set), combiner is never executed (my tests so far prove that). So there's no point on specifying one even if you are allowed to run in parallel the lambda – Whimusical Oct 19 '18 at 20:28
  • I am not asserting, just trying to validate my assumptions :p – Whimusical Oct 19 '18 at 20:28
  • I really appreciate your time, and I dont want to make you loose more, so Ill accept the answer, but I am almost sure executing it in .parallel() is not combining either if theres no UNORDERED an CONCURRENT. THanks for your help!!! – Whimusical Oct 19 '18 at 20:37
  • 1
    @Whimusical I decided to extend the answer a bit and delete my previous comments to may be make it more clear – Eugene Oct 20 '18 at 20:10
  • Amazing. I assumed order came from implementation of the collection, never thought of the possibility and existance of .unordered() – Whimusical Oct 22 '18 at 16:11
  • 1
    @Whimusical be vary careful with that call though, it only sets an internal flag - it does not shuffle the elements internally - at least at the moment – Eugene Oct 22 '18 at 16:57
3

The following uses a mutable reduction with the help of a local class holding the summary.
The limit on the collected elements is done by just picking the first 10 elements in the combiner function.

Example using IntStream:

Stat result = IntStream.range(0, 100)
        .boxed()
        .filter(i -> i % 2 == 0)
        .collect(() -> new Stat(0, new ArrayList<Integer>()), 
            (stat, integer) -> {
                stat.count++;
                if (stat.list.size() < 10) {
                    stat.list.add(integer);
                }
            }, 
            (stat1, stat2) -> {
                stat1.list.addAll(stat2.list.subList(0, Math.min(stat2.list.size(), 
                    10 - stat1.list.size())));
            });

And here's the Stat class used in the stream (you can easily use something like Pair<Long, List<Integer>>):

private static class Stat {
    long count;
    List<Integer> list;

    public Stat(long count, List<Integer> list) {
        this.count = count;
        this.list = list;
    }
}

The above example results in [count=50,list=[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]]

ernest_k
  • 44,416
  • 5
  • 53
  • 99
  • @Holger Thank you. I was rather concerned about the order of collected elements; but after testing your version, I think that concern was possibly unfounded (I've got some reading to do...) – ernest_k Oct 10 '18 at 06:43
  • @Holger So can this solution be parallelized with your points? – Whimusical Oct 10 '18 at 13:26
  • @Whimusical every solution here can be run in parallel, but that does not make them correct, especially those that rely on side effects – Eugene Oct 10 '18 at 17:20
  • @Whimusical since the relevant point has been edited into the answer already, it works correctly when used with a parallel stream. Removing the `boxed()` would improve the performance, but the solution is correct, with or without it. – Holger Oct 10 '18 at 17:27
  • @Holger I mean in the sense of being order-safe. If I use a Collections.synchronizedList and add Collector.Characteristics CONCURRENT as the characteristics of the collector, will I benefit in someway when using parallelilzation (as I think that being ordered it cannot)? – Whimusical Oct 13 '18 at 12:51
  • @Whimusical the collector of this answer supports parallelism. There is no need to modify it. Just use `parallel()` when you need it. A `CONCURRENT` collector only makes a difference, if the operation is unordered, *in general*. Perhaps, [this answer](https://stackoverflow.com/a/41045442/2711488) helps understanding. – Holger Oct 14 '18 at 13:49
  • @Holger Then the accumulator is pointless, as parallel is never gonna collect into 2 different streams, since my stream is ordered and CONCURRENT feature is not present – Whimusical Oct 19 '18 at 18:10
  • @Holger Apologies, I meant combiner, not accumulator, in my previous comment – Whimusical Oct 19 '18 at 18:27
  • @Whimusical you have it backwards, you *need* the combiner. Only when a collector is concurrent **and** the operation is unordered (either by stream or by collector), all threads will accumulate into the same container. Otherwise, each thread will accumulate into its local container and the results will get combined. – Holger Oct 22 '18 at 10:10
2

Here's a simple lambda expression that will count add to your list any items, up to 10, that make it past your filter:

i -> {if (myList.size() < 10) myList.add(i);}

But you can't simply use count() on Stream:

An implementation may choose to not execute the stream pipeline (either sequentially or in parallel) if it is capable of computing the count directly from the stream source. In such cases no source elements will be traversed and no intermediate operations will be evaluated. Behavioral parameters with side-effects, which are strongly discouraged except for harmless cases such as debugging, may be affected.

For my case, using count(), peek() was not called because the elements were not traversed, and my list was empty.

Choose a simple reduction to count the elements.

.reduce(0, (a, b) -> a + 1);

My code:

int count = yourCollection.stream()
    .filter(myFilter)
    .peek(i -> {if (myList.size() < 10) myList.add(i);} )
    .reduce(0, (a, b) -> a + 1);
rgettman
  • 176,041
  • 30
  • 275
  • 357
  • The reduce operation requires the type of my streamed entity for both the identity and the return case. If I am not wrong this is not useful for my case unfortunately, as I am not streaming integers. I can add a .map(t -> 1) before, but I am not sure this approach can be optimized with parallelation in any way – Whimusical Oct 13 '18 at 12:18
  • 1
    @Whimusical you could change the terminal operation to `.reduce(0, (a, b) -> a + 1, Integer::sum)`, to make it work, which is slightly more efficient than a preceding `map` step (unless you use `mapToInt(x -> 1).sum()`). However, while the reduction is correct and would work in parallel, the `peek` is not. Even if `myList` refers to a thread safe collection, `if(myList.size() < 10) myList.add(i);` is a victim of the “Check-then-Act” anti-pattern. After checking the condition, there is no guaranty that it still holds when the subsequent conditional operation has been entered. – Holger Oct 22 '18 at 12:03
2

This is another solution, not sure if it fits your requirement.

    final Count c = new Count();

    coll.stream().forEach(e -> {
        c.setTotCount(c.getTotCount() + 1);

        if (/*your filter*/) {
           // add till 10 elements only
           if (c.getMyList().size() <= 10) {
              c.addMyList(e);
           }
        }
    });

And the helper class is defined

class Count {
    int totCount;
    // Student for an example
    List<Student> myList = new ArrayList<>();

    public List<Student> getMyList() {
        return myList;
    }

    public void addMyList(Student std) {
        this.myList.add(std);
    }

    // getter and setter for totCount
}

Now you have your list as well as the total count, which are all stored in the helper object c. Fetch the total count the list using :

  System.out.println(c.getTotCount());
  System.out.println(c.getMyList().size());
Nicholas K
  • 15,148
  • 7
  • 31
  • 57