1

I would like to work out a simple implementation equivalent to the Java 8 Stream that lets me explore how lazily computed query operations (such as map(), filter(), reduce(), etc.) are developed. NOTE: It is not my goal to achieve a better solution than Stream; my only goal is to understand Stream internals.

Yet, every implementation that I found is based on the Iterable<T>, such as the solutions presented in the following answers:

Yet, I do not feel comfortable with any of these solutions because:

  1. They are too verbose.
  2. They are not flexible: adding a new query method requires structural modifications.
  3. Apart from the query arguments, they do not take advantage of new Java 8 features such as first-class functions or default methods.
  4. None of them uses the Spliterator<T> approach that Stream<T> uses.

I know that Spliterator<T> was designed to allow partitioning and parallel processing, but I think that its single iteration method (boolean tryAdvance(Consumer<? super T>)) could be exploited to build alternatives to the ones listed above. Moreover, as stated by Brian Goetz:

Spliterator is a better Iterator, even without parallelism. (They're also generally just easier to write and harder to get wrong.)

So, is it possible to develop a more readable, simpler, more concise and more flexible implementation of a lazily computed query API based on the same principles as Stream<T> (except the parallel processing part)?

If yes, how can you do it? I would like to see simpler implementations than the ones listed above, if possible taking advantage of new Java 8 features.

Requirements:

  • Do not reuse existing methods from Java 8 API
  • The parallel processing feature is out of the scope of this question.
  • If possible, and better, do not use the Iterable<T> approach.

The reason for my question? I think the best way to learn a query API such as Stream is to try to implement those same methods myself. I have already done this successfully when I was learning .NET LINQ. Of course, I did not achieve a better implementation than LINQ, but it helped me to understand the internals. So, I am trying to follow the same approach to learn Stream.

This is not so unusual. There are many workshops following this approach for other technologies, such as the functional-javascript-workshop, in which most exercises ask for the implementation of existing methods such as map(), filter(), reduce(), call(), bind(), etc.

Selected Answer: For now I have chosen Miguel Gamboa’s answer over Tagir Valeev’s answer, because the latter does not allow the implementation of findAny() or findFirst() without completely traversing all elements through the forEach() of dataSrc. However, I think that Tagir Valeev’s answer has other merits regarding the concise implementation of some intermediate operations and also performance, since the forEach()-based approach reduces the overhead of the iteration code that mediates access to the data structure internals, as noted by Brian Goetz in point 2 of his answer.

Gaspium
  • You might study Guava's `FluentIterable`, which is simpler than `Stream` in many ways, and is based on `Iterator` rather than the more complicated `Spliterators` used in `Stream`. – Hank D Apr 27 '16 at 23:02
  • Not at all. It is completely out of my goal to achieve a better solution than Stream. On the other hand my only goal is to understand Stream. My learning approach is to develop a small prototype based on the same principles. That was how I learned .NET LINQ through IEnumerable and I did the same for Guava with Iterable. And now for Stream it does not seem to me that the best approach is to implement an Iterable, since Stream is based on Spliterator. Moreover, every Iterable-based implementation that I found, such as the one pointed to in the OP, is too long. I will clarify my OP. – Gaspium Apr 28 '16 at 16:16
  • If it is for learning, then why don’t you try it yourself? Implementing a `Spliterator` that will `map` or `filter` each item of another `Spliterator` on-the-fly should be easy. I don’t think that re-implementing an API is the best way to learn it, but if you think so, just go ahead… – Holger Apr 28 '16 at 17:22
  • @Holger `Spliterator` has 4 abstract methods and it looks even more painful to implement. But [Miguel Gamboa’s answer](http://stackoverflow.com/a/36901940/6263808) proposes a good way to make use of `tryAdvance`, and it implements `map()` and `forEach()` with only one instruction each. Regarding my approach, it is nothing new or unusual. Do you know workshops such as the [functional-javascript-workshop](https://github.com/timoxley/functional-javascript-workshop)? Most exercises ask for the implementation of existing methods such as `map()`, `filter()`, `reduce()`, `call()`, `bind()`, etc… – Gaspium Apr 28 '16 at 21:11
  • @Gaspium: `AbstractSpliterator` has *exactly one* abstract method, the one that is relevant, `tryAdvance`. Since that’s exactly the same method as in Miguel Gamboa’s answer, there should be no difference in complexity. Except that you don’t need to implement a `forEach` as there’s already `Spliterator.forEachRemaining` which comes for free. – Holger Apr 29 '16 at 08:05
  • @Holger `AbstractSpliterator` is a class and thus I think that you cannot use the same technique proposed by Miguel Gamboa, which returns only one lambda. – Gaspium Apr 29 '16 at 08:38
  • I am happy with how this post has unfolded, and so far we have already got 2 good proposals. As an SO user I always look for innovative solutions and idioms that improve source code. In that respect, I feel disappointed with the kind of solutions presented in the answers listed in the OP. Those answers make me feel like I am back in the 90s. When I am looking for the answer to the question: _How to implement a Stream?_ I am definitely looking for something like the Miguel Gamboa and Tagir Valeev posts, and I really believe that this kind of answer will help other programmers understand how Streams are implemented. – Gaspium Apr 29 '16 at 08:39
  • @Holger from your SO history I see that you are an expert programmer and I cannot believe that you prefer the answers listed in OP. I was expecting that you could propose even a better solution than those ones already posted in current answers. – Gaspium Apr 29 '16 at 08:39
  • @Gaspium: I never said I’m happy with the answer of these other questions, I’d rather dispute the validity of these questions regarding the scope of SO. Similarly, as already said, I think you should try that yourself, if you really want to learn something. – Holger Apr 29 '16 at 08:46
  • @Holger I have already tried by myself and I am not happy with my solutions. That's why I am here. :-) Maybe I could have posted my solutions, but they are not better than the answers listed in the OP. So I thought it would be more useful to point to some examples already posted on SO than to add even more useless implementations such as my own. – Gaspium Apr 29 '16 at 09:02
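
Holger's point that `AbstractSpliterator` leaves only `tryAdvance` to implement can be sketched as follows. This is a minimal illustration rather than code from the thread: the names `MappingSpliterator` and `MappingSpliteratorDemo` are hypothetical, and the sketch deliberately reuses the JDK's `Spliterators.AbstractSpliterator`, which the question's requirements would otherwise rule out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical class (not from the thread): maps each element of a source
// Spliterator on the fly. Only tryAdvance has to be written by hand.
final class MappingSpliterator<T, R> extends Spliterators.AbstractSpliterator<R> {
    private final Spliterator<T> source;
    private final Function<? super T, ? extends R> mapper;

    MappingSpliterator(Spliterator<T> source, Function<? super T, ? extends R> mapper) {
        // Pass the size estimate through and claim no characteristics (0).
        super(source.estimateSize(), 0);
        this.source = source;
        this.mapper = mapper;
    }

    @Override
    public boolean tryAdvance(Consumer<? super R> action) {
        // Pull one element from the source and hand its mapped value downstream.
        return source.tryAdvance(item -> action.accept(mapper.apply(item)));
    }
}

// Hypothetical demo class.
class MappingSpliteratorDemo {
    static List<Integer> lengths(List<String> words) {
        List<Integer> out = new ArrayList<>();
        // forEachRemaining is inherited from Spliterator, so no forEach is needed.
        new MappingSpliterator<String, Integer>(words.spliterator(), String::length)
            .forEachRemaining(out::add);
        return out;
    }

    public static void main(String[] args) {
        // prints [10, 5, 7]
        System.out.println(lengths(Arrays.asList("functional", "super", "formula")));
    }
}
```

The trade-off Gaspium raises still applies: because `AbstractSpliterator` is a class, each stage needs a named or anonymous subclass instead of a single lambda.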

3 Answers

5

It's quite easy to implement the subset of stateless operations without short-circuiting support. You just have to take care to always stick with internal iteration. The basic building block is the forEach operation, which performs a given action for every input element. The body of the forEach method is the only thing that changes between stages. So we can either make an abstract class with an abstract forEach method, or accept a function that is actually the body of forEach. I'll stick with the second approach:

public final class MyStream<T> {
    private final Consumer<Consumer<T>> action;

    public MyStream(Consumer<Consumer<T>> action) {
        this.action = action;
    }

    public void forEach(Consumer<T> cons) {
        action.accept(cons);
    }
}

Now let's create some simple sources:

public static <T> MyStream<T> of(Iterable<T> elements) {
    // just redirect to Iterable::forEach
    return new MyStream<>(elements::forEach);
}

@SafeVarargs
public static <T> MyStream<T> of(T... elements) {
    return of(Arrays.asList(elements));
}

public static MyStream<Integer> range(int from, int to) {
    return new MyStream<>(cons -> {
        for(int i=from; i<to; i++) cons.accept(i);
    });
}

Now intermediate operations. They just need to adapt a consumer received by action to perform something else:

public <U> MyStream<U> map(Function<T, U> mapper) {
    return new MyStream<>(cons -> forEach(e -> cons.accept(mapper.apply(e))));
}

public MyStream<T> filter(Predicate<T> pred) {
    return new MyStream<>(cons -> forEach(e -> {
        if(pred.test(e))
            cons.accept(e);
    }));
}

public <U> MyStream<U> flatMap(Function<T, MyStream<U>> mapper) {
    return new MyStream<>(cons -> forEach(e -> mapper.apply(e).forEach(cons)));
}

public MyStream<T> peek(Consumer<T> action) {
    return new MyStream<>(cons -> forEach(e -> {
        action.accept(e);
        cons.accept(e);
    }));
}

public MyStream<T> skip(long n) {
    return new MyStream<>(cons -> {
        long[] count = {0};
        forEach(e -> {
            if(++count[0] > n)
                cons.accept(e);
        });
    });
}

Now let's create some terminal operations using forEach:

public T reduce(T identity, BinaryOperator<T> op) {
    class Box {
        T val = identity;
    }
    Box b = new Box();
    forEach(e -> b.val = op.apply(b.val, e));
    return b.val;
}

public Optional<T> reduce(BinaryOperator<T> op) {
    class Box {
        boolean isPresent;
        T val;
    }
    Box b = new Box();
    forEach(e -> {
        if(b.isPresent) b.val = op.apply(b.val, e);
        else {
            b.val = e;
            b.isPresent = true;
        }
    });
    // Return the accumulated value only if at least one element was seen.
    return b.isPresent ? Optional.of(b.val) : Optional.empty();
}

public long count() {
    return map(e -> 1L).reduce(0L, Long::sum);
}

public Optional<T> maxBy(Comparator<T> cmp) {
    return reduce(BinaryOperator.maxBy(cmp));
}

public Optional<T> minBy(Comparator<T> cmp) {
    return reduce(BinaryOperator.minBy(cmp));
}

So we have our stream now. Let's try it:

System.out.println(MyStream.of(1,2,3,4,5).map(x -> x*2)
                           .reduce(0, Integer::sum));
// 30

System.out.println(MyStream.of("a", "stream", "of", "some", "strings")
                           .flatMap(x -> MyStream.of(", ", x))
                           .skip(1).reduce("", String::concat));
// a, stream, of, some, strings

System.out.println(MyStream.range(0, 100)
                           .filter(x -> x % 3 == 0).count());
// 34

And so on. Such an implementation is very simple, yet pretty close to what's going on in the actual Stream API. Of course, when you add short-circuiting, parallel streams, primitive specializations and more stateful operations, things become much more complicated.

Note that, unlike a Stream from the Stream API, this MyStream can be reused many times:

MyStream<Integer> range = range(0, 10);
range.forEach(System.out::println);
range.forEach(System.out::println); // works perfectly
Tagir Valeev
  • Can’t you simplify your solution by turning `MyStream` into a functional interface with an abstract `forEach` method? Then, instead of creating a new wrapper instance of `MyStream`, you could simply return the lambda from each query method. I think that would be even more efficient because you would remove one more indirection and also take advantage of `invokedynamic`. – Gaspium Apr 29 '16 at 08:47
  • Yes, it's quite possible [see gist](https://gist.github.com/amaembo/39b9c354ba3b63e973f3ce2b3ee4a9bf) and indeed simplifies the example. However this way it's harder to extend, for example, to support characteristics or one-by-one processing (like `tryAdvance`). Thus I would not change the answer, but you can take the gist version if you like. – Tagir Valeev Apr 29 '16 at 09:49
  • @TagirValeev I merged your gist into branch [foreach-instead-of-tryAdvance](https://github.com/fmcarvalho/quiny/blob/foreach-instead-of-tryAdvance/src/main/java/quiny/Queryable.java). I hope you don’t mind. I will just use it for didactic purposes. Thanks. – Miguel Gamboa May 02 '16 at 10:41
  • @MiguelGamboa, no problems. – Tagir Valeev May 02 '16 at 12:23
  • @TagirValeev With your proposal, how would you implement `findFirst()` or `findAny()`? I do not see how to do it without making the innermost `forEach()` traverse the whole element source. – Miguel Gamboa May 04 '16 at 17:57
  • @MiguelGamboa, that would require more changes. The easiest thing is to replace `forEach` with `forEachWithCancel(Predicate)` which would return `false` if processing should be stopped. – Tagir Valeev May 05 '16 at 04:52
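
The `forEachWithCancel(Predicate)` idea from the last comment might look roughly like the sketch below. It is an assumption about how such a variant could be written, not Tagir Valeev's code: `CancellableStream` and `CancellableStreamDemo` are hypothetical names, and `findFirst()` assumes non-null elements.

```java
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical variant of MyStream in which every sink can ask the source to stop.
final class CancellableStream<T> {
    // The traversal feeds elements to a sink until the sink returns false.
    // It returns false if it was cancelled and true if it reached the end.
    private final Predicate<Predicate<T>> traversal;

    CancellableStream(Predicate<Predicate<T>> traversal) {
        this.traversal = traversal;
    }

    boolean forEachWithCancel(Predicate<T> sink) {
        return traversal.test(sink);
    }

    static CancellableStream<Integer> range(int from, int to) {
        return new CancellableStream<>(sink -> {
            for (int i = from; i < to; i++) {
                if (!sink.test(i)) return false; // downstream asked to stop
            }
            return true;
        });
    }

    <U> CancellableStream<U> map(Function<T, U> mapper) {
        return new CancellableStream<>(
            sink -> forEachWithCancel(e -> sink.test(mapper.apply(e))));
    }

    CancellableStream<T> filter(Predicate<T> pred) {
        // A rejected element keeps the traversal going (true);
        // an accepted one lets the downstream sink decide.
        return new CancellableStream<>(
            sink -> forEachWithCancel(e -> !pred.test(e) || sink.test(e)));
    }

    Optional<T> findFirst() {
        class Box { T val; boolean present; }
        Box b = new Box();
        // Record the first element, then cancel by returning false.
        forEachWithCancel(e -> { b.val = e; b.present = true; return false; });
        return b.present ? Optional.of(b.val) : Optional.empty();
    }
}

// Hypothetical demo: counts how many source elements were actually visited.
class CancellableStreamDemo {
    static int[] firstMultipleOfSeven() {
        int[] visited = {0};
        Optional<Integer> first = CancellableStream.range(1, 1_000_000)
            .map(x -> { visited[0]++; return x; })
            .filter(x -> x % 7 == 0)
            .findFirst();
        return new int[] { first.get(), visited[0] };
    }
}
```

In this sketch the source loop stops as soon as a sink returns `false`, so `findFirst()` no longer forces a full traversal.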
1

Using functional-style programming and taking advantage of Java 8 default methods, we can achieve a short and clean lazily computed query API. For instance, check out how easily you can implement the map() and forEach() methods in the Queryable type below, and then use it like this:

List<String> data = Arrays.asList("functional", "super", "formula");
Queryable.of(data) // <=> data.stream().
     .map(String::length)
     .forEach(System.out::println);

If you replace the Queryable.of(data) call with data.stream(), you will get the same result. The following example illustrates an implementation of the map() and forEach() methods. For the complete solution and a more detailed description, see the Queryable repository.

UPDATED with @srborlongan comment. Changed forEach signature from forEach(Consumer<T>) to forEach(Consumer<? super T>) and changed of from of(Collection<T>) to of(Iterable<T>)

@FunctionalInterface
public interface Queryable<T>{

  abstract boolean tryAdvance(Consumer<? super T> action); // <=> Spliterator::tryAdvance

  static <T> boolean truth(Consumer<T> c, T item){
    c.accept(item);
    return true;
  }

  public static <T> Queryable<T> of(Iterable<T> data) {
    final Iterator<T> dataSrc = data.iterator();
    return action -> dataSrc.hasNext() && truth(action, dataSrc.next());
  }

  public default void forEach(Consumer<? super T> action) {
    while (tryAdvance(action)) { }
  }

  public default <R> Queryable<R> map(Function<T, R> mapper) {
    return action -> tryAdvance(item -> action.accept(mapper.apply(item)));
  }
}
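
As a hedged sketch of how the same tryAdvance technique might extend to filter() and a short-circuiting findFirst(): the interface name Query and the class QueryDemo below are hypothetical, chosen to keep this illustration apart from the code in the Queryable repository, and findFirst() assumes non-null elements.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical interface mirroring the Queryable idea: one abstract tryAdvance,
// everything else is a default method that returns a lambda.
@FunctionalInterface
interface Query<T> {

    boolean tryAdvance(Consumer<? super T> action);

    static <T> Query<T> of(Iterable<T> data) {
        Iterator<T> it = data.iterator();
        return action -> {
            if (!it.hasNext()) return false;
            action.accept(it.next());
            return true;
        };
    }

    default Query<T> filter(Predicate<? super T> pred) {
        return action -> {
            boolean[] found = {false};
            // Keep pulling from upstream until one element passes the predicate.
            while (tryAdvance(item -> {
                if (pred.test(item)) {
                    action.accept(item);
                    found[0] = true;
                }
            })) {
                if (found[0]) return true;
            }
            return false; // upstream is exhausted
        };
    }

    default Optional<T> findFirst() {
        List<T> box = new ArrayList<>(1);
        tryAdvance(box::add); // pulls at most one element through the pipeline
        return box.isEmpty() ? Optional.empty() : Optional.of(box.get(0));
    }
}

// Hypothetical demo class.
class QueryDemo {
    static Optional<String> firstLongWord(Iterable<String> words) {
        return Query.of(words).filter(w -> w.length() > 5).findFirst();
    }

    public static void main(String[] args) {
        // prints Optional[functional]
        System.out.println(firstLongWord(Arrays.asList("super", "functional", "formula")));
    }
}
```

Because each call to tryAdvance pulls a single element, findFirst() stops after the first match without traversing the rest of the source.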
Miguel Gamboa
  • Would there be any problems with using Iterable for Queryable::of instead of Collection? I've seen similar code in the past that uses Iterable as input to allow for non-collection sources (I/O streams, random numbers, etc.). – srborlongan Apr 28 '16 at 04:56
  • Also, Stream::forEach actually accepts Consumer<? super T>, which I presume is the reason why I needed to make Queryable::forEach accept Consumer<? super T> to implement Queryable::flatMap. – srborlongan Apr 28 '16 at 05:23
  • @srborlongan I agree with your suggestions and I included those fixes in `Queryable` example. – Miguel Gamboa Apr 28 '16 at 08:22
  • 1. The OP claims the solution presented in [answer](http://stackoverflow.com/a/30247144/1140754) is too verbose and not the Stream API. 2. My example introduces a new functional interface `Queryable` and takes advantage of default methods to return a lambda in every query method. 3. It is flexible because `Queryable` methods do not depend on instance fields, such as an auxiliary `predicate` field that is used in `LazySeq`. The `LazySeq` approach requires additional fields for each query method. But maybe I can give a more detailed explanation in my answer. – Miguel Gamboa Apr 28 '16 at 10:31
0

First of all, I have to say I love the design of lambdas and the Stream API. The implementation in the JDK is also great and performs well. I'm not sure whether learning the implementation by doing it yourself is a good idea or not, but I did implement the Stream API in my open source library abacus-common, both sequential and parallel. The source code is on GitHub: Stream. I can't say how good it is compared with the implementation in the JDK, but personally I think the implementation is pretty straightforward and simple, and it also performs well.

Disclosure: I'm the developer of abacus-common.

user_3380739