
Regarding the question "How to skip even lines of a Stream obtained from the Files.lines", I followed the accepted answer's approach and implemented my own filterEven() method based on the Spliterator<T> interface, e.g.:

public static <T> Stream<T> filterEven(Stream<T> src) {
    Spliterator<T> iter = src.spliterator();
    AbstractSpliterator<T> res = new AbstractSpliterator<T>(Long.MAX_VALUE, Spliterator.ORDERED)
    {
        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            iter.tryAdvance(item -> {});    // discard
            return iter.tryAdvance(action); // use
        }
    };
    return StreamSupport.stream(res, false);
}

which I can use in the following way:

Stream<String> content = Files.lines(src);
Stream<DomainObject> res = filterEven(content)
     .map(line -> toDomainObject(line));

However, when measuring the performance of this approach against the following one, which uses a filter() with side effects, I noticed that the latter performs better:

final int[] counter = {0};
final Predicate<String> isEvenLine = item -> ++counter[0] % 2 == 0;
Stream<DomainObject> res = Files.lines(src)
     .filter(isEvenLine)
     .map(line -> toDomainObject(line));

I tested the performance with JMH, and I am not including the file load in the benchmark; I load the file into an array beforehand. Each benchmark then starts by creating a Stream<String> from that array, filtering even lines, applying a mapToInt() to extract the value of an int field, and finally a max() operation. Here is one of the benchmarks (you can check the whole program here, and here you have the data file with about 186 lines):

@Benchmark
public int maxTempFilterEven(DataSource src){
    Stream<String> content = Arrays.stream(src.data)
            .filter(s-> s.charAt(0) != '#') // Filter comments
            .skip(1);                       // Skip line: Not available
    return filterEven(content)              // Filter daily info and skip hourly
            .mapToInt(line -> parseInt(line.substring(14, 16)))
            .max()
            .getAsInt();
}
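
For comparison, the filter()-based benchmark presumably looks like the following sketch (the exact version is in the linked program):

@Benchmark
public int maxTempFilter(DataSource src){
    int[] counter = {0};
    Stream<String> content = Arrays.stream(src.data)
            .filter(s-> s.charAt(0) != '#')         // Filter comments
            .skip(1);                               // Skip line: Not available
    return content
            .filter(line -> ++counter[0] % 2 == 0) // Keep every second line
            .mapToInt(line -> parseInt(line.substring(14, 16)))
            .max()
            .getAsInt();
}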

Why does the filter() approach perform better (~80 ops/ms) than the filterEven() approach (~50 ops/ms)?

  • You are not doing anything. Without specifying a terminal operation, this is a pointless examination. – Holger May 05 '17 at 16:56
  • @Holger sorry, you're right. After the map() I am applying a mapToInt() to extract the value of an int property and finally a max() operation. – Miguel Gamboa May 05 '17 at 17:06
  • Maybe posting the complete benchmark helps here. It’s also crucial to specify details about the file you’re reading, i.e. size / number of lines. It’s hard to believe that these small details of filtering outweigh the actual reading of lines from a file. You should add a third test case which does the same operation without filtering. – Holger May 05 '17 at 17:12
  • I am not including the file load in the benchmark. I previously load it into an array. Then each benchmark starts by creating a Stream from this array. So I am just evaluating the map(), filter() and the max() without the weight of reading the file. The file has around 90 lines. – Miguel Gamboa May 05 '17 at 17:34
  • @Holger I updated the OP with all details about the benchmark and a link to the source file. Thank you. – Miguel Gamboa May 05 '17 at 19:00

1 Answer


Intro

I think I know the reason, but unfortunately I have no idea how to improve the performance of the Spliterator-based solution (at least not without rewriting the whole Stream API implementation).

Sidenote 1: performance was not the most important design goal of the Stream API. If performance is critical, rewriting the code without the Stream API will most probably make it faster (for example, the Stream API unavoidably increases memory allocation and thus GC pressure). On the other hand, in most scenarios the Stream API provides a nicer, higher-level API at the cost of a relatively small performance degradation.

Part 1 or Short theoretical answer

A Stream is designed with internal iteration as the main means of consumption; external iteration (i.e. Spliterator-based) is an additional means that is kind of "emulated". Thus external iteration involves some overhead. Laziness adds some limits to the efficiency of external iteration, and the need to support flatMap makes it necessary to use some kind of dynamic buffer in this process.
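
As a minimal sketch of the two styles (not code from the question):

// Internal iteration: the stream drives the consumption.
Stream.of("a", "b", "c").forEach(System.out::println);

// External iteration: the caller pulls elements one by one.
Spliterator<String> sp = Stream.of("a", "b", "c").spliterator();
while (sp.tryAdvance(System.out::println))
    ; // do nothing in the loop body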

Sidenote 2: In some cases Spliterator-based iteration might be as fast as internal iteration (i.e. filter in this case). In particular, this is so when you create a Spliterator directly from the data-containing Stream. To see it, you can modify your tests to materialize your first filter into a String array:

String[] filteredData = Arrays.stream(src.data)
            .filter(s-> s.charAt(0) != '#') // Filter comments
            .skip(1)  
            .toArray(String[]::new);

and then compare the performance of maxTempFilter and maxTempFilterEven modified to accept that pre-filtered String[] filteredData. If you want to know why this is so, you should probably read the rest of this long answer, or at least Part 2.
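
A modified benchmark could look like this sketch (the method name and the filteredData field on DataSource are illustrative; the array should be prepared in the benchmark setup, outside the measured code):

@Benchmark
public int maxTempFilterEvenPrefiltered(DataSource src) {
    // src.filteredData holds the materialized result of the comment/skip filter,
    // so filterEven() now sits directly on top of the array's Spliterator
    return filterEven(Arrays.stream(src.filteredData))
            .mapToInt(line -> parseInt(line.substring(14, 16)))
            .max()
            .getAsInt();
}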

Part 2 or Longer theoretical answer

Streams were designed to be consumed mainly as a whole, by some terminal operation. Iterating over elements one by one, although supported, was not designed as the main way to consume streams.

Note that using the "functional" Stream API, such as map, flatMap, filter, reduce, and collect, you can't say at some step "I have had enough data, stop iterating over the source and pushing values". You can discard some incoming data (as filter does) but can't stop the iteration. (The limit and skip transformations are actually implemented using a Spliterator inside; and anyMatch, allMatch, noneMatch, findFirst, findAny, etc. use the non-public API j.u.s.Sink.cancellationRequested; they are also simpler because there can't be several terminal operations.) If all transformations in the pipeline are synchronous, you can combine them into a single aggregated function (Consumer) and call it in a simple loop (optionally splitting the loop execution over several threads). This is what my simplified version of the state-based filter represents (see the code in the Show me some code section). It gets a bit more complicated if there is a flatMap in the pipeline, but the idea is still the same.

A Spliterator-based transformation is fundamentally different because it adds an asynchronous, consumer-driven step to the pipeline. Now the Spliterator, rather than the source Stream, drives the iteration process. If you ask for a Spliterator directly on the source Stream, it might be able to return some implementation that just iterates over its internal data structure, and this is why materializing pre-filtered data should remove the performance difference. However, if you create a Spliterator for some non-empty pipeline, there is no (simple) choice other than asking the source to push elements one by one through the pipeline until some element passes all the filters (see also the second example in the Show me some code section). The fact that source elements are pushed one by one rather than in batches is a consequence of the fundamental decision to make Streams lazy. The need for a buffer instead of just one element is a consequence of the support for flatMap: pushing one element from the source can produce many elements for the Spliterator.

Part 3 or Show me some code

This part tries to back what was described in the "theoretical" parts with some code (both references to the real code and simulated code).

First of all, you should know that the current Stream API implementation accumulates non-terminal (intermediate) operations into a single lazy pipeline (see j.u.s.AbstractPipeline and its children, such as j.u.s.ReferencePipeline). Then, when the terminal operation is applied, all the elements from the original Stream are "pushed" through the pipeline.
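
You can observe the wrapping directly. The following probe is a sketch; the printed class names are OpenJDK implementation details and may vary between versions:

String[] data = {"x", "y"};

// A root stream hands out its source spliterator directly:
System.out.println(Arrays.stream(data).spliterator().getClass());
// e.g. class java.util.Spliterators$ArraySpliterator

// A stream with at least one intermediate operation has to wrap the pipeline:
System.out.println(Arrays.stream(data).filter(s -> true).spliterator().getClass());
// e.g. class java.util.stream.StreamSpliterators$WrappingSpliterator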

What you see is the result of two things:

  1. the fact that stream pipelines are different when you have a Spliterator-based step inside;
  2. the fact that your OddLines is not the first step in the pipeline.

The code with a stateful filter is more or less similar to the following straightforward code:

static int similarToFilter(String[] data)
{
    final int[] counter = {0};
    final Predicate<String> isEvenLine = item -> ++counter[0] % 2 == 0;
    int skip = 1;

    boolean reduceEmpty = true;
    int reduceState = 0;
    for (String outerEl : data)
    {
        if (outerEl.charAt(0) != '#')
        {
            if (skip > 0)
                skip--;
            else
            {
                if (isEvenLine.test(outerEl))
                {
                    int intEl = parseInt(outerEl.substring(14, 16));
                    if (reduceEmpty)
                    {
                        reduceState = intEl;
                        reduceEmpty = false;
                    }
                    else
                    {
                        reduceState = Math.max(reduceState, intEl);
                    }
                }
            }
        }
    }
    return reduceState;
}

Note that this is effectively a single loop with some calculations (filtering/transformations) inside.

When you add a Spliterator into the pipeline, on the other hand, things change significantly. Even with simplifications, code that is reasonably similar to what actually happens becomes much larger, such as:

interface Sp<T>
{
    public boolean tryAdvance(Consumer<? super T> action);
}

static class ArraySp<T> implements Sp<T>
{
    private final T[] array;
    private int pos;

    public ArraySp(T[] array)
    {
        this.array = array;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action)
    {
        if (pos < array.length)
        {
            action.accept(array[pos]);
            pos++;
            return true;
        }
        else
        {
            return false;
        }
    }
}

static class WrappingSp<T> implements Sp<T>, Consumer<T>
{
    private final Sp<T> sourceSp;
    private final Predicate<T> filter;

    private final ArrayList<T> buffer = new ArrayList<T>();
    private int pos;


    public WrappingSp(Sp<T> sourceSp, Predicate<T> filter)
    {
        this.sourceSp = sourceSp;
        this.filter = filter;
    }

    @Override
    public void accept(T t)
    {
        buffer.add(t);
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action)
    {
        while (true)
        {
            if (pos >= buffer.size())
            {
                pos = 0;
                buffer.clear();
                sourceSp.tryAdvance(this);
            }
            // failed to fill buffer
            if (buffer.size() == 0)
                return false;

            T nextElem = buffer.get(pos);
            pos++;
            if (filter.test(nextElem))
            {
                action.accept(nextElem);
                return true;
            }
        }
    }
}

static class OddLineSp<T> implements Sp<T>, Consumer<T>
{
    private Sp<T> sourceSp;

    public OddLineSp(Sp<T> sourceSp)
    {
        this.sourceSp = sourceSp;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action)
    {
        if (sourceSp == null)
            return false;

        sourceSp.tryAdvance(this);                      // discard one element
        boolean advanced = sourceSp.tryAdvance(action); // use the next one
        if (!advanced)
        {
            sourceSp = null; // the source is exhausted
        }
        return advanced;

    }

    @Override
    public void accept(T t)
    {
        // intentionally empty: this is where the discarded element goes
    }
}

static class ReduceIntMax
{
    boolean reduceEmpty = true;
    int reduceState = 0;

    public int getReduceState()
    {
        return reduceState;
    }

    public void accept(int t)
    {
        if (reduceEmpty)
        {
            reduceEmpty = false;
            reduceState = t;
        }
        else
        {
            reduceState = Math.max(reduceState, t);
        }
    }
}


static int similarToSpliterator(String[] data)
{
    ArraySp<String> src = new ArraySp<>(data);

    int[] skip = new int[1];
    skip[0] = 1;
    WrappingSp<String> firstFilter = new WrappingSp<String>(src, (s) ->
    {
        if (s.charAt(0) == '#')
            return false;
        if (skip[0] != 0)
        {
            skip[0]--;
            return false;
        }
        return true;
    });
    OddLineSp<String> oddLines = new OddLineSp<>(firstFilter);
    final ReduceIntMax reduceIntMax = new ReduceIntMax();
    while (oddLines.tryAdvance(s ->
                               {
                                   int intValue = parseInt(s.substring(14, 16));
                                   reduceIntMax.accept(intValue);
                               })) ; // do nothing in the loop body
    return reduceIntMax.getReduceState();
}

This code is larger because the logic is impossible (or at least very hard) to represent without non-trivial stateful callbacks inside the loop. Here the interface Sp is a mix of the j.u.s.Stream and j.u.Spliterator interfaces.

  • Class ArraySp represents the result of Arrays.stream.

  • Class WrappingSp is similar to j.u.s.StreamSpliterators.WrappingSpliterator, which in the real code represents an implementation of the Spliterator interface for any non-empty pipeline, i.e. a Stream with at least one intermediate operation applied to it (see the j.u.s.AbstractPipeline.spliterator method). In my code I merged it with a StatelessOp subclass and put the logic responsible for the filter method implementation there. Also, for simplicity, I implemented skip using filter.

  • OddLineSp corresponds to your OddLines and its resulting Stream.

  • ReduceIntMax represents the ReduceOps terminal operation for Math.max over ints.

So what's important in this example? The important thing here is that since you first filter your original stream, your OddLineSp is created from a non-empty pipeline, i.e. from a WrappingSp. And if you take a closer look at WrappingSp, you'll notice that every time tryAdvance is called, it delegates the call to the sourceSp and accumulates the result(s) into a buffer. Moreover, since you have no flatMap in the pipeline, elements are copied to the buffer one by one. I.e. every time WrappingSp.tryAdvance is called, it will call ArraySp.tryAdvance, get back exactly one element (via the callback), and pass it further to the consumer provided by the caller (unless the element doesn't match the filter, in which case ArraySp.tryAdvance will be called again and again; but still, the buffer is never filled with more than one element at a time).

Sidenote 3: If you want to look at the real code, the most interesting places are j.u.s.StreamSpliterators.WrappingSpliterator.tryAdvance, which calls j.u.s.StreamSpliterators.AbstractWrappingSpliterator.doAdvance, which in turn calls j.u.s.StreamSpliterators.AbstractWrappingSpliterator.fillBuffer, which in turn calls the pusher that is initialized in j.u.s.StreamSpliterators.WrappingSpliterator.initPartialTraversalState.

So the main thing that's hurting performance is this copying into the buffer. Unfortunately for us ordinary Java developers, the current implementation of the Stream API is pretty much closed, and you can't modify only some aspects of the internal behavior using inheritance or composition. You may use some reflection-based hacking to make the copying-to-buffer more efficient for your specific case and gain some performance (at the cost of the Stream's laziness), but you can't avoid this copying altogether, and thus Spliterator-based code will be slower anyway.

Going back to the example from Sidenote 2: the Spliterator-based test with the materialized filteredData works faster because there is no WrappingSp in the pipeline before OddLineSp, and thus there is no copying into an intermediate buffer.
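
As a final sanity check, here is a hypothetical driver for the simulated code above (the data lines are made up to fit the substring(14, 16) parsing; both pipelines should agree on the result):

public static void main(String[] args)
{
    String[] data = {
        "# header comment",     // removed by the '#' filter
        "Not available",        // removed by the skip-1 logic
        "2017-05-05 01 10",     // 1st data line: discarded
        "2017-05-05 02 25",     // 2nd data line: used
        "2017-05-05 03 30",     // 3rd data line: discarded
        "2017-05-05 04 15"      // 4th data line: used
    };
    System.out.println(similarToFilter(data));      // 25
    System.out.println(similarToSpliterator(data)); // 25
}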

  • Thanks for your detailed explanation. So, if I understood correctly, the Spliterator-based Stream accumulates the items into a buffer depending on whether or not it is the first operation of the pipeline. What is that buffer for? – Miguel Gamboa May 08 '17 at 12:12
  • @MiguelGamboa, take a look at my update. Hopefully it clarifies a bit why there is a need for a buffer. – SergGr May 08 '17 at 19:37
  • Your explanation makes sense. If you allow me the suggestion, I would ask you to put at the beginning of your answer a short answer with the part regarding your `filteredData` example and your suggestion saying: "you can modify your tests to materialize your first filter into a String array. In such case you should get almost identical performance because your OddLines will be the first step in the pipeline and there will be no copying into an intermediate buffer." This way we can immediately check why the Spliterator-based implementation performs worse. – Miguel Gamboa May 09 '17 at 10:33
  • @MiguelGamboa, I tried to re-format my answer to move the short theoretical part higher. Please let me know if you find some inconsistencies introduced by this edit. If you think this answer is OK, please accept it. – SergGr May 10 '17 at 00:32
  • Is the internal buffer also used when the `StreamSpliterators.WrappingSpliterator` is traversed in bulk through `forEachRemaining()`? I see that it calls [ph.wrapAndCopyInto(...)](http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/8-b132/java/util/stream/StreamSpliterators.java#312) but I cannot figure out what it is doing. I think when `buffer` is not `null` then `forEachRemaining()` performs `do { } while (tryAdvance(consumer));`, which in turn redirects to the behavior described in your answer. But I cannot understand when this buffer is null, or not. – Miguel Gamboa Oct 27 '17 at 15:32
  • @MiguelGamboa, first of all asking a question as a comment half a year after the discussion is a **_bad_** plan. I believe it makes sense to ask a new question, probably referencing the old one, so more people would see it and have a chance to answer. As for the question, I'm not sure I understand it. If you still put your logic in a `Spliterator` which has `stream.spliterator()` as its datasource, everything remains the same: if the source `stream` is not the root, it has to create a `WrappingSpliterator` which has to use the buffer to feed **_your_** `tryAdvance`. – SergGr Oct 27 '17 at 22:35
  • I followed your suggestion and made a more elaborate post with my doubts. In truth, they are mostly related to the need for the buffer, which I understood was mostly related to the `flatMap` issues. Here is my question [Why the tryAdvance of stream.spliterator() may accumulate items into a buffer?](https://stackoverflow.com/q/47036993/1140754) in case you would like to add any useful information. – Miguel Gamboa Nov 02 '17 at 10:39
  • @MiguelGamboa, I like the answer Holger provided in that question. I also put a different perspective in my answer. Hope this helps. – SergGr Nov 02 '17 at 18:25