This is a follow-up to an earlier post. As part of my task, I'm trying to download files from URLs using callables, and whenever an exception occurs I try to resubmit the same callable, up to a maximum number of times.

The problem is that, with the current approach, my program doesn't terminate after all of the callables finish in the happy-path scenario; it keeps running forever (maybe because I'm using non-daemon threads? Wouldn't it terminate after a given amount of time?).

I also believe the current design will prevent resubmitting failed callables: since I call executor.shutdown() before the loop, the executor will reject any new callable submitted after a failure.

Any ideas on how to get around this?

public class DownloadManager {

int allocatedMemory;
private final int MAX_FAILURES = 5;
private ExecutorService executor;
private CompletionService<Status> completionService;
private HashMap<String, Integer> failuresPerDownload;
private HashMap<Future<Status>, DownloadWorker> URLDownloadFuturevsDownloadWorker;

public DownloadManager() {
    allocatedMemory = 0;
    executor = Executors.newWorkStealingPool();
    completionService = new ExecutorCompletionService<Status>(executor);
    URLDownloadFuturevsDownloadWorker = new HashMap<Future<Status>, DownloadWorker>();
    failuresPerDownload = new HashMap<String, Integer>();
}

public ArrayList<Status> downloadURLs(String[] urls, int memorySize) throws Exception {
    validateURLs(urls);
    for (String url : urls) {
        failuresPerDownload.put(url, 0);
    }
    ArrayList<Status> allDownloadsStatus = new ArrayList<Status>();
    allocatedMemory = memorySize / urls.length;
    for (String url : urls) {
        DownloadWorker URLDownloader = new DownloadWorker(url, allocatedMemory);
        Future<Status> downloadStatusFuture = completionService.submit(URLDownloader);
        URLDownloadFuturevsDownloadWorker.put(downloadStatusFuture, URLDownloader);
    }
    executor.shutdown();
    Future<Status> downloadQueueHead = null;
    while (!executor.isTerminated()) {
        downloadQueueHead = completionService.take();
        try {
            Status downloadStatus = downloadQueueHead.get();
            if (downloadStatus.downloadSucceeded()) {
                allDownloadsStatus.add(downloadStatus);
                System.out.println(downloadStatus);
            } else {
                handleDownloadFailure(allDownloadsStatus, downloadStatus.getUrl());

            }
        } catch (Exception e) {
            String URL = URLDownloadFuturevsDownloadWorker.get(downloadQueueHead).getAssignedURL();
            handleDownloadFailure(allDownloadsStatus, URL);
        }
    }
    return allDownloadsStatus;
}

private void handleDownloadFailure(ArrayList<Status> allDownloadsStatus, String URL) {
    int failuresPerURL = failuresPerDownload.get(URL);
    failuresPerURL++;
    if (failuresPerURL < MAX_FAILURES) {
        failuresPerDownload.put(URL, failuresPerURL);
        // resubmit the same job
        DownloadWorker downloadJob = URLDownloadFuturevsDownloadWorker.get(URL);
        completionService.submit(downloadJob);
    } else {
        Status failedDownloadStatus = new Status(URL, false);
        allDownloadsStatus.add(failedDownloadStatus);
        System.out.println(failedDownloadStatus);
    }
}
}

Update: After I changed the while loop's condition to a counter instead of !executor.isTerminated(), it worked. Why doesn't the executor terminate?
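
A minimal sketch of the counter-based loop mentioned in the update, not the original DownloadManager. It assumes a hypothetical RetryingDownloads class, a fixed pool of 4 threads, and tasks that return a Boolean success flag in place of Status. The loop counts outstanding tasks instead of polling isTerminated(): take() blocks until some task completes, so if the loop is re-entered before the pool has finished terminating, it waits on an empty completion queue. shutdown() is deferred until no further retries can be submitted, and lookups use the Future returned by submit(), because the map in the code above is keyed by Future rather than by URL.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical helper, for illustration only.
class RetryingDownloads {
    static final int MAX_FAILURES = 5;

    // Runs one task per URL and resubmits a failed task until it has failed
    // MAX_FAILURES times. Returns URL -> whether it eventually succeeded.
    static Map<String, Boolean> runAll(List<String> urls,
                                       Function<String, Callable<Boolean>> taskFor)
            throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(4); // pool size is an assumption
        CompletionService<Boolean> completionService = new ExecutorCompletionService<>(executor);
        Map<Future<Boolean>, String> urlByFuture = new HashMap<>();
        Map<String, Integer> failures = new HashMap<>();
        Map<String, Boolean> results = new HashMap<>();
        try {
            int pending = 0; // tasks submitted but not yet taken
            for (String url : urls) {
                urlByFuture.put(completionService.submit(taskFor.apply(url)), url);
                pending++;
            }
            while (pending > 0) {
                Future<Boolean> done = completionService.take(); // blocks until a task finishes
                pending--;
                String url = urlByFuture.remove(done);
                boolean ok;
                try {
                    ok = done.get();
                } catch (ExecutionException e) {
                    ok = false; // the task threw: count it as a failure
                }
                if (ok) {
                    results.put(url, true);
                } else if (failures.merge(url, 1, Integer::sum) < MAX_FAILURES) {
                    // Resubmitting works because shutdown() has not been called yet.
                    urlByFuture.put(completionService.submit(taskFor.apply(url)), url);
                    pending++;
                } else {
                    results.put(url, false);
                }
            }
        } finally {
            executor.shutdown(); // only after no more retries can be submitted
        }
        return results;
    }
}
```

The loop ends when every submitted task, including retries, has been taken from the completion service, so it does not depend on when the pool reports termination.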

a.u.r

2 Answers

You need to call ExecutorService.shutdown() and then awaitTermination() to terminate the threads after all your work is done.

Alternatively, you could provide your own ThreadFactory when constructing your ExecutorService and mark all your threads as daemon so that they won't keep your process alive once the main thread exits.
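
As a rough sketch of both suggestions, assuming a hypothetical ExecutorLifecycle helper class (the class and method names are illustrative, not part of the JDK): one method wraps the default thread factory so every worker is a daemon thread, and the other calls shutdown(), waits with awaitTermination(), and falls back to shutdownNow() if the timeout expires.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, for illustration only.
class ExecutorLifecycle {

    // Wraps the default factory and marks every new worker as a daemon thread,
    // so idle workers do not keep the JVM alive after main returns.
    static ThreadFactory daemonThreadFactory() {
        return runnable -> {
            Thread t = Executors.defaultThreadFactory().newThread(runnable);
            t.setDaemon(true);
            return t;
        };
    }

    // Stops accepting new tasks, then waits up to the timeout for running ones.
    // Returns true if the executor terminated.
    static boolean shutdownAndWait(ExecutorService executor, long timeout, TimeUnit unit)
            throws InterruptedException {
        executor.shutdown();
        if (executor.awaitTermination(timeout, unit)) {
            return true;
        }
        executor.shutdownNow(); // interrupt tasks that are still running
        return executor.awaitTermination(timeout, unit);
    }
}
```

A pool built with Executors.newFixedThreadPool(n, ExecutorLifecycle.daemonThreadFactory()) could use both. Note that shutdown() should only be called once no more tasks, including retries, will be submitted.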

Daniel Pryden

The ExecutorCompletionService Javadoc includes this example:

CompletionService<Result> ecs
    = new ExecutorCompletionService<Result>(e);
List<Future<Result>> futures
    = new ArrayList<Future<Result>>(n);
try {
    ...
} finally {
    for (Future<Result> f : futures)
        f.cancel(true);
}

So try calling cancel(true) on all of your Futures when you need to stop the ExecutorCompletionService.
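
A fuller sketch of that pattern, with the elided part filled in under assumptions: the hypothetical FirstResult.firstResult method below returns the first non-null result and then cancels whatever is still running. Keep in mind that cancel(true) only interrupts the individual tasks; the executor's threads keep running until the executor itself is shut down.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;

// Hypothetical helper, for illustration only.
class FirstResult {

    // Submits every task, returns the first non-null result (or null if none),
    // and cancels all futures on the way out.
    static <T> T firstResult(Executor e, Collection<Callable<T>> tasks)
            throws InterruptedException {
        CompletionService<T> ecs = new ExecutorCompletionService<>(e);
        List<Future<T>> futures = new ArrayList<>(tasks.size());
        T result = null;
        try {
            for (Callable<T> task : tasks) {
                futures.add(ecs.submit(task));
            }
            for (int i = 0; i < tasks.size(); i++) {
                try {
                    T r = ecs.take().get();
                    if (r != null) {
                        result = r;
                        break;
                    }
                } catch (ExecutionException ignored) {
                    // this task failed; wait for the next one to complete
                }
            }
        } finally {
            // Cancelling a future that already completed has no effect.
            for (Future<T> f : futures) {
                f.cancel(true);
            }
        }
        return result;
    }
}
```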

Slava Vedenin