0

I have 2 sets of tasks. I create two ThreadPoolExecutor, given them 2 lists of tasks and calling invokeAll methods in both. I see that after all tasks of first ThreadPoolExecutor are complete tasks of second ThreadPoolExecutor are starting. Is there a way to initiate them in parallel?

public class Test3 {

    public static void main(String[] args) throws Exception {
        
        String[] works1 = {"work1", "work2", "work3", "work4", "work5"};
        String[] works2 = {"work6", "work7", "work8", "work9", "work10", "work11"};
        List<String> workList1 = Arrays.asList(works1);
        List<String> workList2 = Arrays.asList(works2);
        List<Callable<Object>> workerList1 = new ArrayList<Callable<Object>>();
        List<Callable<Object>> workerList2 = new ArrayList<Callable<Object>>();
        ThreadPoolExecutor executorService1 = (ThreadPoolExecutor) Executors.newFixedThreadPool(3);
        ThreadPoolExecutor executorService2 = (ThreadPoolExecutor) Executors.newFixedThreadPool(3);
        for(int i=0; i<workList1.size(); i++) {
            Runnable worker = new TWorker(workList1.get(i));
            Callable<Object> callableWorker = Executors.callable(worker);
            workerList1.add(callableWorker);
        }
        for(int i=0; i<workList2.size(); i++) {
            Runnable worker = new TWorker(workList2.get(i));
            Callable<Object> callableWorker = Executors.callable(worker);
            workerList2.add(callableWorker);
        }
        System.out.println("Invoke all TP1");
        executorService1.invokeAll(workerList1);
        System.out.println("Invoke all TP2");
        executorService2.invokeAll(workerList2);
        executorService1.shutdown();
        while(!executorService1.isTerminated()) {}
        
        executorService2.shutdown();
        while(!executorService2.isTerminated()) {}
    }
    
}

class TWorker implements Runnable {

    private String work;
    public TWorker(String work) {
        this.work = work;
    }
    @Override
    public void run() {
        System.out.println("Thread : "+Thread.currentThread().getName() +" | work : "+work+" | execution start");
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Thread : "+Thread.currentThread().getName() +" | work : "+work+" | execution end");
    }
}
hnadiger
  • 87
  • 1
  • 1
  • 9
  • 2
    See [this answer](https://stackoverflow.com/a/11872604/2478398) and [this answer](https://stackoverflow.com/a/34798567/2478398) – BeUndead May 16 '21 at 15:30

2 Answers2

2

Your code with slight modification gives what you want to do:

public class Test3 {

    public static void main(String[] args) throws Exception {
        
        CompletableFuture<Void> future1 = CompletableFuture.runAsync(new Runnable() {
            @Override
            public void run() {
                // Simulate a long-running Job
                try {
                    String[] works1 = {"work1", "work2", "work3", "work4", "work5"};
                    List<String> workList1 = Arrays.asList(works1);
                    List<Callable<Object>> workerList1 = new ArrayList<>();
                    ThreadPoolExecutor executorService1 = (ThreadPoolExecutor) Executors.newFixedThreadPool(3);
                    for(int i=0; i<workList1.size(); i++) {
                        Runnable worker = new TWorker(workList1.get(i));
                        Callable<Object> callableWorker = Executors.callable(worker);
                        workerList1.add(callableWorker);
                    }
                    System.out.println("Invoke all TP1");
                    executorService1.invokeAll(workerList1);
                    executorService1.shutdown();
                    while(!executorService1.isTerminated()) {}

                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    throw new IllegalStateException(e);
                }
                System.out.println("I'll run in a separate thread than the main thread.");
            }
        });

        // Block and wait for the future to complete
        CompletableFuture<Void> future2 = CompletableFuture.runAsync(new Runnable() {
            @Override
            public void run() {
                // Simulate a long-running Job
                try {
                    String[] works2 = {"work6", "work7", "work8", "work9", "work10", "work11"};
                    List<String> workList2 = Arrays.asList(works2);
                    List<Callable<Object>> workerList2 = new ArrayList<Callable<Object>>();
                    ThreadPoolExecutor executorService2 = (ThreadPoolExecutor) Executors.newFixedThreadPool(3);
                    for(int i=0; i<workList2.size(); i++) {
                        Runnable worker = new TWorker(workList2.get(i));
                        Callable<Object> callableWorker = Executors.callable(worker);
                        workerList2.add(callableWorker);
                    }
                    System.out.println("Invoke all TP2");
                    executorService2.invokeAll(workerList2);
                    executorService2.shutdown();
                    while(!executorService2.isTerminated()) {}

                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    throw new IllegalStateException(e);
                }
                System.out.println("I'll run in a separate thread than the main thread.");
            }
        });

        // Block and wait for the future to complete
        future1.get();
        future2.get();
    }
}
BeUndead
  • 3,463
  • 2
  • 17
  • 21
pasame
  • 41
  • 3
2

Is there a way to initiate them in parallel?

Sure. Instead of adding them to lists just submit them to the thread-pool directly.

Executor executorService1 = Executors.newFixedThreadPool(3);
Executor executorService2 = Executors.newFixedThreadPool(3);
for (int i = 0; i < works1.length; i++) {
    executorService1.submit(new TWorker(works1[i]));
}
// once all jobs have been submitted we can shutdown the pool
executorService1.shutdown();
for(int i = 0; i < works2.length; i++) {
    executorService2.submit(new TWorker(works2[i]));
}
// once all jobs have been submitted we can shutdown the pool
executorService2.shutdown();
executorService1.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
executorService2.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);

The jobs start running in parallel as soon as you submit them. You shutdown the pools after the last job has been submitted but the submitted jobs will continue to run in the background. Lastly, instead of spinning in a tight !executorService1.isTerminated() loop (never a good idea), just use awaitTermination(...).

Gray
  • 115,027
  • 24
  • 293
  • 354