
I would like to create a Stream from a JPA Repository. The goal is to map the Entities coming from the Repo (which could be more than a million) to other Entities which will in turn be stored in another Repo.

So far I have built a Collector which will collect a given number (e.g. 1000) of Entities and then store them in the target Repo. This works in a parallel Stream. What I need now is a good way to fetch Entities from the source Repo and feed them into the Stream as they are needed.

Most promising so far was to implement a Supplier (http://docs.oracle.com/javase/8/docs/api/java/util/function/Supplier.html) and build the Stream via Stream.generate, but I did not find a way to terminate the stream when the query to the source Repo does not provide another Entity.
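
For illustration, the dead end looks roughly like this (fetchNext() is a hypothetical stand-in for the Repo access):

Stream<MyEntity> stream = Stream.generate(() -> fetchNext());
// Stream.generate is unbounded by design: the Supplier has no way to signal
// "no more entities", so the stream never terminates on its own.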

Any pointers?

Christoph Grimmer
  • possible duplicate of [How to create a Java 8 Stream from an iterator?](http://stackoverflow.com/questions/23439780/how-to-create-a-java-8-stream-from-an-iterator) – a better oliver Sep 18 '14 at 07:25
  • Suppliers are only for infinite streams. You can write a plain old iterator. – a better oliver Sep 18 '14 at 07:26
  • Spliterator was my second guess. I was hoping for a more straightforward solution. The problem is still that I do not want to fetch the complete collection of Entities, and simply using the Repository as an Iterator will do no good. – Christoph Grimmer Sep 18 '14 at 08:07
  • You can load the entities in batches. – a better oliver Sep 18 '14 at 08:14
  • I know that I can do pagination. What I want is to fetch the Entities from the Source Repository as needed, and feed them in the parallel Stream where they are mapped, accumulated and stored into a target repository. – Christoph Grimmer Sep 18 '14 at 08:55
  • Maybe I'm missing something or there's a misunderstanding, but I don't understand why pagination doesn't meet your requirements. Loading 20 entities at once or one after another doesn't make much difference. The former would be even more efficient, imho, and **does** meet the requirement _"as needed"_. – a better oliver Sep 18 '14 at 09:30
  • Maybe I should be more pragmatic about this. I was hoping for a sleek solution that would integrate more seamlessly with the whole stream idea. Repeatedly dropping in chunks of Entities somehow does not integrate nicely with the Stream concept. But I guess I will have to make do with it :-( – Christoph Grimmer Sep 18 '14 at 10:23
  • Why do you think that it does not integrate nicely with the stream concept? Whether the iterator reads one entity after another, in batches of 20, or all at once doesn't make a difference to the stream or to your stream-related code. For what it's worth: you could also load all the ids first and then read one entity after another. – a better oliver Sep 18 '14 at 10:53

4 Answers


We recently added support for this in Spring Data JPA (and MongoDB) in our latest RC1 Release of the Spring Data Fowler Release Train.

Example for a Stream returned by a delegating default method
Example for "real streaming" Stream
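
A sketch of what such a streaming query method can look like (entity and method names here are illustrative, not from the release itself):

public interface SourceRepository extends JpaRepository<SourceEntity, Long> {

    // "real streaming": the result is backed by the persistence provider's
    // cursor instead of a fully materialized List
    @Query("select e from SourceEntity e")
    Stream<SourceEntity> streamAll();
}

The returned Stream has to be consumed within a transaction and should be closed when done, e.g. with try-with-resources:

try (Stream<SourceEntity> stream = sourceRepository.streamAll()) {
    stream.forEach(entity -> {
        // map and store each entity here
    });
}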

Thomas Darimont
  • Hi Thomas, big thanks for that. I actually blogged about how meeting you in person got this going (in German): http://blog.flavia-it.de/trefft-euch/ – Christoph Grimmer Mar 12 '15 at 09:26

If you are able to express the source as a Supplier implementation, you may also be able to implement a Spliterator. Instead of Supplier.get you implement boolean tryAdvance(Consumer), which does not return the new value but instead invokes accept on the Consumer if there is a new item, and returns false otherwise. In most cases this simplifies the implementation compared to an Iterator, where you have to deal with the two methods hasNext and next, which could be invoked in arbitrary order.
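
A minimal sketch of such a tryAdvance, assuming a hypothetical fetchNext() that returns the next Entity from the source Repo or null when it is exhausted:

@Override
public boolean tryAdvance(Consumer<? super T> action) {
    T next = fetchNext(); // hypothetical: next entity, or null when exhausted
    if (next == null) {
        return false; // this is the termination signal Stream.generate lacks
    }
    action.accept(next);
    return true;
}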

You have to implement a few more methods for Spliterator, but thankfully there are straightforward ways of implementing them.

@Override
public Spliterator<T> trySplit() {
    return null; // the simple answer when splitting is not supported
}

@Override
public long estimateSize() {
    return Long.MAX_VALUE; // the value to use when the size is unknown
}

@Override
public int characteristics() {
    return 0; // no flags, but check whether some of them fit your source
}

For the characteristics method it’s worth looking at the possible values, as they might improve the stream processing if they fit your source.
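
For example, a source that yields non-null entities in a stable order could report (assuming those properties actually hold):

@Override
public int characteristics() {
    return Spliterator.ORDERED | Spliterator.NONNULL;
}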

Once you have your Spliterator you can create a stream out of it:

Stream<T> s = StreamSupport.stream(sp, false);

If your source fits better into the hasNext/next pattern, you may implement an ordinary Iterator and let the JRE create a Spliterator, as described in “How to create a Java 8 Stream from an iterator?”
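
A minimal sketch of that route, with a plain List iterator standing in for the actual source:

Iterator<String> it = Arrays.asList("a", "b", "c").iterator();
Stream<String> s = StreamSupport.stream(
        Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED),
        false);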

Holger

A simple example might be:

@Repository
public interface MyEntityRepository extends JpaRepository<MyEntity, Long> {
}

@Component
public class MyEntityService {

    @Autowired
    private MyEntityRepository myEntityRepository;

    public void example() {
        // JpaRepository.findAll() returns List, which offers stream() directly
        Stream<MyEntity> streamFromList = myEntityRepository.findAll().stream();

        // if findAll() only returns Iterable (as declared by CrudRepository),
        // go through its spliterator (true = parallel stream)
        Stream<MyEntity> streamFromIterable =
                StreamSupport.stream(myEntityRepository.findAll().spliterator(), true);
    }
}
Ricardo Veguilla

Ok, thanks for all the contributions. I combined what was said and implemented what I need. Maybe the implementation will clarify what I wanted to start with.

I created two classes, RepositoryCollector and RepositorySpliterator.

public class RepositoryCollector<T> implements Collector<T, Tuple2<Integer, List<T>>, Integer> {

    private JpaRepository<T, ?> repository;
    private int threshold;

    public RepositoryCollector(JpaRepository<T, ?> repository, int threshold) {
        this.repository = repository;
        this.threshold = threshold;
    }

    @Override
    public BinaryOperator<Tuple2<Integer, List<T>>> combiner() {
        return (listTuple, itemsTuple) -> {
            List<T> list = listTuple._2;
            List<T> items = itemsTuple._2;
            list.addAll(items);
            int sum = listTuple._1 + itemsTuple._1;
            // once the buffer reaches the threshold, persist it and start fresh
            if (list.size() >= this.threshold) {
                this.repository.save(list);
                this.repository.flush();
                list = new LinkedList<>();
            }
            return new Tuple2<>(sum, list);
        };
    }
}

I omit the other functions that are required for a Collector, since all the relevant information is present in the combiner. The same holds for the Spliterator.

public class RepositorySpliterator<T> implements Spliterator<T> {

    private Slice<T> slice;
    private Function<Pageable, Slice<T>> getSlice;
    private Iterator<T> sliceIterator;

    public RepositorySpliterator(Pageable pageable, Function<Pageable, Slice<T>> getSlice) {
        this.getSlice = getSlice;
        this.slice = this.getSlice.apply(pageable);
        this.sliceIterator = slice.iterator();
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        if (sliceIterator.hasNext()) {
            // entities left in the current slice
            action.accept(sliceIterator.next());
            return true;
        } else if (slice.hasNext()) {
            // current slice exhausted: fetch the next one from the repository
            this.slice = getSlice.apply(slice.nextPageable());
            this.sliceIterator = this.slice.iterator();
            if (sliceIterator.hasNext()) {
                action.accept(sliceIterator.next());
                return true;
            }
        }
        return false; // no more slices -- this terminates the stream
    }

    public Stream<T> getStream(boolean parallel){
        return StreamSupport.stream(this, parallel);
    }
}
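
For completeness, the methods omitted above would presumably just use the defaults from Holger's answer (no splitting, unknown size, no characteristics flags):

@Override
public Spliterator<T> trySplit() {
    return null; // no splitting support
}

@Override
public long estimateSize() {
    return Long.MAX_VALUE; // size unknown
}

@Override
public int characteristics() {
    return 0;
}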

As you can see I put in a helper function to generate the Stream I need. Maybe this is a tad sloppy but... meh.

So now I just need a few lines of code in my mapping classes to get things going.

    public void start(Timestamp startTimestamp, Timestamp endTimestamp) {
        new RepositorySpliterator<>(
                new PageRequest(0, 10000), pageable -> sourceRepository.findAllBetween(startTimestamp, endTimestamp, pageable))
                .getStream(true)
                .map(entity -> mapToTarget(endTimestamp, entity))
                .collect(new RepositoryCollector<>(targetRepository, 1000));
    }

The mapper will fetch 10000 Entities from the source and pour them into the stream pool so they can be mapped and stored. Whenever the stream runs out of new Entities, a fresh batch is fetched and fed into the same pool.

If there are glaring errors in my implementation feel free to comment and improve!

Christoph Grimmer