
In this case only the odd lines contain meaningful data, and no character uniquely identifies those lines. I would like to get something equivalent to the following example:

Stream<DomainObject> res = Files.lines(src)
     .filter(line -> isOddLine())
     .map(line -> toDomainObject(line));

Is there any “clean” way to do it, without sharing global state?

Miguel Gamboa

5 Answers


No, there's no way to do this conveniently with the API. (It's basically the same reason why there is no easy way to get a zipWithIndex; see Is there a concise way to iterate over a stream with indices in Java 8?)
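One index-based workaround, in the spirit of the linked question, is sketched below. It assumes the whole file fits in memory (for example, read with Files.readAllLines), so unlike the answers below it is not a lazy stream solution. The class IndexedOddLines and the method oddPositions are hypothetical names:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class IndexedOddLines {
    // Keeps the elements at 1-based odd positions (1st, 3rd, 5th, ...),
    // i.e. the 0-based even indices. Needs random access, so the input
    // must already be a List in memory.
    static <T> List<T> oddPositions(List<T> lines) {
        return IntStream.range(0, lines.size())
                .filter(i -> i % 2 == 0)
                .mapToObj(lines::get)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("a", "x", "b", "y", "c");
        System.out.println(oddPositions(lines)); // prints [a, b, c]
    }
}
```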

You can still use Stream, but go for an iterator:

Iterator<String> iter = Files.lines(src).iterator();
while (iter.hasNext()) {
    iter.next();                      // discard
    if (iter.hasNext())               // the last pair may be incomplete
        toDomainObject(iter.next());  // use
}

(You might want to use try-with-resources on that stream, though.)
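A fuller sketch of the loop with try-with-resources applied, assuming the kept lines are simply collected into a list (toDomainObject is not defined in the question, so IteratorOddLines and everySecondLine are hypothetical names). It also guards the second next() call, since a file with an odd number of lines would otherwise end in a NoSuchElementException:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;

public class IteratorOddLines {
    // Same pattern as the loop above: discard one line, keep the next,
    // so the 2nd, 4th, ... lines (1-based) are kept.
    static List<String> everySecondLine(Path src) throws IOException {
        List<String> kept = new ArrayList<>();
        // try-with-resources closes the Stream and with it the file handle
        try (Stream<String> lines = Files.lines(src)) {
            Iterator<String> iter = lines.iterator();
            while (iter.hasNext()) {
                iter.next();                 // discard
                if (iter.hasNext()) {        // odd line count: nothing left to keep
                    kept.add(iter.next());   // use (toDomainObject would go here)
                }
            }
        }
        return kept;
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("lines", ".txt");
        Files.write(tmp, Arrays.asList("1", "2", "3", "4", "5"));
        System.out.println(everySecondLine(tmp)); // prints [2, 4]
        Files.delete(tmp);
    }
}
```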

aioobe

A clean way is to go one level deeper and implement a Spliterator. On this level you can control the iteration over the stream elements and simply iterate over two items whenever the downstream requests one item:

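import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class OddLines<T> extends Spliterators.AbstractSpliterator<T>
    implements Consumer<T> {

    public static <T> Stream<T> oddLines(Stream<T> source) {
        return StreamSupport.stream(new OddLines<>(source.spliterator()), false);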
    }
    private static long odd(long l) { return l==Long.MAX_VALUE? l: (l+1)/2; }
    Spliterator<T> originalLines;

    OddLines(Spliterator<T> source) {
        super(odd(source.estimateSize()), source.characteristics());
        originalLines=source;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        if(originalLines==null || !originalLines.tryAdvance(action))
            return false;
        if(!originalLines.tryAdvance(this)) originalLines=null;
        return true;
    }

    @Override
    public void accept(T t) {}
}

Then you can use it like

Stream<DomainObject> res = OddLines.oddLines(Files.lines(src))
    .map(line -> toDomainObject(line));

This solution has no side effects and retains most advantages of the Stream API, like lazy evaluation. However, it should be clear that it has no useful semantics for unordered stream processing (beware of subtle aspects like using forEachOrdered rather than forEach when performing a terminal action on all elements), and while it supports parallel processing in principle, it’s unlikely to be very efficient…
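To illustrate the laziness point, here is a minimal sketch that condenses the same "advance twice per requested element" technique into an anonymous AbstractSpliterator and applies it to an infinite Stream.iterate source, which only terminates because elements are pulled on demand. LazyOddDemo and oddElements are hypothetical names, and the onClose forwarding is an addition not present in OddLines:

```java
import java.util.Arrays;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class LazyOddDemo {
    // Emits the 1st, 3rd, 5th, ... elements of 'source', like OddLines.
    static <T> Stream<T> oddElements(Stream<T> source) {
        Spliterator<T> src = source.spliterator();
        Spliterator<T> odd = new Spliterators.AbstractSpliterator<T>(
                Long.MAX_VALUE, src.characteristics() & Spliterator.ORDERED) {
            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                if (!src.tryAdvance(action)) return false; // emit one element
                src.tryAdvance(t -> { });                  // skip the next one
                return true;
            }
        };
        return StreamSupport.stream(odd, false).onClose(source::close);
    }

    public static void main(String[] args) {
        // Infinite source: limit(4) stops pulling after four emitted elements.
        Stream<Integer> naturals = Stream.iterate(1, n -> n + 1);
        System.out.println(oddElements(naturals).limit(4).collect(Collectors.toList()));
        // prints [1, 3, 5, 7]
        System.out.println(oddElements(Stream.of("a", "b", "c")).collect(Collectors.toList())
                .equals(Arrays.asList("a", "c"))); // prints true
    }
}
```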

Holger
  • Note that as you can see from http://stackoverflow.com/questions/43809885/why-filter-with-side-effects-performs-better-than-a-spliterator-based-implementa/ in some cases this solution will perform worse than a "not-so-clean" solution that uses a stateful `filter` – SergGr May 10 '17 at 00:46
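The linked benchmark code is not reproduced here, so the following is only a minimal sketch of what such a stateful `filter` might look like, assuming a captured AtomicLong counter (StatefulFilterOddLines and oddByCounter are hypothetical names). It is exactly the shared state the question hoped to avoid, and it only yields the intended lines for a sequential, ordered stream:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class StatefulFilterOddLines {
    // The predicate mutates a counter it captures: a stateful behavioral
    // parameter, which the Stream API documentation discourages.
    static <T> Stream<T> oddByCounter(Stream<T> source) {
        AtomicLong position = new AtomicLong(); // 1-based position after increment
        return source.filter(t -> (position.incrementAndGet() & 1L) == 1L);
    }

    public static void main(String[] args) {
        List<String> odd = oddByCounter(Stream.of("a", "x", "b", "y", "c"))
                .collect(Collectors.toList());
        System.out.println(odd); // prints [a, b, c]
        System.out.println(odd.equals(Arrays.asList("a", "b", "c"))); // prints true
    }
}
```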

As aioobe said, there isn't a convenient way to do this, but there are several inconvenient ways. :-)

Here's another spliterator-based approach. Unlike Holger's, which wraps another spliterator, this one does the I/O itself. This gives greater control over things like ordering, but it also means that it has to deal with IOException and close handling. I also threw in a LongPredicate parameter that lets you decide, by line number, which lines get passed through.

static class LineSpliterator extends Spliterators.AbstractSpliterator<String>
        implements AutoCloseable {
    final BufferedReader br;
    final LongPredicate pred;
    long count = 0L;

    public LineSpliterator(Path path, LongPredicate pred) throws IOException {
        super(Long.MAX_VALUE, Spliterator.ORDERED);
        br = Files.newBufferedReader(path);
        this.pred = pred;
    }

    @Override
    public boolean tryAdvance(Consumer<? super String> action) {
        try {
            String s;
            while ((s = br.readLine()) != null) {
                if (pred.test(++count)) {
                    action.accept(s);
                    return true;
                }
            }
            return false;
        } catch (IOException ioe) {
            throw new UncheckedIOException(ioe);
        }
    }

    @Override
    public void close() {
        try {
            br.close();
        } catch (IOException ioe) {
            throw new UncheckedIOException(ioe);
        }
    }

    public static Stream<String> lines(Path path, LongPredicate pred) throws IOException {
        LineSpliterator ls = new LineSpliterator(path, pred);
        return StreamSupport.stream(ls, false)
                            .onClose(() -> ls.close());
    }
}

You'd use it within a try-with-resources to ensure that the file is closed, even if an exception occurs:

static void printOddLines() throws IOException {
    try (Stream<String> lines = LineSpliterator.lines(PATH, x -> (x & 1L) == 1L)) {
        lines.forEach(System.out::println);
    }
}
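Because the predicate sees the line number, other selections are just a different lambda. A minimal sketch for trying this without a file, assuming the same read-and-count loop is applied to any java.io.Reader (ReaderLines is a hypothetical name, not part of the answer), here keeping every third line. It also reports NONNULL, as Holger suggests in the comments:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.function.LongPredicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class ReaderLines {
    // Same idea as LineSpliterator.lines, but over a Reader instead of a Path.
    static Stream<String> lines(Reader in, LongPredicate pred) {
        BufferedReader br = new BufferedReader(in);
        Spliterator<String> sp = new Spliterators.AbstractSpliterator<String>(
                Long.MAX_VALUE, Spliterator.ORDERED | Spliterator.NONNULL) {
            long count = 0L; // 1-based number of the line most recently read

            @Override
            public boolean tryAdvance(Consumer<? super String> action) {
                try {
                    String s;
                    while ((s = br.readLine()) != null) {
                        if (pred.test(++count)) {
                            action.accept(s);
                            return true;   // exactly one element delivered
                        }
                    }
                    return false;          // no element delivered
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }
        };
        return StreamSupport.stream(sp, false).onClose(() -> {
            try {
                br.close();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
    }

    public static void main(String[] args) {
        String text = "l1\nl2\nl3\nl4\nl5\nl6\n";
        try (Stream<String> s = lines(new StringReader(text), n -> n % 3 == 0)) {
            System.out.println(s.collect(Collectors.toList())); // prints [l3, l6]
        }
    }
}
```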
Stuart Marks
  • Does this approach work when turning the stream parallel? (Well, I have my answer: I tested it and it prints some lines multiple times. I'm still surprised it works at all, given the shared `count` variable :/). Do you think it's possible to make it work in parallel computations? I would say no, because the requested task is by definition sequential (unless you know the number of lines of the file in advance), but I'm not sure... – user2336315 May 11 '15 at 17:29
  • @user2336315 It would have worked if I had implemented it correctly. :-) Fixed. Sorry, I should have tested more thoroughly. The requirement on `tryAdvance` is that it either (a) call action zero times and return false, or (b) call action once and return true. The previous code had cases where the action was called zero times but it returned true, which broke things. This was a simple programming error, not a concurrency issue. The spliterator implementation itself need not be thread-safe. – Stuart Marks May 11 '15 at 21:06
  • So how is the `count` variable handled when the task is run in parallel? Does `trySplit` know how many elements it splits and update it accordingly? I really have a hard time understanding how it works when you have a stateful condition :-( – user2336315 May 11 '15 at 21:18
  • @user2336315 `AbstractSpliterator` subclasses are called by one thread at a time, and a simplistic policy is used for parallelism. Basically `trySplit` calls `tryAdvance` a bunch of times to gather elements into a batch, and then hands off that batch as the result of splitting. That batch is contained in a different spliterator object that's processed by other thread(s). The remaining elements remain in *this* spliterator and can be split this way again. See the source code at http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/jdk8-b132/src/share/classes/java/util/Spliterators.java#l1289 – Stuart Marks May 11 '15 at 22:10
  • @Stuart Marks: …and it’s worth noting that, since the stream has an unknown size, splitting will use a rather big chunk size (afaik, starting with `1024`, then increasing) and due to the filtering within `tryAdvance`, it filters single-threaded and counts the matching lines only when buffering for splitting. So you need quite large files for having any effect with `parallel`, besides slowing everything down. By the way, you can specify `NONNULL` characteristic. Maybe there’s an implementation taking advantage of it in the future… – Holger May 12 '15 at 08:06
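To see the policy described in these comments in action, a minimal sketch (ParallelCountingDemo, OddPositions and oddPositionsParallel are hypothetical names) runs a counter-based AbstractSpliterator in parallel over an in-memory source instead of a file and compares the result with a sequential filter. As Holger notes, this is not expected to be faster; it only illustrates that the result stays correct:

```java
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

public class ParallelCountingDemo {
    // A spliterator with a mutable position counter, like LineSpliterator.
    // AbstractSpliterator.trySplit() fills each batch by calling this
    // tryAdvance repeatedly, and the stream framework hands this object to
    // one thread at a time, so the plain 'count' field needs no locking.
    static final class OddPositions<T> extends Spliterators.AbstractSpliterator<T> {
        private final Iterator<T> it;
        private long count = 0L;

        OddPositions(Iterator<T> it) {
            super(Long.MAX_VALUE, Spliterator.ORDERED);
            this.it = it;
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            while (it.hasNext()) {
                T t = it.next();
                if ((++count & 1L) == 1L) {   // keep 1st, 3rd, 5th, ...
                    action.accept(t);
                    return true;
                }
            }
            return false;
        }
    }

    static List<Integer> oddPositionsParallel(int n) {
        Iterator<Integer> source = IntStream.rangeClosed(1, n).boxed().iterator();
        return StreamSupport.stream(new OddPositions<>(source), true) // parallel
                .collect(Collectors.toList()); // ORDERED: encounter order is kept
    }

    public static void main(String[] args) {
        List<Integer> got = oddPositionsParallel(100_000);
        List<Integer> expected = IntStream.rangeClosed(1, 100_000)
                .filter(i -> (i & 1) == 1).boxed().collect(Collectors.toList());
        System.out.println(got.equals(expected)); // prints true
    }
}
```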

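You can do this with a custom spliterator. Unlike the sequential-only variants, this one can also be split for parallel processing, provided the underlying spliterator is SUBSIZED: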

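import java.util.List;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

public class EvenOdd {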
    public static final class EvenSpliterator<T> implements Spliterator<T> {
        private final Spliterator<T> underlying;
        boolean even;

        public EvenSpliterator(Spliterator<T> underlying, boolean even) {
            this.underlying = underlying;
            this.even = even;
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            if (even) {
                even = false;

                return underlying.tryAdvance(action);
            }
            if (!underlying.tryAdvance(t -> {})) {
                return false;
            }
            return underlying.tryAdvance(action);
        }

        @Override
        public Spliterator<T> trySplit() {
            if (!hasCharacteristics(SUBSIZED)) {
                return null;
            }
            final Spliterator<T> newUnderlying = underlying.trySplit();
            if (newUnderlying == null) {
                return null;
            }
            final boolean oldEven = even;

            if ((newUnderlying.estimateSize() & 1) == 1) {
                even = !even;
            }

            return new EvenSpliterator<>(newUnderlying, oldEven);
        }

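        @Override
        public long estimateSize() {
            // ceil(n/2) elements remain if the next one is kept, floor(n/2) otherwise
            long n = underlying.estimateSize();
            return even ? n - (n >> 1) : n >> 1;
        }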

        @Override
        public int characteristics() {
            return underlying.characteristics();
        }
    }

    public static void main(String[] args) {

        final EvenSpliterator<Integer> spliterator = new EvenSpliterator<>(IntStream.range(1, 100000).parallel().mapToObj(Integer::valueOf).spliterator(), false);
        final List<Integer> result = StreamSupport.stream(spliterator, true).parallel().collect(Collectors.toList());
        final List<Integer> expected = IntStream.range(1, 100000 / 2).mapToObj(i -> i * 2).collect(Collectors.toList());
        if (result.equals(expected)) {
            System.out.println("Yay! Expected result.");
        }
    }
}
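Since the answer does not explain `trySplit()`: the prefix returned by `underlying.trySplit()` keeps the current `even` flag, and the remainder's flag flips when the prefix size is odd. That is why it requires SUBSIZED, so that the prefix's `estimateSize()` is exact. A small brute-force check of this parity rule on plain lists, as a sketch (SplitParityCheck and its helpers are hypothetical, not part of the answer):

```java
import java.util.ArrayList;
import java.util.List;

public class SplitParityCheck {
    // Every second element, starting with the first if takeFirst is true,
    // otherwise with the second (the meaning of the 'even' flag above).
    static <T> List<T> alternate(List<T> xs, boolean takeFirst) {
        List<T> out = new ArrayList<>();
        for (int i = takeFirst ? 0 : 1; i < xs.size(); i += 2) out.add(xs.get(i));
        return out;
    }

    // For every split point k: prefix keeps the flag, suffix flips it iff k is odd.
    // The concatenated results must equal filtering the whole list at once.
    static boolean ruleHolds(int n) {
        List<Integer> xs = new ArrayList<>();
        for (int i = 0; i < n; i++) xs.add(i);
        for (boolean even : new boolean[] {true, false}) {
            List<Integer> whole = alternate(xs, even);
            for (int k = 0; k <= n; k++) {
                boolean suffixFlag = (k & 1) == 1 ? !even : even;
                List<Integer> joined = new ArrayList<>(alternate(xs.subList(0, k), even));
                joined.addAll(alternate(xs.subList(k, n), suffixFlag));
                if (!joined.equals(whole)) return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(ruleHolds(20)); // prints true
    }
}
```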
Daniel
  • While this code snippet may solve the question, [including an explanation](http://meta.stackexchange.com/questions/114762/explaining-entirely-code-based-answers) really helps to improve the quality of your post. Remember that you are answering the question for readers in the future, and those people might not know the reasons for your code suggestion. – Ferrybig Feb 12 '16 at 13:00

Following @aioobe's algorithm, here's another spliterator-based approach, as proposed by @Holger, but more concise, even if less efficient.

public static <T> Stream<T> filterOdd(Stream<T> src) {
    Spliterator<T> iter = src.spliterator();
    Spliterators.AbstractSpliterator<T> res =
        new Spliterators.AbstractSpliterator<T>(Long.MAX_VALUE, Spliterator.ORDERED) {
            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                iter.tryAdvance(item -> {});    // discard
                return iter.tryAdvance(action); // use
            }
        };
    // forward close() so that, e.g., the file behind Files.lines gets closed
    return StreamSupport.stream(res, false).onClose(src::close);
}

Then you can use it like

Stream<DomainObject> res = filterOdd(Files.lines(src))
    .map(line -> toDomainObject(line));
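Note that, like aioobe's loop, filterOdd discards before it uses, so it keeps the 2nd, 4th, ... elements (1-based), whereas Holger's OddLines and the `x -> (x & 1L) == 1L` predicate in Stuart Marks's answer keep the 1st, 3rd, .... A quick self-contained check of this, using an in-memory stream instead of Files.lines (the class name FilterOddCheck is hypothetical):

```java
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class FilterOddCheck {
    // filterOdd as in the answer, with the imports it needs and onClose forwarding.
    public static <T> Stream<T> filterOdd(Stream<T> src) {
        Spliterator<T> iter = src.spliterator();
        Spliterator<T> res = new Spliterators.AbstractSpliterator<T>(
                Long.MAX_VALUE, Spliterator.ORDERED) {
            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                iter.tryAdvance(item -> { });    // discard
                return iter.tryAdvance(action);  // use
            }
        };
        return StreamSupport.stream(res, false).onClose(src::close);
    }

    public static void main(String[] args) {
        List<String> kept = filterOdd(Stream.of("1", "2", "3", "4", "5"))
                .collect(Collectors.toList());
        System.out.println(kept); // prints [2, 4]
    }
}
```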
Miguel Gamboa