
I am submitting a list of LinkedBlockingQueue<Long> to a ThreadPoolExecutor. Each thread should pick one LinkedBlockingQueue<Long> and process it in parallel with the other threads.

This is my method logic:

public void doParallelProcess() {

    List<LinkedBlockingQueue<Long>> linkedBlockingQueueList = splitListtoBlockingQueues();
    ThreadPoolExecutor executor = new ThreadPoolExecutor(1, linkedBlockingQueueList.size(), 0L,
            TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), Executors.defaultThreadFactory());
    Long initial = System.currentTimeMillis();
    try {

        System.out.println("linkedBlockingQueueList begin size is " + linkedBlockingQueueList.size() + "is empty"
                + linkedBlockingQueueList.isEmpty());

        while (true) {
            linkedBlockingQueueList.parallelStream().parallel().filter(q -> !q.isEmpty()).forEach(queue -> {
                Long id = queue.poll();
                MyTestRunnable runnab = new MyTestRunnable(id);
                executor.execute(runnab);
                System.out.println("Task Count: " + executor.getTaskCount() + ", Completed Task Count: "
                        + executor.getCompletedTaskCount() + ", Active Task Count: " + executor.getActiveCount());
            });

            System.out.println("linkedBlockingQueueList end size is " + linkedBlockingQueueList.size() + "is empty"
                    + linkedBlockingQueueList.isEmpty());

            System.out.println("executor service " + executor);

            if (executor.getCompletedTaskCount() == (long) mainList.size()) {
                break;
            }

            while (executor.getActiveCount() != 0) {
                System.out.println("Task Count: " + executor.getTaskCount() + ", Completed Task Count: "
                        + executor.getCompletedTaskCount() + ", Active Task Count: " + executor.getActiveCount());
                Thread.sleep(1000L);
            }

        }
    } catch (Exception e) {
    } finally {
        executor.shutdown();
        while (!executor.isTerminated()) {
        }
    }
}

How can I hand each LinkedBlockingQueue in the list to its own thread? For example:

  1. Each LinkedBlockingQueue<Long> in the List<LinkedBlockingQueue<Long>> contains 50 entries.
  2. The List<LinkedBlockingQueue<Long>> itself has 50 elements.
  3. Each thread should pick one LinkedBlockingQueue<Long> and execute its 50 queued tasks.
Mubasher
shaaad
  • What is a task, and how is it related to these `Long` values? I suspect Executor is the wrong tool if you're trying to micromanage which thread gets assigned to which tasks. – shmosel Apr 03 '19 at 03:37
  • I will get a list of Long values (IDs) from the Oracle DB, and based on those IDs I need to fetch the records and update the column values. Please don't suggest PL/SQL procedures or functions. – shaaad Apr 03 '19 at 03:41
  • Why not just submit 2500 jobs? If you do it like you describe, each batch of 50 is processed one item at a time. – Josh Apr 03 '19 at 04:01
  • The count of 50 mentioned above is just an example. With 50k records, creating 50k threads would be bad practice, so I split the 50k IDs into a list of queues using Apache list partitioning and pass that list to the thread pool so that each thread picks one LinkedBlockingQueue. – shaaad Apr 03 '19 at 04:16
  • Welcome to Stack Overflow. What did your search and research bring up? Have you tried something? These are the questions we always ask (if it’s not already clear from the question), so you may as well get used to it. :-) – Ole V.V. Apr 03 '19 at 04:47
  • Thanks, I tried a Java 8 parallel stream, and inside that stream a consumer submits the Runnable tasks to the executor. Trying my luck. – shaaad Apr 03 '19 at 05:03
  • Added the method logic here. – shaaad Apr 03 '19 at 16:35
  • @shaaad it is possible to submit 50k tasks to an Executor without spinning up 50k threads. e.g. [Executors.newFixedThreadPool(50)](https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/Executors.html#newFixedThreadPool(int)) creates an Executor which will use at most 50 Threads to execute submitted tasks in parallel. – MikeFHay Apr 03 '19 at 17:10
  • @MikeFHay The count of 50 mentioned above is just an example. With 50k records, creating 50k threads would be bad practice, so I split the 50k IDs into a list of queues using Apache list partitioning and pass that list to the thread pool so that each thread picks one LinkedBlockingQueue. – shaaad Apr 03 '19 at 17:13

2 Answers


The input to an ExecutorService is either a Runnable or a Callable, so any task you submit needs to implement one of those two interfaces. If you want to submit a bunch of tasks to a thread pool and wait until they are all complete, you can use the invokeAll method and loop over the resulting Futures, calling get on each: see this informative answer to a similar question.
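
A minimal sketch of the invokeAll approach, assuming each task is just an id to process. The class name `InvokeAllSketch`, the `processId` helper (a stand-in that doubles the id so there is a result to collect) and the pool size are hypothetical and not part of the original code:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class InvokeAllSketch {

    // Hypothetical stand-in for the real per-id work (fetch the record, update it).
    static Long processId(Long id) {
        return id * 2;
    }

    // Wraps every id in a Callable, runs them all on a bounded pool, and collects the results.
    static List<Long> processAll(List<Long> ids, int threads) {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<Long>> tasks = new ArrayList<>();
            for (Long id : ids) {
                tasks.add(() -> processId(id));
            }
            // invokeAll blocks until every task has completed.
            List<Future<Long>> futures = executor.invokeAll(tasks);
            List<Long> results = new ArrayList<>();
            for (Future<Long> future : futures) {
                results.add(future.get()); // a failed task surfaces here as ExecutionException
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(processAll(Arrays.asList(1L, 2L, 3L), 2)); // prints [2, 4, 6]
    }
}
```

The pool never has more than `threads` threads, no matter how many ids are passed in, and the returned futures are in the same order as the submitted tasks.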

You do not need to batch your input tasks into groups, though. You never want an executor service to have idle threads while there is still work left to do! You want it to be able to grab the next task as soon as resources free up, and batching in this fashion runs contrary to that. Your code is doing this:

while non-empty input lists exist {
    for each non-empty input list L {
        t = new Runnable(L.pop())
        executor.submit(t)
    }
    while (executor.hasTasks()) {
        wait
    }
}

Once one of those tasks completes, that thread should be free to move on to other work. But it won't because you wait until all N tasks complete before you submit any more. Submit them all at once with invokeAll and let the executor service do what it was built to do.

Josh
  • Hi, thanks for the suggestion. Can you please elaborate a bit more on how to use invokeAll with my logic? – shaaad Apr 04 '19 at 06:55
  • For every id you have, create a `Callable` and put that into a list. Then call `executorService.invokeAll(listOfCallables)`. – Josh Apr 04 '19 at 14:42
  • Spring transaction management doesn't work in a multithreaded environment. Inside my Runnable I am using Spring JPA together with transaction management. – shaaad Apr 08 '19 at 12:06
  • Any alternative solutions, please? I need to use an executor service, and inside my Runnable I will use database transactions. – shaaad Apr 08 '19 at 14:18

The Executors class is your main entry to thread pools:

    ExecutorService executor = Executors.newCachedThreadPool();
    linkedBlockingQueueList.forEach(queue -> executor.submit(() -> { /* process queue */ }));
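
One way the `/* process queue */` placeholder could be filled in, as a sketch only: each submitted task drains its own queue with `poll()` until the queue is empty. The class name `QueuePerTaskSketch`, the `processId` helper and the one-minute timeout are hypothetical; the real per-id work would replace the counter.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

final class QueuePerTaskSketch {

    // Hypothetical stand-in for the real per-id work; here it only counts the ids.
    static void processId(Long id, AtomicLong processed) {
        processed.incrementAndGet();
    }

    // Submits one task per queue; each task drains its own queue sequentially.
    static long processQueues(List<LinkedBlockingQueue<Long>> queues) {
        AtomicLong processed = new AtomicLong();
        ExecutorService executor = Executors.newCachedThreadPool();
        for (LinkedBlockingQueue<Long> queue : queues) {
            executor.submit(() -> {
                Long id;
                // poll() returns null once the queue is empty, which ends the loop.
                while ((id = queue.poll()) != null) {
                    processId(id, processed);
                }
            });
        }
        executor.shutdown(); // accept no new tasks; already submitted ones still run
        try {
            if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException(e);
        }
        return processed.get();
    }

    public static void main(String[] args) {
        List<LinkedBlockingQueue<Long>> queues = new ArrayList<>();
        queues.add(new LinkedBlockingQueue<>(Arrays.asList(1L, 2L, 3L)));
        queues.add(new LinkedBlockingQueue<>(Arrays.asList(4L, 5L)));
        System.out.println(processQueues(queues)); // prints 5
    }
}
```

Waiting with `awaitTermination` blocks the calling thread instead of spinning in an empty `while (!executor.isTerminated())` loop.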

If you do want to create a ThreadPoolExecutor yourself (it does give you more control over the configuration), there are at least two ways to specify the default thread factory:

  1. Leave out the thread factory argument:

        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, linkedBlockingQueueList.size(),
                0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    
  2. Use the Executors class again for getting the default thread factory:

        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, linkedBlockingQueueList.size(),
                0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
                Executors.defaultThreadFactory());
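
A caveat on the constructor arguments, offered as a sketch rather than a definitive fix: per the ThreadPoolExecutor documentation, a pool backed by an unbounded LinkedBlockingQueue only starts threads beyond corePoolSize when the queue is full, which never happens, so with a corePoolSize of 1 the maximumPoolSize has no effect. The hypothetical `PoolSizeSketch` class below runs a few trivial tasks and reports `getLargestPoolSize()` for two configurations:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class PoolSizeSketch {

    // Runs `tasks` trivial tasks and reports the most threads the pool ever held at once.
    static int largestPoolSize(int corePoolSize, int maximumPoolSize, int tasks) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
                0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            executor.execute(done::countDown);
        }
        try {
            done.await(); // wait until every task has run
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
        return executor.getLargestPoolSize();
    }

    public static void main(String[] args) {
        // corePoolSize 1 with an unbounded queue: extra tasks are queued, not given new threads.
        System.out.println(largestPoolSize(1, 4, 4)); // prints 1
        // corePoolSize equal to maximumPoolSize: one thread is started per task, up to the core size.
        System.out.println(largestPoolSize(4, 4, 4)); // prints 4
    }
}
```

Setting corePoolSize to the number of threads that should actually run, or using `Executors.newFixedThreadPool(n)`, avoids the single-thread behaviour.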
    
Ole V.V.
  • The issue with these approaches is that only one thread executes at a time; the tasks do not run in parallel. – shaaad Apr 03 '19 at 16:12
  • Either you or I completely misunderstood. The idea in a thread pool is exactly that several threads execute in parallel. @shaaad – Ole V.V. Apr 03 '19 at 16:19
  • I have provided the method logic as an answer to this question. Please suggest. – shaaad Apr 03 '19 at 16:21