
Is there any way to go through a huge database and run some jobs in parallel on batches of entries? I tried with ExecutorService, but we have to call shutdown() in order to know the pool size...

So my best solution is:

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TestCode
{
private static List<String> getIds(int dbOffset, int nbOfArticlesPerRequest) 
{
    return Arrays.asList("1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15", "16", "17", "18", "19", "20", "21", "22", "23", "24", "25", "26", "27", "28", "29");
}

public static void main(String args[]) throws Exception
{
    int dbOffset = 0;
    int nbOfArticlesPerRequest = 100;
    int MYTHREADS = 10;
    int loopIndex = 0;
    boolean bContinue=true;
    Runnable worker;



    while(bContinue) // in this loop we'll constantly fill the pool list
    {
        loopIndex++;
        ExecutorService executor = Executors.newFixedThreadPool(MYTHREADS); // NOT IDEAL, BUT EXECUTORSERVICE CANNOT BE REUSED ONCE SHUTDOWN...

        List<String> ids = getIds(dbOffset, nbOfArticlesPerRequest ); // getIds(offset, rows_number)
        for(String id: ids) {
            worker = new MyRunnable(id);
            executor.execute(worker);
        }

        executor.shutdown();
        while (!executor.isTerminated()) {
            System.out.println("Pool size is now " + ((ThreadPoolExecutor) executor).getActiveCount()+
                    " - queue size: "+ ((ThreadPoolExecutor) executor).getQueue().size()
            );
            TimeUnit.MILLISECONDS.sleep(500);
        }

        if(loopIndex>=3) {
            System.out.println("\nEnd the loop #"+loopIndex+" ===> STOOOP!\n");
            bContinue = false;
        }
        dbOffset+=nbOfArticlesPerRequest;
    }
}



public static class MyRunnable implements Runnable {

    private final String id;

    MyRunnable(String id) {
        this.id = id;
    }

    @Override
    public void run()
    {
        System.out.println("Thread '" + id + "' started");
        try {
            TimeUnit.MILLISECONDS.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Thread '" + id + "' stopped");
    }
}
}

This works fine, but the drawback is that at the end of every loop iteration I have to wait for the last threads to finish, e.g. when only 3 threads are still running.

I did the following in order to solve this problem, but is that "safe"/correct?

BTW: is there any way to know how many tasks/threads are in the queue?

    int dbOffset = 0;
    int nbOfArticlesPerRequest = 5; //100;
    int MYTHREADS = 2;
    int loopIndex = 0;
    boolean bContinue = true;
    Runnable worker;

    ExecutorService executor = Executors.newFixedThreadPool(MYTHREADS); // **HERE IT WOULD BE A GLOBAL VARIABLE**
    while (bContinue) // in this loop we'll constantly fill the pool list
    {
        loopIndex++;

        List<String> ids = getIds(dbOffset, nbOfArticlesPerRequest); // getIds(offset, rows_number)
        for (String id : ids) {
            worker = new MyRunnable(id);
            executor.execute(worker);
        }

        // wait while all pool threads are busy, so we don't keep flooding the queue
        while (!executor.isTerminated() && ((ThreadPoolExecutor) executor).getActiveCount() >= MYTHREADS) {
            System.out.println("Pool size is now " + ((ThreadPoolExecutor) executor).getActiveCount() +
                    " - queue size: " + ((ThreadPoolExecutor) executor).getQueue().size());
            TimeUnit.MILLISECONDS.sleep(500);
        }

        if (loopIndex >= 3) {
            System.out.println("\nEnd the loop #" + loopIndex + " ===> STOOOP!\n");
            bContinue = false;
        }
        dbOffset += nbOfArticlesPerRequest;
    }

    executor.shutdown();
    // wait until all tasks are finished
    while (!executor.isTerminated()) {
        System.out.println("Pool size is now " + ((ThreadPoolExecutor) executor).getActiveCount() +
                " - queue size: " + ((ThreadPoolExecutor) executor).getQueue().size());
        TimeUnit.MILLISECONDS.sleep(500);
    }

EDIT:

I need to launch 1 to 10 million tasks, so (I assumed) I cannot put them all in the queue... That's why I use a global executor, in order to always have some tasks in the queue (and for that I cannot shut down the executor, otherwise it is not usable anymore).

Latest code version:

    int dbOffset = 0;
    int nbOfArticlesPerRequest = 5; //100;
    int MYTHREADS = 2;
    int loopIndex = 0;
    boolean bContinue = true;
    Runnable worker;

    ThreadPoolExecutor executorPool = new ThreadPoolExecutor(MYTHREADS, MYTHREADS, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); // **HERE IT WOULD BE A GLOBAL VARIABLE**
    while (bContinue) // in this loop we'll constantly fill the pool list
    {
        loopIndex++;

        List<String> ids = getIds(dbOffset, nbOfArticlesPerRequest); // getIds(offset, rows_number)
        for (String id : ids) {
            worker = new MyRunnable(id);
            executorPool.execute(worker);
        }

        // wait while the pool is saturated and the queue still holds enough tasks
        while (executorPool.getActiveCount() >= MYTHREADS || executorPool.getQueue().size() > Math.max(1, MYTHREADS - 2))
        {
            System.out.println("Pool size is now " + executorPool.getActiveCount() +
                    " - queue size: " + executorPool.getQueue().size());

            if (executorPool.getQueue().size() <= Math.max(1, MYTHREADS - 2)) {
                System.out.println("Less than " + Math.max(1, MYTHREADS - 2) + " tasks in queue ---> fill the queue");
                break;
            }

            TimeUnit.MILLISECONDS.sleep(2000);
        }

        if (loopIndex >= 3) {
            System.out.println("\nEnd the loop #" + loopIndex + " ===> STOOOP!\n");
            bContinue = false;
        }
        dbOffset += nbOfArticlesPerRequest;
    }

    executorPool.shutdown();
    // wait until all tasks are finished
    while (!executorPool.isTerminated()) {
        System.out.println("Pool size is now " + executorPool.getActiveCount() +
                " - queue size: " + executorPool.getQueue().size());
        TimeUnit.MILLISECONDS.sleep(500);
    }

Thanks in advance

Bast
  • You can use invokeAll() to wait for completion of threads. Refer to : https://stackoverflow.com/questions/3269445/executorservice-how-to-wait-for-all-tasks-to-finish/36699136#36699136 – Ravindra babu Jun 29 '17 at 16:40

3 Answers

8

Update

Now it's clear to me that your main concern is that you can't submit 10 million tasks at once.

Do not be afraid: you can submit all of them to the executor. The actual number of tasks running in parallel is limited by the underlying thread pool size. That is, if you have a pool size of 2, only two tasks are executed at a time; the rest sit in the queue and wait for a free thread.

By default, Executors.newFixedThreadPool() creates an Executor whose queue has a capacity of Integer.MAX_VALUE, hence your millions of tasks will fit there.
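For reference, this is roughly how Executors.newFixedThreadPool(nThreads) is built under the hood: a fixed-size ThreadPoolExecutor in front of an unbounded LinkedBlockingQueue (classes from java.util.concurrent):

int nThreads = 10;
ExecutorService executor = new ThreadPoolExecutor(
        nThreads, nThreads,                   // fixed pool: core size == max size
        0L, TimeUnit.MILLISECONDS,            // keep-alive (unused for core threads)
        new LinkedBlockingQueue<Runnable>()); // unbounded queue, capacity Integer.MAX_VALUE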


You can use the ExecutorService.submit() method, which returns a Future. Then you can examine the state of your Future tasks (e.g. with the isDone() and isCancelled() methods).

An Executor is typically something you don't want to shut down explicitly; it exists throughout your application's lifecycle. With this approach you don't need to shut it down in order to know how many tasks are pending.

List<Future<?>> tasks = new ArrayList<>();
for (String id : ids) {                        // ids: the current batch of entries
    Future<?> task = executorService.submit(() -> {
        // do the actual work for this id
    });
    tasks.add(task);
}

long pending = tasks.stream().filter(future -> !future.isDone()).count();
System.out.println(pending + " tasks are still pending");

Moreover, please note that tasks and threads are not interchangeable terms. In your case, the executor has a fixed number of threads. You can submit more tasks than that, but the rest will sit in the executor's queue until there is a free thread to run them on.
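To make this concrete, a tiny sketch (the pool size and task count are arbitrary):

ExecutorService executor = Executors.newFixedThreadPool(2); // pool of 2 threads
for (int i = 0; i < 5; i++) {                               // submit 5 tasks
    final int taskNo = i;
    executor.submit(() -> System.out.println("task " + taskNo));
}
// At most 2 tasks run at the same time; the other 3 wait in the executor's queue.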

David Siro
  • Might be a good idea... Then I just need to add a "waiting loop" in order to assign more tasks when less than X tasks are running... – Bast Jul 11 '17 at 06:28
  • the same question as for @Pavan, what would be the benefit from your solution compared to mine (see latest code after EDIT)? – Bast Jul 11 '17 at 11:42
  • Well, now I got your point and updated the answer. I think you're putting significant effort to simulate something Executors already provide - the _queue_. – David Siro Jul 11 '17 at 15:08
  • Thank you David, but by submitting 10 million tasks, would the Executor object not be huge (i.e. take a big amount of RAM)? – Bast Jul 12 '17 at 06:28
  • Why don't you go and try it yourself? I've just quickly run 10 million tasks doing `System.out`; the amount of consumed memory was 800MB... – David Siro Jul 13 '17 at 14:24
  • Of course I did, and your update showed me that the queue is able to manage such a big amount of tasks! thanks! – Bast Jul 17 '17 at 05:19
0

ExecutorService allows you to invoke a list of tasks that can run in parallel and return the results when they are available.

In your code you are using

worker = new MyRunnable(id);
executor.execute(worker);

Instead of Runnable, it's better to use Callable in this use case; then you can submit the whole list of Callables for execution in a single API call instead of a for loop.

List<Callable<Boolean>> workers = new ArrayList<>();
workers.add(new MyCallable(id)); // this is just for example
workers.add(new MyCallable(id));
workers.add(new MyCallable(id));

// Executes all worker tasks in parallel and returns a list of Future objects, which you can
// use to see whether a worker thread has completed and what its return value is.
List<Future<Boolean>> futures = executor.invokeAll(workers);

Note that the get() method on a Future object is a blocking call.
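For illustration, a minimal sketch of what the MyCallable referenced above could look like (the class name and the Boolean result type are just assumptions, mirroring MyRunnable from the question):

import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

public class MyCallable implements Callable<Boolean> {

    private final String id;

    MyCallable(String id) {
        this.id = id;
    }

    @Override
    public Boolean call() throws Exception {
        System.out.println("Task '" + id + "' started");
        TimeUnit.MILLISECONDS.sleep(2000); // simulate some work
        System.out.println("Task '" + id + "' finished");
        return Boolean.TRUE;               // result reported through the Future
    }
}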

Pavan
  • invokeAll is also blocking, so the initial problem (need to wait on the last thread at every loop) is not solved. :) – Bast Jul 11 '17 at 06:32
  • @Bast - As per my understanding, invokeAll is not a blocking call. https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html#invokeAll(java.util.Collection) – Pavan Jul 11 '17 at 06:41
  • You're right, only the future.get() is blocking... I could probably use your solution as the one proposed by @DavidSiro ... – Bast Jul 11 '17 at 09:42
  • Ya please try and post your feedback as it might help others as well. Thanks – Pavan Jul 11 '17 at 10:09
  • what would be the benefit from your solution compared to mine (see latest code after EDIT)? – Bast Jul 11 '17 at 11:41
  • I feel your assumptions are incorrect. When you create an ExecutorService with some number of threads in the pool, it will use those threads to do your tasks. And when you call shutdown() on the ExecutorService, it does not mean you are shutting down the currently executing threads; it just means the executor service cannot take any more tasks. – Pavan Jul 12 '17 at 06:10
  • Let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/148959/discussion-between-pavan-and-bast). – Pavan Jul 12 '17 at 06:11
0

You don't need to know the thread pool size to check for completion of tasks in an ExecutorService. You can drop the polling code you run after submitting the tasks.

Option 1:

  1. Replace ThreadPoolExecutor with newWorkStealingPool from Executors.

    Creates a work-stealing thread pool using all available processors as its target parallelism level.

    It will allow better utilization of threads in ExecutorService.

    ExecutorService executor = Executors.newWorkStealingPool();
    
  2. Use invokeAll (a combined sketch of both points follows below)
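Both points could be combined roughly like this (MyCallable and ids are placeholders for the per-id task and the current batch returned by getIds(...)):

ExecutorService executor = Executors.newWorkStealingPool(); // parallelism = number of available processors

List<Callable<Boolean>> workers = new ArrayList<>();
for (String id : ids) {
    workers.add(new MyCallable(id));
}

// invokeAll() runs the whole batch and returns once every task has completed (or been cancelled)
List<Future<Boolean>> futures = executor.invokeAll(workers);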

Option 2 (useful if you know the number of tasks in advance):

Use CountDownLatch and initialize the counter to the number of tasks to be submitted.
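A minimal sketch of the CountDownLatch approach, assuming the task count is known before submission (pool size and variable names are illustrative, reusing the question's getIds()):

List<String> ids = getIds(dbOffset, nbOfArticlesPerRequest); // the batch to process
CountDownLatch latch = new CountDownLatch(ids.size());       // one count per task
ExecutorService executor = Executors.newFixedThreadPool(10);

for (String id : ids) {
    executor.execute(() -> {
        try {
            // do the work for this id
        } finally {
            latch.countDown(); // count down even if the task throws
        }
    });
}

latch.await();       // blocks until every task has counted down
executor.shutdown(); // no further tasks will be accepted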

Further references:

wait until all threads finish their work in java

How to properly shutdown java ExecutorService

Ravindra babu
  • yes, but because it's a while loop I wanted to dynamically add new threads in order to always have some in "queue"... actually using getActiveCount() is more correct (code updated) -- I now actually even switched to ThreadPoolExecutor in my local code – Bast Jul 11 '17 at 09:45
  • Please note that in my second code portion (i.e.: "solution"), ExecutorService is global, so it cannot be shutdown otherwise it's not usable anymore – Bast Jul 11 '17 at 09:54
  • Outside the while loop, you can keep the shutdown code by using the shutdown, shutdownNow, and awaitTermination APIs in sequence, as quoted in the above post. – Ravindra babu Jul 11 '17 at 09:59
  • Yes, shutdown() is already outside the loop in my code... the problem is that **I need to launch 1 to 10 million tasks**, so (I assume) I cannot put them all in the queue... And I thought that using a global executor might be the solution to always have some tasks in the queue... I'll edit my post and add the latest code that I am running – Bast Jul 11 '17 at 11:27
  • use invokeAll() and replace ThreadPoolExecutor with newWorkStealingPool – Ravindra babu Jul 12 '17 at 04:53