I have an ExecutorService that returns a List defined as List<Callable> callables=Collections.synchronizedList(new ArrayList<Callable>());
I made it a synchronizedList when I first had the issue because I thought it would be thread safe, but the issue persists.
The problem I'm having is that the following Code is throwing a ConcurrentModificationException at Future<Object> next = i.next();
below. Its worth noting that it makes it partially through the List before bombing out.
ExecutorWorker snapshotExecutorWorker=new ExecutorWorker(this);
Iterator<Future<Object>> i= futures.iterator();
while(i.hasNext()){
Future<Object> next = i.next();//List<Callable>
try {
Object get = next.get();
Class<? extends Object> aClass = get.getClass();
System.out.println("Class= "+aClass);
List<Callable> l= (List)get;
System.out.println("L.size= "+l.size());
for(int a=0;a<l.size();a++){
snapshotExecutorWorker.addTask(l.get(a));
}
} catch (InterruptedException ex) {
Logger.getLogger(GUI.class.getName()).log(Level.SEVERE, null, ex);
} catch (ExecutionException ex) {
Logger.getLogger(GUI.class.getName()).log(Level.SEVERE, null, ex);
}
}
ExecutorWorker is basically a SwingWorker that monitors the status of an ExecutorCompletionService.
public class ExecutorWorker extends SwingWorker<List<Future>, Object> implements ExecutorInterface {
List<Future> results = new ArrayList();
ExecutorService executorService = Executors.newFixedThreadPool(100);
ExecutorCompletionService<Object> ecs = new ExecutorCompletionService<>(executorService);
List<Future<Object>> jobs = new ArrayList();
ProgressMonitor progressMonitor;
boolean isExecuting = true;
Monitor monitor;
public ExecutorWorker(Monitor f) {
monitor = f;
}
public void addMonitor(Monitor f) {
monitor = f;
}
/**
*This method adds Callables to the Executor.
* @param r
* @return
*/
@Override
public Future<?> addTask(Callable r) {
Future futureValue = ecs.submit(r);
monitor.addFuture(futureValue);
System.out.println("Callable added in [ExecutorWorker]");
jobs.add(futureValue);
monitor.tasksAdded(true);
return futureValue;
}
/**
*This method returns a List containing the Future results. Use Future.get() to retrieve.
* @return
*/
public List<Future> getResults() {
return results;
}
@Override
protected void done() {
}
@Override
protected List<Future> doInBackground() throws ExecutionException, InterruptedException {
// System.out.println("Executor: In Do In BackGround");
// System.out.println("Jobs.size= " + jobs.size());
for (int i = 0; i < this.jobs.size(); i++) {
Future<Object> take = ecs.take();
results.add(take);
monitor.tasksCompleted(true);
int v = (int) ((monitor.getCompletedTasks() / this.monitor.getTotalTasks()) * 100);
this.setProgress(v);
String message = String.format("Processing " + (i + 1) + " of " + jobs.size() + " Jobs. %d%%.\n", v);
System.out.println("Message= " + message);
progressMonitor.setNote(message);
}
return results;
}
public void setProgressMonitor(ProgressMonitor progressMonitor) {
this.progressMonitor = progressMonitor;
}
}