
I have a use case in which I want to filter out a few elements of a list based on a network call that I perform on each element. To accomplish this I am using streams, filter and CompletableFuture. The goal is to execute the calls asynchronously so that the operation becomes efficient. The pseudocode for this is below.

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public List<Integer> afterFilteringList(List<Integer> initialList){
    List<Integer> afterFilteringList = initialList.stream().filter(element -> {
        boolean valid = true;
        try{
            valid = makeNetworkCallAndCheck(element).get();
        } catch (Exception e) {
            // exception ignored: the element is kept as valid
        }
        return valid;
    }).collect(Collectors.toList());

    return afterFilteringList;
}
public CompletableFuture<Boolean> makeNetworkCallAndCheck(Integer value){
    return CompletableFuture.completedFuture(resultOfNetWorkCall(value));
}

My question is: am I actually doing this operation asynchronously? Since I call get() inside the filter, will it block the execution and make it sequential? Or is there a better way to do this asynchronously using CompletableFuture and filter in Java 8?

user2885295
  • You forgot to pass `element` to `makeNetworkCallAndCheck`. Also, it looks weird that the exceptional case is considered “valid”. – Holger Dec 07 '18 at 08:04
  • @Holger The title mismatches the actual question I believe. *Am I doing this operation in an Async way itself?* ... IMHO the calls are not async and the gets would be blocking...thoughts? – Naman Dec 07 '18 at 08:08
  • @nullpointer well, calling `get()` immediately destroys the benefit of asynchronous execution, no doubt, but I don’t know, what to suggest as solution, given this code. E.g. the input `List` magically becomes a `List` during the stream operation, to eventually be returned as `List`. I guess, it’s supposed to be the same `Integer` objects all the time, but I don’t want to write code based on assumptions… – Holger Dec 07 '18 at 08:14

3 Answers


When you call get immediately, you are indeed destroying the benefit of asynchronous execution. The solution is to collect all asynchronous jobs first, before joining.

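import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

public List<Integer> afterFilteringList(List<Integer> initialList){
    // Start all network calls first; note that toMap throws an
    // IllegalStateException if initialList contains duplicate elements.
    Map<Integer,CompletableFuture<Boolean>> jobs = initialList.stream()
        .collect(Collectors.toMap(Function.identity(), this::makeNetworkCallAndCheck));
    // Only now wait for the results, keeping the original list order.
    return initialList.stream()
        .filter(element -> jobs.get(element).join())
        .collect(Collectors.toList());
}
public CompletableFuture<Boolean> makeNetworkCallAndCheck(Integer value){
    // supplyAsync runs the call on another thread (the common pool by default)
    return CompletableFuture.supplyAsync(() -> resultOfNetWorkCall(value));
}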

Of course, the method makeNetworkCallAndCheck has to initiate a truly asynchronous operation as well. Calling a method synchronously and returning a completedFuture is not sufficient. I provided a simple exemplary asynchronous operation here, but for I/O operations, you likely want to provide your own Executor, tailored to the number of simultaneous connections you want to allow.
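A minimal sketch of that advice, assuming a fixed pool of 10 threads (an arbitrary size) and a hypothetical `resultOfNetWorkCall` stub that simply keeps even numbers. As a swapped-in variation, the futures are kept in a `List` indexed like the input instead of a `Map`, so duplicate elements in the input do not make `Collectors.toMap` throw.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class FilterWithExecutor {

    // Hypothetical stand-in for the real network call: keeps even numbers.
    public static boolean resultOfNetWorkCall(Integer value) {
        return value % 2 == 0;
    }

    public static List<Integer> afterFilteringList(List<Integer> initialList, ExecutorService pool) {
        // Start every check first, on the dedicated pool; the futures stay in input order,
        // so duplicate elements are fine (unlike a Map keyed by element).
        List<CompletableFuture<Boolean>> jobs = initialList.stream()
                .map(v -> CompletableFuture.supplyAsync(() -> resultOfNetWorkCall(v), pool))
                .collect(Collectors.toList());
        // Only now wait for the results, pairing each element with its own future by index.
        return IntStream.range(0, initialList.size())
                .filter(i -> jobs.get(i).join())
                .mapToObj(initialList::get)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Assumed pool size: tune it to the number of simultaneous connections you allow.
        ExecutorService pool = Executors.newFixedThreadPool(10);
        try {
            System.out.println(afterFilteringList(Arrays.asList(1, 2, 2, 3, 4), pool));
        } finally {
            pool.shutdown(); // let the JVM exit once the work is done
        }
    }
}
```

Using a dedicated executor keeps slow I/O calls off the common pool, whose parallelism is sized for CPU-bound work.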

Holger

If you call get() inside the filter, it will not be asynchronous: each call blocks until its result is available.

get(): Waits if necessary for this future to complete, and then returns its result.
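A small timing sketch (not part of the original answer) makes the blocking visible. It assumes a hypothetical `makeNetworkCallAndCheck` that sleeps 100 ms inside `supplyAsync` to simulate latency. Because `get()` is called inside `filter`, each future is created and immediately waited on, so the next call only starts after the previous one has finished.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

public class BlockingGetDemo {

    // Hypothetical slow check: sleeps 100 ms to simulate network latency, then accepts everything.
    static CompletableFuture<Boolean> makeNetworkCallAndCheck(Integer value) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return true;
        });
    }

    // Calls get() inside filter, as in the question, and measures the total time.
    public static long elapsedMillisWithGetInsideFilter(List<Integer> input) {
        long start = System.nanoTime();
        List<Integer> result = input.stream()
                .filter(element -> {
                    try {
                        return makeNetworkCallAndCheck(element).get(); // blocks here
                    } catch (InterruptedException | ExecutionException e) {
                        throw new IllegalStateException(e);
                    }
                })
                .collect(Collectors.toList());
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        // Five 100 ms checks run one after another, so expect roughly 500 ms or more.
        System.out.println(elapsedMillisWithGetInsideFilter(Arrays.asList(1, 2, 3, 4, 5)));
    }
}
```

If the calls ran concurrently, the total would be close to 100 ms instead of growing with the number of elements.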

If you want to process all the requests asynchronously, you can use CompletableFuture.allOf():

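import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public List<Integer> filterList(List<Integer> initialList){
    // Thread-safe list, since several async tasks add to it concurrently
    List<Integer> filteredList = Collections.synchronizedList(new ArrayList<>());
    AtomicInteger atomicInteger = new AtomicInteger(0);
    CompletableFuture<?>[] completableFutures = new CompletableFuture<?>[initialList.size()];
    initialList.forEach(x->{
        // Start one async check per element without waiting for it
        completableFutures[atomicInteger.getAndIncrement()] = CompletableFuture
            .runAsync(()->{
                if(makeNetworkCallAndCheck(x)){
                    filteredList.add(x);
                }
            });
    });

    // Wait once for all checks; the order of filteredList is not guaranteed
    CompletableFuture.allOf(completableFutures).join();
    return filteredList;
}

private Boolean makeNetworkCallAndCheck(Integer value){
    // TODO: write the logic;
    return true;
}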
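A variation on the same `allOf` idea, sketched under the assumption of a hypothetical synchronous `makeNetworkCallAndCheck` that keeps values greater than 2. Instead of adding to a shared synchronized list and blocking, it returns a `CompletableFuture<List<Integer>>` built with `thenApply`, so the caller decides when (or whether) to join, and the result keeps the input order.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class AllOfFilter {

    // Hypothetical synchronous check standing in for the real network call.
    static boolean makeNetworkCallAndCheck(Integer value) {
        return value > 2;
    }

    // Returns a future of the filtered list instead of blocking the calling thread.
    public static CompletableFuture<List<Integer>> filterListAsync(List<Integer> initialList) {
        List<CompletableFuture<Boolean>> checks = initialList.stream()
                .map(x -> CompletableFuture.supplyAsync(() -> makeNetworkCallAndCheck(x)))
                .collect(Collectors.toList());
        // allOf completes once every check has completed, so join() inside thenApply
        // does not wait; it only reads results that are already available.
        return CompletableFuture.allOf(checks.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> IntStream.range(0, initialList.size())
                        .filter(i -> checks.get(i).join())
                        .mapToObj(initialList::get)
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        System.out.println(filterListAsync(Arrays.asList(1, 2, 3, 4, 5)).join()); // [3, 4, 5]
    }
}
```

If any check fails, the future returned by `allOf` completes exceptionally and the `thenApply` step is skipped, so the error surfaces when the caller joins.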
Didier L

Collection.parallelStream() is an easy way to run the checks for a collection in parallel. You can modify your code as follows:

public List<Integer> afterFilteringList(List<Integer> initialList){
    List<Integer> afterFilteringList =initialList
            .parallelStream()
            .filter(this::makeNetworkCallAndCheck)
            .collect(Collectors.toList());

    return afterFilteringList;
}
public Boolean makeNetworkCallAndCheck(Integer value){
    return resultOfNetWorkCall(value);
}

You can also run the stream in your own ForkJoinPool (see the demo below). Since the source list is ordered, collect(Collectors.toList()) keeps the elements in their original order.

I wrote the following code to verify this.

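import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;

public class DemoApplication {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Custom pool with 50 threads instead of the common pool
        ForkJoinPool forkJoinPool = new ForkJoinPool(50);
        final List<Integer> integers = new ArrayList<>();
        for (int i = 0; i < 50; i++) {
            integers.add(i);
        }
        long before = System.currentTimeMillis();
        // Relies on (undocumented) behavior: a parallel stream started
        // from a task in this pool runs its work in this pool
        List<Integer> items = forkJoinPool.submit(() ->
                integers
                        .parallelStream()
                        .filter(it -> {
                            try {
                                Thread.sleep(10000); // simulate a 10-second blocking call
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                            }
                            return true;
                        })
                        .collect(Collectors.toList()))
                .get();
        long after = System.currentTimeMillis();
        System.out.println(after - before);
        forkJoinPool.shutdown();
    }
}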

I created my own ForkJoinPool with a parallelism of 50, and it took 10019 milliseconds to finish all 50 jobs in parallel, although each one takes 10000 milliseconds on its own.

Bing Zhao
  • The disadvantage of parallel streams is the lack of control over the executor, as for I/O operations, you usually don’t want the default pool parallelism, which is tailored to the number of CPU cores. – Holger Dec 07 '18 at 12:18
  • @Holger I thought so too, and even mentioned that in my post, until I saw this post: https://stackoverflow.com/a/22269778/5053214 – Bing Zhao Dec 07 '18 at 12:46
  • Well, that’s an undocumented side effect without official support. And it only works for Fork/Join pools, not for arbitrary `Executor` implementations. – Holger Dec 07 '18 at 13:14