4

How can I reuse, in Java 8 (perhaps through some memoization process), values that were already computed while iterating over a stream?

If the stream is duplicated or supplied again, it will be recomputed. In some cases it would be preferable to trade memory for that CPU time. Collecting everything up front might not be a good idea, since the stream is used to find the first item that satisfies a predicate.

Stream<Integer> all = Stream.of(1,2,3,4,5, ...<many other values>... ).
      map(x->veryLongTimeToComputeFunction(x));
System.out.println("fast find of 2"+all.filter(x->x>1).findFirst());

// each of the following two lines throws "java.lang.IllegalStateException: stream has already been operated upon or closed"
System.out.println("no find"+all.filter(x->x>10).findFirst());
System.out.println("find again"+all.filter(x->x>4).findFirst());

The question is similar to Copy a stream to avoid "stream has already been operated upon or closed" (java 8).

raisercostin

5 Answers

1

Why not use memoization inside veryLongTimeToComputeFunction? You can pass the memo cache to the function as a parameter.
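A minimal sketch of that idea, under a few assumptions: the expensive function is pure, the input is a small fixed list standing in for the elided values in the question, and the helper names memoize, cached and firstGreaterThan are made up for illustration. Instead of passing the cache as a parameter, it wraps the function so that every stream built afterwards shares the cached results.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Stream;

public class MemoizedFunctionDemo {
    // Counts real computations so the effect of the cache is observable.
    static final AtomicInteger calls = new AtomicInteger();

    // Stand-in for the expensive function from the question (assumed to be pure).
    static int veryLongTimeToComputeFunction(int x) {
        calls.incrementAndGet();
        return x;
    }

    // Hypothetical helper: wraps a function so each distinct argument is computed at most once.
    static <A, R> Function<A, R> memoize(Function<A, R> f) {
        Map<A, R> cache = new ConcurrentHashMap<>();
        return a -> cache.computeIfAbsent(a, f);
    }

    static final Function<Integer, Integer> cached =
            memoize(MemoizedFunctionDemo::veryLongTimeToComputeFunction);

    // A fresh stream is built per query; only the function results are shared.
    static Optional<Integer> firstGreaterThan(int limit) {
        return Stream.of(1, 2, 3, 4, 5)
                .map(cached)
                .filter(x -> x > limit)
                .findFirst();
    }

    public static void main(String[] args) {
        System.out.println(firstGreaterThan(1)); // Optional[2], computes 1 and 2
        System.out.println(firstGreaterThan(4)); // Optional[5], computes only 3, 4 and 5
        System.out.println("computations: " + calls.get()); // computations: 5
    }
}
```

The cache grows with the number of distinct arguments, so this trades memory for CPU time in the way the question describes.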

a.yekimov
1

The canonical in-memory Stream source is a Collection. A simple Stream memoization that does not support parallel processing could be implemented as follows:

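// Imports needed by the methods below (place the methods inside a class):
import java.util.ArrayList;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.BaseStream;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public static void main(String[] args) {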
    Supplier<Stream<Integer>> s=memoize(
        IntStream.range(0, 10_000)
                 .map(x -> veryLongTimeToComputeFunction(x))
    );
    System.out.println("First item > 1  "+s.get().filter(x -> x>1 ).findFirst());
    System.out.println("First item > 10 "+s.get().filter(x -> x>10).findFirst());
    System.out.println("First item > 4  "+s.get().filter(x -> x>4 ).findFirst());
}
static int veryLongTimeToComputeFunction(int arg) {
    System.out.println("veryLongTimeToComputeFunction("+arg+")");
    return arg;
}

public static <T> Supplier<Stream<T>> memoize(BaseStream<T,?> stream) {
    Spliterator<T> sp=stream.spliterator();
    class S extends Spliterators.AbstractSpliterator<T> {
        ArrayList<T> mem=new ArrayList<>();
        S() { super(sp.estimateSize(), sp.characteristics()); }
        public boolean tryAdvance(Consumer<? super T> action) {
            return sp.tryAdvance(item -> {
                mem.add(item);
                action.accept(item);
            });
        }
    }
    S s=new S();
    return () -> Stream.concat(s.mem.stream(), StreamSupport.stream(s, false));
}

Take care to finish Stream processing before requesting the next Stream from the supplier.

Holger
  • Thanks, I'll try it. I was hoping to find something in the Java collections, or at least in Guava, Apache Commons, or another small, generally available library. – raisercostin Sep 16 '16 at 10:17
  • Well, you can put this into a small library… – Holger Sep 16 '16 at 10:20
  • @Holger I do not understand why you are doing `int ix=mem.size(); if(sp.tryAdvance(mem::add)) { action.accept(mem.get(ix)); return true; }` instead of `sp.tryAdvance(item -> { mem.add(item); action.accept(item); });`. Why read from memory (i.e. `mem.get(ix)`) instead of creating an anonymous function (lambda) that receives the item? I believe you have a good reason for it, but I cannot figure out what it is and would like to know. – Miguel Gamboa May 04 '18 at 10:11
  • @MiguelGamboa I suppose the original idea was that `mem::add` could be stored and reused throughout the entire lifetime of this `Spliterator`, while a lambda expression capturing the consumer cannot. Since this is rather irrelevant here, I used your suggestion to simplify the code. – Holger May 04 '18 at 11:23
0

I would suggest collecting your Stream to a list and then running your filters on the list's stream.
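A minimal sketch of this approach, assuming a small fixed input in place of the elided values from the question (the class and helper names are made up for illustration). Note the trade-off the question already mentions: the expensive function runs eagerly for every element, even if a later query only needs the first few.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class CollectOnceDemo {
    // Stand-in for the expensive function from the question.
    static int veryLongTimeToComputeFunction(int x) {
        return x;
    }

    // Runs the expensive function once per element and keeps every result in memory.
    static List<Integer> computeAll() {
        return Stream.of(1, 2, 3, 4, 5)
                .map(CollectOnceDemo::veryLongTimeToComputeFunction)
                .collect(Collectors.toList());
    }

    // Each query opens a new, cheap stream over the already computed list.
    static Optional<Integer> firstGreaterThan(List<Integer> values, int limit) {
        return values.stream().filter(x -> x > limit).findFirst();
    }

    public static void main(String[] args) {
        List<Integer> all = computeAll();
        System.out.println("fast find of 2 " + firstGreaterThan(all, 1));  // Optional[2]
        System.out.println("no find " + firstGreaterThan(all, 10));        // Optional.empty
        System.out.println("find again " + firstGreaterThan(all, 4));      // Optional[5]
    }
}
```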

Joe C
0

Streams are not meant to be saved; they are meant for processing data.

Example: you are watching a DVD. In Java terms, the DVD would be something like a collection, and the data transferred from your DVD player to your TV is a stream. You cannot save the stream, but you could burn a disc, which in Java terms means collecting it.

There are other options:

  • extract/refactor your stream operations or predicates into a method that takes a stream as a parameter and returns a stream
  • use a caching framework: e.g. in Spring, methods can be annotated with @Cacheable. The first call executes the method; subsequent calls fetch the result from the cache for a defined time
  • if you are looking for non-blocking execution of your long-running task, have a look at RxJava
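The first option can be sketched as follows, assuming a small fixed input and the hypothetical names expensiveStep and firstGreaterThan. It only makes the pipeline reusable on a fresh stream; by itself it does not avoid recomputation, which is where a cache would come in.

```java
import java.util.Optional;
import java.util.function.Supplier;
import java.util.stream.Stream;

public class ReusablePipelineDemo {
    // Stand-in for the expensive function from the question.
    static int veryLongTimeToComputeFunction(int x) {
        return x;
    }

    // Takes a stream and returns a stream, so the same step can be applied to any fresh source.
    static Stream<Integer> expensiveStep(Stream<Integer> source) {
        return source.map(ReusablePipelineDemo::veryLongTimeToComputeFunction);
    }

    // Asks the supplier for a new stream on every call, since a stream can be consumed only once.
    static Optional<Integer> firstGreaterThan(Supplier<Stream<Integer>> source, int limit) {
        return expensiveStep(source.get()).filter(x -> x > limit).findFirst();
    }

    public static void main(String[] args) {
        Supplier<Stream<Integer>> source = () -> Stream.of(1, 2, 3, 4, 5);
        System.out.println(firstGreaterThan(source, 1));  // Optional[2]
        System.out.println(firstGreaterThan(source, 10)); // Optional.empty
    }
}
```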
Journeycorner
  • If Java streams are not meant to be saved, which collection does just that? Scala streams do this without resorting to specialized frameworks. – raisercostin Sep 16 '16 at 10:13
  • @raisercostin That is the Java approach to streams. In Java, streams are only a functional "add-on", whereas Scala was designed to be purely functional. Anyway, since veryLongTimeToComputeFunction(x) is so heavy on the CPU, you will already gain a lot of performance by adding ".parallel()" to your stream. – Journeycorner Sep 16 '16 at 11:26
0

Java 8 streams are lazy in nature. The operations performed on a stream are evaluated in vertical order, meaning each element passes through the whole pipeline before the next element is processed. What you want can be achieved using the following code:

Stream.of(1,2,3,4,5, ...<many other values>... )
    .map(x-> veryLongTimeToComputeFunction(x))
    .filter(x-> x > 1)
    .findFirst();

This makes sure that veryLongTimeToComputeFunction() is called only until the first matching element is found. After that, the operation terminates. In the worst case, veryLongTimeToComputeFunction() is called for all numbers, either because the last number is the one that matches the criteria or because none matches.
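The short-circuiting can be observed with a call counter. This is a small sketch assuming a sequential stream over the fixed input 1 to 5 (the elided values from the question are left out), with the hypothetical helper callsFor:

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class LazyFindFirstDemo {
    // Counts how often the expensive function actually runs.
    static final AtomicInteger calls = new AtomicInteger();

    // Stand-in for the expensive function from the question.
    static int veryLongTimeToComputeFunction(int x) {
        calls.incrementAndGet();
        return x;
    }

    static Optional<Integer> firstGreaterThan(int limit) {
        return Stream.of(1, 2, 3, 4, 5)
                .map(LazyFindFirstDemo::veryLongTimeToComputeFunction)
                .filter(x -> x > limit)
                .findFirst();
    }

    // Resets the counter, runs one query, and reports how many elements were computed.
    static int callsFor(int limit) {
        calls.set(0);
        firstGreaterThan(limit);
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(callsFor(1));  // 2: stops after the first match
        System.out.println(callsFor(4));  // 5: the match is the last element
        System.out.println(callsFor(10)); // 5: no match, every element is computed
    }
}
```

Every query still starts from scratch, so elements computed by an earlier query are computed again.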

You can also use parallel streams in conjunction with the findAny() method, which may improve performance when the function is expensive.
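A sketch of the parallel variant, again assuming a small fixed input and a made-up helper name. Unlike findFirst(), findAny() may return any matching element, and a parallel run may evaluate more elements than a sequential one, so whether it is actually faster depends on the workload.

```java
import java.util.Optional;
import java.util.stream.Stream;

public class ParallelFindAnyDemo {
    // Stand-in for the expensive function from the question.
    static int veryLongTimeToComputeFunction(int x) {
        return x;
    }

    // Returns some element greater than the limit, not necessarily the first one in encounter order.
    static Optional<Integer> anyGreaterThan(int limit) {
        return Stream.of(1, 2, 3, 4, 5)
                .parallel()
                .map(ParallelFindAnyDemo::veryLongTimeToComputeFunction)
                .filter(x -> x > limit)
                .findAny();
    }

    public static void main(String[] args) {
        // The printed element can differ between runs.
        System.out.println(anyGreaterThan(1));
    }
}
```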

Sujit Kamthe