I want to run several tasks in parallel and handle the exception if any of them fails while running. However, when I call future.get() right after submitting each task, the tasks no longer run in parallel. How can this be resolved?

for (int i = 0; i < threads; i++) {
    final Future<Void> future = executor.submit(new ReaderDirectWithPartition_JDBC(outputPath + i, partitionedQuery, propparameters, props));

    try {
        future.get();
    } catch (final CancellationException ce) {
        log.error(ce.getMessage());
        executor.shutdownNow();
    } catch (final ExecutionException ee) {
        log.error(ee.getMessage());
        executor.shutdownNow();
    } catch (final InterruptedException ie) {
        Thread.currentThread().interrupt(); // ignore/reset
        log.error(ie.getMessage());
        executor.shutdownNow();
    }
}
rgettman
Rockan

1 Answer

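Future.get() blocks until its task has finished, so calling it inside the submit loop runs the tasks one after another. First submit all your tasks to the thread pool in one loop, collecting the Future objects in a list or array. Then call Future.get() on each of them in a separate loop.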


final MultiTaskStatus sharedStatus = new MultiTaskStatus();
final List<Future<Void>> pendingResults = new ArrayList<Future<Void>>(threads);
// submit all the jobs to be executed by the thread pool (executor)
for (int i = 0; i < threads; i++) {
    // avoid submitting more tasks, if one already failed
    if (sharedStatus.isFailed()) {
        break;
    }
    final ReaderDirectWithPartition_JDBC job;
    // hand over the MultiTaskStatus to be set as failed on error
    job = new ReaderDirectWithPartition_JDBC(outputPath + i,
            partitionedQuery, propparameters, props, sharedStatus);
    pendingResults.add(executor.submit(job));
}
if (sharedStatus.isFailed()) {
    // maybe do something special, if we already know something went wrong?
}
try {
    // wait for all scheduled jobs to be done
    for (final Future<Void> pendingJob : pendingResults) {
        pendingJob.get();
    }
} catch (final CancellationException ce) {
    log.error(ce.getMessage());
} catch (final ExecutionException ee) {
    log.error(ee.getMessage());
} catch (final InterruptedException ie) {
    Thread.currentThread().interrupt(); // restore the interrupt flag
    log.error(ie.getMessage());
} finally {
    executor.shutdownNow();
}

The shared status indicator used above is a small synchronized class:

class MultiTaskStatus {
    private boolean failed = false;
    public synchronized void setFailed(boolean taskFailed) {
        this.failed = this.failed || taskFailed;
    }
    public synchronized boolean isFailed() {
        return this.failed;
    }
}
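
The answer does not show how a task marks the shared status as failed. A minimal sketch of one way to do it follows, assuming the work can be expressed as a Callable<Void>. StatusReportingTask is a hypothetical wrapper, not part of the original code, and MultiTaskStatus is repeated only so that the sketch compiles on its own.

```java
import java.util.concurrent.Callable;

// Copied from the answer so that this sketch compiles on its own.
class MultiTaskStatus {
    private boolean failed = false;
    public synchronized void setFailed(boolean taskFailed) {
        this.failed = this.failed || taskFailed;
    }
    public synchronized boolean isFailed() {
        return this.failed;
    }
}

// Hypothetical wrapper: runs a delegate task and flags the shared status on failure.
class StatusReportingTask implements Callable<Void> {
    private final Callable<Void> delegate;
    private final MultiTaskStatus sharedStatus;

    StatusReportingTask(Callable<Void> delegate, MultiTaskStatus sharedStatus) {
        this.delegate = delegate;
        this.sharedStatus = sharedStatus;
    }

    @Override
    public Void call() throws Exception {
        // Skip the work entirely if another task has already failed.
        if (sharedStatus.isFailed()) {
            return null;
        }
        try {
            return delegate.call();
        } catch (Exception e) {
            // Record the failure so the submit loop stops handing out new tasks.
            sharedStatus.setFailed(true);
            // Rethrow so Future.get() still reports it as an ExecutionException.
            throw e;
        }
    }
}
```

With this approach the check at the top of call() also covers tasks that were queued before the failure was noticed, since they return immediately instead of starting their work.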

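Alternatively, consider using a CompletionService, as mentioned in this answer. It returns the futures in the order in which the tasks complete, so a failure is seen as soon as it happens rather than only when its turn in the list comes up.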
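
A rough sketch of the CompletionService variant, using only java.util.concurrent classes. FailFastRunner and runAll are hypothetical names chosen for illustration, and the tasks are assumed to be available as a list of Callable<Void>.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class FailFastRunner {
    // Runs all tasks in parallel. Returns true if every task succeeded,
    // false if a task failed or the waiting thread was interrupted.
    static boolean runAll(List<Callable<Void>> tasks, int poolSize) {
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        CompletionService<Void> completion = new ExecutorCompletionService<>(executor);
        try {
            // Submit everything first so the tasks can run in parallel.
            for (Callable<Void> task : tasks) {
                completion.submit(task);
            }
            // take() blocks until the next task finishes, in completion order,
            // so the first failure surfaces without waiting for earlier tasks.
            for (int i = 0; i < tasks.size(); i++) {
                completion.take().get();
            }
            return true;
        } catch (ExecutionException ee) {
            return false; // a task threw; the finally block stops the rest
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            return false;
        } finally {
            // Interrupts running tasks and discards tasks still in the queue.
            executor.shutdownNow();
        }
    }
}
```

Note that shutdownNow() only interrupts the worker threads. A running task stops early only if it reacts to interruption, for example by checking Thread.currentThread().isInterrupted() between units of work.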

Carsten
  • Would be good to know why it's being downvoted - it's the same solution as in this answer: http://stackoverflow.com/a/3929512/5127499 – Carsten Jul 30 '15 at 09:08
  • Hi Carsten. Thanks for the solution. But I want the program to run this way: if I have 10 threads in the thread pool executor and have submitted tasks 1-5, and thread 3 fails before the rest of the tasks are submitted, then I want to stop accepting tasks 6-10, stop all the executing threads, and throw an exception – Rockan Jul 31 '15 at 03:34
  • How fast are those tasks going to fail? Submitting them to the thread pool will take only milliseconds. If you have a smaller thread pool of only five threads, and one of your first five tasks is failing - the other tasks will never be started. If that still is a problem, consider doing less in the constructor of your ReaderDirectWithPartition_JDBC class, and keep all the relevant action inside the actual execution method. – Carsten Jul 31 '15 at 03:38
  • There are a lot of threads, and a thread might fail as soon as it is submitted. I have kept only the relevant actions inside the actual execution method – Rockan Jul 31 '15 at 03:45
  • I edited my answer to include a shared status object, which will be checked before submitting a new task to the thread pool - which should match your requirements now, right? – Carsten Jul 31 '15 at 03:56