5

Suppose I have a `Stream<Callable<SomeClass>> stream`. The stream accesses over a million objects, which will not fit in memory.

What is the idiomatic way to convert this to a `Stream<SomeClass>`, such that the `Callable::call` invocations are executed in parallel before their results are delivered to a consumer that is not thread-safe (perhaps via `.sequential().forEach()` or some other bottlenecking mechanism)?

In other words: process the stream in parallel, but deliver the output sequentially (random order is fine, as long as it is single-threaded).

I know I could do what I want by setting up an ExecutorService and a Queue between the original stream and the consumer. But that seems like a lot of code; is there a magic one-liner?

Alex R
  • 11,364
  • 15
  • 100
  • 180
  • 1
    When it comes to `.parallel()` and `.sequential()`, the last call decides for the _whole stream operation_ how it works. You can't have some operations be parallel and some not. But frankly I'd expect `.parallel().map(Callable::call)` to work just fine and do the right thing. – Louis Wasserman Dec 31 '16 at 23:59
  • Basically I wanted `parallel().map(Callable::call).sequential().forEach()` to work, but as you point out it doesn't. – Alex R Jan 01 '17 at 00:06
  • 6
    You should just use forEachOrdered in that case. – Louis Wasserman Jan 01 '17 at 00:06
  • 2
    @LouisWasserman Won’t `map(Callable::call)` fail to compile since that method is declared to throw Exception? – VGR Jan 01 '17 at 15:36
  • 2
    Related: [forEach vs forEachOrdered in Java 8 Stream](http://stackoverflow.com/questions/32797579/foreach-vs-foreachordered-in-java-8-stream) – Didier L Jan 02 '17 at 16:31
  • 2
    In this case "executed sequentially" is not equal to "executed single-threaded". forEachOrdered will work along with a parrallel stream while still putting out the elements subsequently in the end, but not in the sense of single-threaded - it ensures a happens-before relationship between the actions on the elements which can still happen to be executed by different threads; discussed [here](http://stackoverflow.com/a/34418845). – Calculator Jan 03 '17 at 01:26
  • @Calculator, yes, you've caught on to a nuance that makes this question harder than it may seem at first. – Alex R Jan 03 '17 at 02:27
  • @AlexR If `forEachOrdered` is not what you are looking for, you probably have to wrap your stream's `Spliterator` using a custom `Spliterator`. This is, as far as I know, the most flexible way to process stream elements lazily. I edited my answer, coming up with such a custom `Spliterator` which I hope achieves what you want (at least in principle). It can even be used to compose a "magic one-liner". – Calculator Jan 03 '17 at 14:26

5 Answers

1

You could still employ an ExecutorService for parallelization, like this:

ExecutorService service = Executors.newFixedThreadPool(4);
Stream<SomeClass> results = stream.map(c -> service.submit(c)).map(future -> {
    try {
        return future.get(); // retrieve the callable's result
    } catch (InterruptedException | ExecutionException ex) {
        // exception handling
        throw new RuntimeException(ex);
    }
});

You can process the resulting Stream<SomeClass> further sequentially.

If you use forEach/forEachOrdered directly on the Stream<Future<SomeClass>>, you can process each resulting SomeClass object as soon as its future is done (unlike invokeAll(), which blocks until every task is done).

If you want to process the results of the callables in the exact order in which they become available, you have to use a CompletionService, which can't be used within a single chain of stream operations because of the necessary call to Future<SomeClass> f = completionService.take() after submitting the callables.

EDIT:

Using an ExecutorService within streams doesn't work the way I showed above, because every Callable is submitted and requested via future.get() one after the other.

I found a possible, though even more side-effect-heavy, solution that divides the Callables into fixed-size chunks which are processed in parallel.

I use a class TaskMapper as the mapping function, which submits the Callables and maps them to chunks:

class TaskMapper implements Function<Callable<Integer>, List<Future<Integer>>>{
    private final ExecutorService service;
    private final int chunkSize;
    private List<Future<Integer>> chunk = new ArrayList<>();

    TaskMapper(ExecutorService service, int chunkSize){
        this.service = service;
        this.chunkSize = chunkSize;
    }

    @Override
    public List<Future<Integer>> apply(Callable<Integer> c) {
        chunk.add(service.submit(c)); // submit immediately, keep the future
        if(chunk.size() == chunkSize){
            // hand a full chunk downstream and start a new one
            List<Future<Integer>> fList = chunk;
            chunk = new ArrayList<>();
            return fList;
        }else{
            return null; // chunk not full yet
        }
    }

    List<Future<Integer>> getChunk(){
        return chunk; // the last, possibly incomplete chunk
    }
}

This is what the chain of stream operations looks like:

ExecutorService service = Executors.newFixedThreadPool(4);
TaskMapper taskMapper = new TaskMapper(service, 4);
stream.map(taskMapper)
    .filter(fl -> fl != null)   //keep only the complete chunks (nulls are filtered out)
    .flatMap(fl -> fl.stream()) //flat-map the chunks to futures
    .map(future -> {
        try {
            return future.get();
        } catch (InterruptedException | ExecutionException ex) {
            throw new RuntimeException(ex);
        }
    })
    .forEach(i -> {
        //process i
    });
//process the remaining futures of the last, incomplete chunk
for(Future<Integer> f : taskMapper.getChunk()){
    try {
        Integer i = f.get();
        //process i
    } catch (InterruptedException | ExecutionException ex) {
        //exception handling
    }
}

This works as follows: the TaskMapper takes four callables at a time, submits them to the service and maps them to a chunk of futures. This is achieved by mapping the 1st, 2nd and 3rd callable of each chunk to null (null could be replaced by a dummy object), which is filtered out downstream. The mapping function that maps the futures to results waits for the result of each future of the chunk. I use Integer in my example instead of SomeClass. When all results of the futures in the current chunk have been mapped, a new chunk is created and parallelized. Finally, if the number of elements in the stream is not divisible by the chunkSize (4 in my example), the remaining futures have to be retrieved from the TaskMapper and processed outside of the stream.

This construct works in the tests I carried out, but I am aware that it is possibly fragile due to the side effects, the statefulness and the undefined evaluation behavior of the stream.

EDIT2:

I made a version of the construct from the previous EDIT using a custom Spliterator:

public class ExecutorServiceSpliterator<T> extends AbstractSpliterator<Future<T>>{
    private final Spliterator<? extends Callable<T>> srcSpliterator;
    private final ExecutorService service;
    private final int chunkSize;
    private final Queue<Future<T>> futures = new LinkedList<>();

    private ExecutorServiceSpliterator(Spliterator<? extends Callable<T>> srcSpliterator) {
        this(srcSpliterator, Executors.newFixedThreadPool(8), 30); //default
    }

    private ExecutorServiceSpliterator(Spliterator<? extends Callable<T>> srcSpliterator, ExecutorService service, int chunkSize) {
        super(Long.MAX_VALUE, srcSpliterator.characteristics() & ~SIZED & ~CONCURRENT);
        this.srcSpliterator = srcSpliterator;
        this.service = service;
        this.chunkSize = chunkSize;
    }

    public static <T> Stream<T> pipeParallelized(Stream<? extends Callable<T>> srcStream){
        return getStream(new ExecutorServiceSpliterator<>(srcStream.spliterator()));
    }

    public static <T> Stream<T> pipeParallelized(Stream<? extends Callable<T>> srcStream, ExecutorService service, int chunkSize){
        return getStream(new ExecutorServiceSpliterator<>(srcStream.spliterator(), service, chunkSize));
    }

    private static <T> Stream<T> getStream(ExecutorServiceSpliterator<T> serviceSpliterator){
        return StreamSupport.stream(serviceSpliterator, false)
            .map(future -> {
                try {               
                    return future.get();
                } catch (InterruptedException | ExecutionException ex) {    
                    throw new RuntimeException(ex);
                }
            }
        );
    }

    @Override
    public boolean tryAdvance(Consumer<? super Future<T>> action) {
        // refill: submit callables from the source until the chunk is full or the source is exhausted
        boolean didAdvance = true;
        while((didAdvance = srcSpliterator.tryAdvance(c -> futures.add(service.submit(c))))
                && futures.size() < chunkSize);
        if(!didAdvance){
            service.shutdown(); // the source is exhausted, no further tasks will be submitted
        }

        if(!futures.isEmpty()){
            // emit the oldest pending future (its get() may still block downstream)
            Future<T> future = futures.remove();
            action.accept(future);
            return true;
        }
        return false;
    }

}

This class provides functions (pipeParallelized()) that take a stream of Callable elements, execute them chunk-wise in parallel and then output a sequential stream containing the results. Spliterators are allowed to be stateful, so this version should hopefully not violate any stream operation constraints. This is how the Spliterator can be used (close to a "magic one-liner"):

ExecutorServiceSpliterator.pipeParallelized(stream);

This line takes the stream of Callables stream, parallelizes their execution and returns a sequential stream containing the results (the piping happens lazily, so it should work with millions of callables), which can be processed further with regular stream operations.

The implementation of ExecutorServiceSpliterator is very basic; it mainly demonstrates how this could be done in principle. The resupplying of the service and the retrieval of the results could be optimized. For example, if the resulting stream is allowed to be unordered, a CompletionService could be used (see the sketch below).
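
For illustration only, here is a minimal sketch of such a completion-order variant (not part of the original answer; the class name CompletionOrderSpliterator, the pool size and the chunk size are made up, and error handling is kept to the bare minimum):

import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.*;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class CompletionOrderSpliterator<T> extends Spliterators.AbstractSpliterator<Future<T>> {
    private final Spliterator<? extends Callable<T>> srcSpliterator;
    private final ExecutorService service;
    private final CompletionService<T> completionService;
    private final int chunkSize;
    private int inFlight = 0;

    public CompletionOrderSpliterator(Spliterator<? extends Callable<T>> srcSpliterator,
                                      ExecutorService service, int chunkSize) {
        super(Long.MAX_VALUE, 0); // size unknown, explicitly unordered
        this.srcSpliterator = srcSpliterator;
        this.service = service;
        this.completionService = new ExecutorCompletionService<>(service);
        this.chunkSize = chunkSize;
    }

    @Override
    public boolean tryAdvance(Consumer<? super Future<T>> action) {
        // keep up to chunkSize tasks in flight
        while (inFlight < chunkSize
                && srcSpliterator.tryAdvance(c -> { completionService.submit(c); inFlight++; })) {
            // refill loop body intentionally empty
        }
        if (inFlight == 0) {
            service.shutdown(); // source exhausted and nothing pending
            return false;
        }
        try {
            action.accept(completionService.take()); // emits whichever task finishes first
            inFlight--;
            return true;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static <T> Stream<T> pipeParallelized(Stream<? extends Callable<T>> srcStream,
                                                 ExecutorService service, int chunkSize) {
        return StreamSupport.stream(
                new CompletionOrderSpliterator<>(srcStream.spliterator(), service, chunkSize), false)
            .map(future -> {
                try {
                    return future.get(); // already completed, so this does not block
                } catch (InterruptedException | ExecutionException ex) {
                    throw new RuntimeException(ex);
                }
            });
    }
}

Usage would then be, for example, CompletionOrderSpliterator.pipeParallelized(stream, Executors.newFixedThreadPool(8), 30).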

Calculator
  • 2,769
  • 1
  • 13
  • 18
  • The correct course of action in the `catch` block is to wrap the caught exception in a runtime exception, not to pollute the Stream with nulls. – VGR Jan 01 '17 at 15:35
  • If the stream is processed sequentially, it will map each callable one by one to a future, then wait immediately on the just-created future and process its result before processing the next callable. There will be no parallel processing with this solution. – Didier L Jan 02 '17 at 16:20
  • @DidierL Thank you for pointing this out. I reworked the solution so that it features actual parallel processing. It seems to work as expected, but it relies on some conditions that may not be guaranteed to be fulfilled by the stream implementation. – Calculator Jan 02 '17 at 19:40
0

You are asking for an idiomatic solution. Streams with side effects in their behavioral parameters are discouraged (this is explicitly stated in the javadoc of Stream).

So the idiomatic solution is basically ExecutorService + Futures and some loops/forEach(). If you have a Stream as a parameter, just transform it to a List with the standard Collector.

Something like that:

    ExecutorService service = Executors.newFixedThreadPool(5);
    List<Callable<SomeClass>> callables = stream.collect(Collectors.toList());
    service.invokeAll(callables).forEach(doSomething);
    // or just
    return service.invokeAll(callables);
k5_
  • 5,450
  • 2
  • 19
  • 27
  • Edit: The stream is accessing over a million objects which will not fit in memory. So I can't convert the stream to a list for `invokeAll`. – Alex R Jan 01 '17 at 01:08
  • Actually the real problem with this solution is that the forEach() executes in parallel, and my `doSomething` must be single-threaded. – Alex R Apr 27 '18 at 23:22
0

First Example:

ExecutorService executor = Executors.newWorkStealingPool();

List<Callable<String>> callables = Arrays.asList(
    () -> "job1",
    () -> "job2",
    () -> "job3");

executor.invokeAll(callables).stream().map(future -> {
    try {
        return future.get();
    } catch (InterruptedException | ExecutionException ex) {
        throw new RuntimeException(ex);
    }
}).forEach(System.out::println);

Second Example:

Stream.of("1", "2", "3", "4", "", "5")
      .filter(s->s.length() > 0)
      .parallel()
      .forEachOrdered(System.out::println);
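
Applied to the question's Stream<Callable<SomeClass>>, the second pattern might look roughly like the following (a sketch, not tested; Callable::call declares a checked Exception, so it has to be wrapped, and consumer stands in for the question's non-thread-safe consumer):

stream.parallel()
      .map(c -> {
          try {
              return c.call(); // executed on the common fork/join pool
          } catch (Exception ex) {
              throw new RuntimeException(ex);
          }
      })
      .forEachOrdered(consumer); // results handed over one at a time, in encounter order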
fabfas
  • 2,200
  • 1
  • 21
  • 21
  • Edit: The stream is accessing over a million objects which will not fit in memory. So I can't convert the stream to a list for `invokeAll`. – Alex R Jan 01 '17 at 01:08
  • Will this be helpful? Stream.of("1", "2", "3", "4", "", "5").filter(s->s.length() > 0).parallel().forEachOrdered(System.out::println); – fabfas Jan 01 '17 at 03:32
0

public static void main(String[] args) {
    testInfiniteCallableStream();
}

private static void testInfiniteCallableStream() {
    ExecutorService service = Executors.newFixedThreadPool(100);
    Consumer<Future<String>> consumeResult = (Future<String> future) -> {
        try {
            System.out.println(future.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    };
    getCallableStream().parallel().map(callable -> service.submit(callable)).forEach(consumeResult);
}

private static Stream<Callable<String>> getCallableStream() {
    Random randomWait = new Random();
    return Stream.<Callable<String>>generate(() -> new Callable<String>() {
        public String call() throws Exception {
            //wait for testing
            long time = System.currentTimeMillis();
            TimeUnit.MILLISECONDS.sleep(randomWait.nextInt(5000));
            return time + ":" + UUID.randomUUID().toString();
        }
    }).limit(Integer.MAX_VALUE);
}
Chota Bheem
  • 1,106
  • 1
  • 13
  • 31
0

None of the other answers worked for me.

I finally settled on something like this (pseudo-code):

ExecutorService executor = Executors.newWorkStealingPool();
CompletionService<SomeClass> completor = new ExecutorCompletionService<>(executor);
long count = stream.map(completor::submit).count();
while (count-- > 0) {
    SomeClass obj = completor.take().get();
    consume(obj);
}

The consume(obj) loop is executed sequentially in a single thread while the individual callable tasks asynchronously work their way through the CompletionService's multiple threads. Memory consumption is limited as the CompletionService will have only as many items in progress at a time as there are threads available. The Callables waiting for execution are eagerly materialized from the stream, but the impact of that is negligible compared to the memory each consumes once it starts executing (your use-case may vary).
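
For reference, a compilable sketch of the same pattern might look like the following (not from the original answer; SomeClass and consume() stand in for the question's types, and the submissions are counted explicitly with forEach because on Java 9+ a plain map(...).count() may skip the mapping step when the stream's size is known up front):

ExecutorService executor = Executors.newWorkStealingPool();
CompletionService<SomeClass> completionService = new ExecutorCompletionService<>(executor);

// submit every callable and count the submissions explicitly
AtomicLong submitted = new AtomicLong();
stream.forEach(c -> {
    completionService.submit(c);
    submitted.incrementAndGet();
});

// drain the results on this single thread, in completion order
try {
    for (long i = submitted.get(); i > 0; i--) {
        SomeClass obj = completionService.take().get();
        consume(obj); // the non-thread-safe consumer only ever runs here
    }
} catch (InterruptedException | ExecutionException ex) {
    throw new RuntimeException(ex);
} finally {
    executor.shutdown();
}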

Alex R
  • 11,364
  • 15
  • 100
  • 180