// Unit of logic I want to run in parallel
public PagesDTO convertOCRStreamToDTO(String pageId, Integer pageSequence) throws Exception {
    LOG.info("Get OCR begin for pageId [{}] thread name {}",pageId, Thread.currentThread().getName());
    OcrContent ocrContent = getOcrContent(pageId);
    OcrDTO ocrData = populateOCRData(ocrContent.getInputStream());
    PagesDTO pageDTO = new PagesDTO(pageId, pageSequence.toString(), ocrData);
    return pageDTO; 
}

Logic to execute convertOCRStreamToDTO(..) in parallel and then collect the results once each thread's execution is done:

List<PagesDTO> pageDTOList = new ArrayList<>();
//javadoc: Creates a work-stealing thread pool using all available processors as its target parallelism level.
ExecutorService newWorkStealingPool = Executors.newWorkStealingPool(); 
Instant start = Instant.now();
List<CompletableFuture<PagesDTO>> pendingTasks = new ArrayList<>();
List<CompletableFuture<PagesDTO>> completedTasks = new ArrayList<>();
CompletableFuture<PagesDTO> task = null;

for (InputPageDTO dcInputPageDTO : dcReqDTO.getPages()) {
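    String pageId = dcInputPageDTO.getPageId();
    // Take the sequence number here, in input order; calling getAndIncrement() inside the
    // async lambda would number the pages in whatever order the threads happen to run
    int sequence = pageSequence.getAndIncrement();
    task = CompletableFuture
            .supplyAsync(() -> {
                try {
                    return convertOCRStreamToDTO(pageId, sequence);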
                } catch (HttpHostConnectException | ConnectTimeoutException e) {
                    LOG.error("Error connecting to Redis for pageId [{}]", pageId, e);
                    CaptureException e1 = new CaptureException(Error.getErrorCodes().get(ErrorCodeConstants.REDIS_CONNECTION_FAILURE),
                            " Connecting to the Redis failed while getting OCR for pageId ["+pageId +"] " + e.getMessage(), CaptureErrorComponent.REDIS_CACHE, e);
                    exceptionMap.put(pageId,e1);
                } catch (CaptureException e) {
                    LOG.error("Error in Document Classification Engine Service while getting OCR for pageId [{}]",pageId,e);
                    exceptionMap.put(pageId,e);
                } catch (Exception e) {
                    LOG.error("Error getting OCR content for the pageId [{}]", pageId,e);
                    CaptureException e1 = new CaptureException(Error.getErrorCodes().get(ErrorCodeConstants.TECHNICAL_FAILURE),
                            "Error while getting ocr content for pageId : ["+pageId +"] " + e.getMessage(), CaptureErrorComponent.REDIS_CACHE, e);
                    exceptionMap.put(pageId,e1);
                }
                return null;
            }, newWorkStealingPool);
    //collect all async tasks
    pendingTasks.add(task);
}

//TODO: How can I avoid the unnecessary looping here, which exists only to wait for the future tasks to complete?
//TODO: Looking for the best solution
while(pendingTasks.size() > 0) {
    for(CompletableFuture<PagesDTO> futureTask: pendingTasks) {
        if(futureTask != null && futureTask.isDone()){
            completedTasks.add(futureTask);
            pageDTOList.add(futureTask.get());
        }
    }
    pendingTasks.removeAll(completedTasks);
}

//Throw the exception caught while converting the OCR stream to DTO, for any of the page IDs
for(InputPageDTO dcInputPageDTO : dcReqDTO.getPages()) {
    if(exceptionMap.containsKey(dcInputPageDTO.getPageId())) {
        CaptureException e = exceptionMap.get(dcInputPageDTO.getPageId());
        throw e;
    }
}

LOG.info("Parallel processing time taken for {} pages = {}", dcReqDTO.getPages().size(),
        org.springframework.util.StringUtils.deleteAny(Duration.between(start, Instant.now()).toString().toLowerCase(), "pt-"));

Please look at the TODO items in my code above. I have two concerns for which I am looking for advice:

1) I want to avoid the unnecessary looping in the while loop above. What is the best way to wait for all threads to complete their async execution and then collect the results?
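A minimal sketch of one common way to wait without polling: `CompletableFuture.allOf(...)` followed by `join()`. The method `convert` is a hypothetical stand-in for `convertOCRStreamToDTO`, and the page IDs are plain strings rather than `InputPageDTO`s; the sequence number is taken before each task is submitted so it follows input order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

class WaitForAllSketch {

    // Hypothetical stand-in for convertOCRStreamToDTO(pageId, pageSequence)
    static String convert(String pageId, int sequence) {
        return sequence + ":" + pageId;
    }

    static List<String> convertAll(List<String> pageIds, ExecutorService pool) {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (int i = 0; i < pageIds.size(); i++) {
            String pageId = pageIds.get(i);
            int sequence = i + 1; // captured before submission, so it follows input order
            futures.add(CompletableFuture.supplyAsync(() -> convert(pageId, sequence), pool));
        }
        // Blocks until every future has completed; no busy-wait loop is needed
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        // Every future is done, so join() returns immediately, in submission order
        return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newWorkStealingPool();
        try {
            System.out.println(convertAll(Arrays.asList("a", "b", "c"), pool)); // prints [1:a, 2:b, 3:c]
        } finally {
            pool.shutdown();
        }
    }
}
```

If any task completes exceptionally, the `allOf(...).join()` call throws a `CompletionException`, so failures surface at that single point.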

2) The ExecutorService instance is created at my service bean class level, so that it is reused for every request, instead of being created locally in the method and shut down in a finally block. Am I doing this right, or is there a flaw in my reasoning?
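A sketch of the shared-pool arrangement described in question 2, assuming a plain Java class; `OcrPageService` and `convertAsync` are hypothetical names. The pool is created once per service instance and shut down once, when the application stops, rather than per request. In a container, `close()` would typically be called from a lifecycle hook (for example a Spring `@PreDestroy` method); that wiring is not shown here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical service class: one pool per service instance, shared by all requests
class OcrPageService implements AutoCloseable {

    private final ExecutorService pool = Executors.newWorkStealingPool();

    // Called once per request; it neither creates nor shuts down a pool
    CompletableFuture<String> convertAsync(String pageId) {
        return CompletableFuture.supplyAsync(() -> "converted " + pageId, pool);
    }

    // Called once, when the application shuts down
    @Override
    public void close() {
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                pool.shutdownNow(); // give up on tasks still running after the timeout
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
    }

    public static void main(String[] args) {
        OcrPageService service = new OcrPageService();
        try {
            // Two "requests" reuse the same pool
            System.out.println(service.convertAsync("p1").join());
            System.out.println(service.convertAsync("p2").join());
        } finally {
            service.close();
        }
    }
}
```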

ravibeli
  • What's the point of returning a `Supplier` in `convertOCRStreamToDTO()`? – Didier L Apr 10 '17 at 09:43
  • @didier-l Yes, there is no point in returning a `Supplier` from `convertOCRStreamToDTO()`; I have updated my sample code above. Could you suggest how I can avoid the unnecessary looping in the while loop? Is there a better way to collect each result as soon as its thread completes? – ravibeli Apr 10 '17 at 11:50
  • Concerning the question about the `ExecutorService`, that would be better asked as a separate question – which is likely to be already answered somewhere here on SO so please search first to avoid creating a duplicate question. – Didier L Apr 10 '17 at 13:04

1 Answer

Simply remove the while and the if and you are good:

for(CompletableFuture<PagesDTO> futureTask: pendingTasks) {
    completedTasks.add(futureTask);
    pageDTOList.add(futureTask.get());
}

get() (as well as join()) will wait for the future to complete before returning a value. Also, there is no need to test for null since your list will never contain any.
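To illustrate that `get()`/`join()` simply block until each future completes, here is a small sketch with a hypothetical `slowTask` that sleeps longer for earlier indexes, so the tasks finish in reverse order. Joining the futures in submission order still returns the results in input order, with no polling. It uses `join()` rather than `get()` only to avoid the checked `InterruptedException`/`ExecutionException`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class JoinInOrderSketch {

    // Hypothetical task: earlier indexes sleep longer, so tasks finish in reverse order
    static String slowTask(int index, int count) {
        try {
            Thread.sleep(50L * (count - index));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "page-" + index;
    }

    static List<String> runAll(int count) {
        ExecutorService pool = Executors.newFixedThreadPool(count);
        try {
            List<CompletableFuture<String>> pendingTasks = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                int index = i;
                pendingTasks.add(CompletableFuture.supplyAsync(() -> slowTask(index, count), pool));
            }
            List<String> results = new ArrayList<>();
            for (CompletableFuture<String> futureTask : pendingTasks) {
                // join() blocks until this particular future completes
                results.add(futureTask.join());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAll(3)); // prints [page-0, page-1, page-2]
    }
}
```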

You should however probably change the way you handle exceptions. CompletableFuture has a specific mechanism for handling them and rethrowing them when calling get()/join(). You might simply want to wrap your checked exceptions in CompletionException.
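A minimal sketch of the `CompletionException` approach, assuming a hypothetical checked `PageConversionException` in place of `CaptureException` and a hypothetical `convert` method that fails for the page ID "bad". The supplier wraps the checked exception, the future completes exceptionally, and the caller unwraps the cause after `join()`, which takes the place of the shared `exceptionMap`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class ExceptionPropagationSketch {

    // Hypothetical checked exception standing in for CaptureException
    static class PageConversionException extends Exception {
        PageConversionException(String message) {
            super(message);
        }
    }

    // Hypothetical converter that fails for one page ID
    static String convert(String pageId) throws PageConversionException {
        if (pageId.equals("bad")) {
            throw new PageConversionException("cannot convert page [" + pageId + "]");
        }
        return "ok:" + pageId;
    }

    static List<String> convertAll(List<String> pageIds) throws PageConversionException {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (String pageId : pageIds) {
            futures.add(CompletableFuture.supplyAsync(() -> {
                try {
                    return convert(pageId);
                } catch (PageConversionException e) {
                    // Wrap the checked exception; the future completes exceptionally
                    throw new CompletionException(e);
                }
            }));
        }
        List<String> results = new ArrayList<>();
        try {
            for (CompletableFuture<String> future : futures) {
                results.add(future.join());
            }
        } catch (CompletionException e) {
            // Unwrap and rethrow the original checked exception
            if (e.getCause() instanceof PageConversionException) {
                throw (PageConversionException) e.getCause();
            }
            throw e;
        }
        return results;
    }

    public static void main(String[] args) {
        try {
            System.out.println(convertAll(Arrays.asList("a", "b"))); // prints [ok:a, ok:b]
            convertAll(Arrays.asList("a", "bad"));
        } catch (PageConversionException e) {
            System.out.println("failed: " + e.getMessage()); // prints failed: cannot convert page [bad]
        }
    }
}
```

The loop stops at the first failed future in list order; tasks for later pages may still be running at that point.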

Didier L
  • You are the man! Awesome, that solved my doubt. For exception handling I am now exploring `futureTask.completeExceptionally(e1)`. You solved the main concern I had. – ravibeli Apr 10 '17 at 17:25
  • @ravibeli: [What does it mean when an answer is "accepted"?](https://stackoverflow.com/help/accepted-answer) – Holger Apr 13 '17 at 15:34
  • @didier-l For #2, I searched SO and found nobody asking the same question, so I am still waiting for an answer to question #2. – ravibeli Apr 15 '17 at 02:00
  • @ravibeli [Should I create executorService within a request or share one instance across the webapp?](http://stackoverflow.com/questions/42884255/should-i-create-executorservice-within-a-request-or-share-one-instance-across-th) – other similar questions exist, [Google is your friend](https://www.google.be/search?hl=fr&ie=UTF-8&oe=UTF-8&q=java+8+create+executor+service+in+each+request&gws_rd=cr&ei=ohzyWNzwB8fdwAK0nLSoBQ#hl=fr&q=java+create+executor+service+in+each+request) :-) – Didier L Apr 15 '17 at 13:19
  • :-) OK, got it. I am using the Dropwizard framework with Spring integration, and I have created the executor service instance at the context level so that the same instance is reused, which, according to the link you provided, is the right way to do it. – ravibeli Apr 16 '17 at 15:14