6

I started reading more about ThreadPoolExecutor in the Javadoc because I am using it in one of my projects. Can anyone explain what this line actually means? I know what each parameter stands for, but I would like to understand it in more general, layman's terms from some of the experts here.

ExecutorService service = new ThreadPoolExecutor(10, 10, 1000L,
TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10, true), new 
ThreadPoolExecutor.CallerRunsPolicy());

Update: the problem statement is:

Each thread uses a unique ID between 1 and 1000, and the program has to run for 60 minutes or more. Within those 60 minutes all the IDs may get used up, so I need to reuse them. Below is the program I wrote using the above executor.

class IdPool {
    private final LinkedList<Integer> availableExistingIds = new LinkedList<Integer>();

    public IdPool() {
        for (int i = 1; i <= 1000; i++) {
            availableExistingIds.add(i);
        }
    }

    public synchronized Integer getExistingId() {
        return availableExistingIds.removeFirst();
    }

    public synchronized void releaseExistingId(Integer id) {
        availableExistingIds.add(id);
    }
}


class ThreadNewTask implements Runnable {
    private IdPool idPool;

    public ThreadNewTask(IdPool idPool) {
        this.idPool = idPool;
    }

    public void run() {
        Integer id = idPool.getExistingId();
        someMethod(id);
        idPool.releaseExistingId(id);
    }

    // Does this method need to be synchronized or not?
    private synchronized void someMethod(Integer id) {
        System.out.println("Task: " + id);
        // and do whatever other calculations you need to do in your program
    }
}

public class TestingPool {
    public static void main(String[] args) throws InterruptedException {
        int size = 10;
        int durationOfRun = 60;
        IdPool idPool = new IdPool();   
        // create thread pool with given size
        ExecutorService service = new ThreadPoolExecutor(size, size, 500L,
                TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(size),
                new ThreadPoolExecutor.CallerRunsPolicy());

        // queue some tasks
        long startTime = System.currentTimeMillis();
        long endTime = startTime + (durationOfRun * 60 * 1000L);

        // Running it for 60 minutes
        while(System.currentTimeMillis() <= endTime) {
            service.submit(new ThreadNewTask(idPool));
        }

        // wait for termination        
        service.shutdown();
        service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 
    }
}

My question is: is this code right as far as performance is concerned? And what else can I do to make it more correct? Any help will be appreciated.

arsenal
  • I don't see anything wrong with your code, other than maybe having a try/finally in your "run" method to ensure that id's always get released (for when you have a more complicated code in "someMethod"). – Matt May 27 '12 at 20:02
  • @Matt, thanks for the comment. So does someMethod have to be synchronized or not? In my case I have made it synchronized. – arsenal May 27 '12 at 20:07
  • And `idPool.releaseExistingId(id);` should go in the finally block, right? – arsenal May 27 '12 at 20:18
  • Why are [these](http://stackoverflow.com/q/10770348/823393) [three](http://stackoverflow.com/q/10770003/823393) [questions](http://stackoverflow.com/q/10769606/823393) so similar? ... `Problem Statement is:- Each thread uses unique ID between 1 and 1000` ... – OldCurmudgeon May 29 '12 at 23:27
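
The try/finally suggestion from the comments above could look roughly like the sketch below. It is a minimal illustration, not the asker's final code: `runWithId` and `available()` are hypothetical helpers added only so the effect can be observed, and the failure is simulated.

```java
import java.util.LinkedList;
import java.util.function.Consumer;

class IdPoolFinallyDemo {
    /** Same idea as the IdPool in the question, plus a size accessor for the demo. */
    static class IdPool {
        private final LinkedList<Integer> ids = new LinkedList<>();

        IdPool(int max) {
            for (int i = 1; i <= max; i++) {
                ids.add(i);
            }
        }

        synchronized Integer getExistingId() { return ids.removeFirst(); }
        synchronized void releaseExistingId(Integer id) { ids.add(id); }
        synchronized int available() { return ids.size(); } // hypothetical helper
    }

    /** Borrows an ID, runs the work, and always gives the ID back. */
    static void runWithId(IdPool pool, Consumer<Integer> work) {
        Integer id = pool.getExistingId();
        try {
            work.accept(id);
        } finally {
            pool.releaseExistingId(id); // runs even if work throws
        }
    }

    /** Runs one failing task and reports how many IDs are available afterwards. */
    static int availableAfterFailure() {
        IdPool pool = new IdPool(1000);
        try {
            runWithId(pool, id -> {
                throw new IllegalStateException("simulated failure for id " + id);
            });
        } catch (IllegalStateException expected) {
            // the task failed, but its ID should already be back in the pool
        }
        return pool.available();
    }

    public static void main(String[] args) {
        System.out.println("IDs available after a failed task: " + availableAfterFailure());
    }
}
```

Without the finally block, every task that throws would permanently remove one ID from the pool.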

4 Answers

13

[First, I apologize: this is a response to a previous answer, but I wanted formatting.]

Except in reality, you DON'T block when an item is submitted to a ThreadPoolExecutor with a full queue. The reason for this is that ThreadPoolExecutor calls the BlockingQueue.offer(T item) method which by definition is a non-blocking method. It either adds the item and returns true, or does not add (when full) and returns false. The ThreadPoolExecutor then calls the registered RejectedExecutionHandler to deal with this situation.
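
The non-blocking behaviour of offer can be checked in isolation, without any executor involved. The following is a small sketch; the class and method names are made up for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class OfferDemo {
    /** Fills a bounded queue, then reports what one more offer() returns. */
    static boolean offerToFullQueue(int capacity) {
        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(capacity);
        Runnable noop = () -> { };
        for (int i = 0; i < capacity; i++) {
            queue.offer(noop); // succeeds while there is free capacity
        }
        // The queue is now full: offer() returns immediately instead of waiting.
        return queue.offer(noop);
    }

    public static void main(String[] args) {
        System.out.println("offer on a full queue returned: " + offerToFullQueue(10));
    }
}
```

By contrast, put() and the timed offer(e, timeout, unit) on the same queue do wait for space.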

From the javadoc:

Executes the given task sometime in the future. The task may execute in a new thread or in an existing pooled thread. If the task cannot be submitted for execution, either because this executor has been shutdown or because its capacity has been reached, the task is handled by the current RejectedExecutionHandler.

By default, the ThreadPoolExecutor.AbortPolicy() is used which throws a RejectedExecutionException from the "submit" or "execute" method of the ThreadPoolExecutor.

try {
   executorService.execute(new Runnable() { ... });
}
catch (RejectedExecutionException e) {
   // the queue is full, and you're using the AbortPolicy as the 
   // RejectedExecutionHandler
}
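
To see the rejection happen, a deliberately tiny pool helps. The sketch below assumes a pool with one thread and one queue slot (sizes chosen only for the demo) and keeps the first two tasks busy with a latch so that a third has nowhere to go.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class AbortPolicyDemo {
    /** Saturates a 1-thread, 1-slot pool and reports whether a third task is rejected. */
    static boolean thirdTaskRejected() {
        // No handler argument, so the default AbortPolicy is used.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the task busy until the demo is done
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocker); // runs on the single worker thread
            pool.execute(blocker); // waits in the single queue slot
            pool.execute(blocker); // no free thread and no free slot
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("third task rejected: " + thirdTaskRejected());
    }
}
```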

However, you can use other handlers to do something different, such as silently dropping the task (DiscardPolicy), or running it in the thread which called the "execute" or "submit" method (CallerRunsPolicy). This example lets whichever thread calls the "submit" or "execute" method run the requested task when the queue is full (this means that at any given time you could have 1 additional task running on top of what's in the pool itself):

ExecutorService service = new ThreadPoolExecutor(..., new ThreadPoolExecutor.CallerRunsPolicy());
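
A quick way to confirm which thread ends up running the overflow task is to record the thread inside the task. This sketch again assumes a one-thread, one-slot pool purely to force saturation.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class CallerRunsDemo {
    /** Returns true if the overflow task was executed by the submitting thread. */
    static boolean overflowRanOnCaller() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // occupy the pool until the demo is done
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        AtomicReference<Thread> ranOn = new AtomicReference<>();
        try {
            pool.execute(blocker); // runs on the single worker thread
            pool.execute(blocker); // waits in the single queue slot
            // Pool and queue are full, so CallerRunsPolicy runs this task
            // synchronously, right here, before execute() returns.
            pool.execute(() -> ranOn.set(Thread.currentThread()));
            return ranOn.get() == Thread.currentThread();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("overflow task ran on caller: " + overflowRanOnCaller());
    }
}
```

A side effect worth noting: while the caller is busy running the task, it cannot submit more work, which throttles the producer.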

If you want to block and wait, you could implement your own RejectedExecutionHandler which would block until there's a slot available on the queue (this is a rough sketch, I have not compiled or run this, but you should get the idea):

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BlockUntilAvailableSlot implements RejectedExecutionHandler {
  public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
     if (executor.isShutdown()) {   // also true once the pool has terminated
        return;
     }

     boolean submitted = false;
     while (!submitted) {
       try {
          // Do not call executor.execute(r) here: a rejection would re-enter
          // this handler instead of throwing. Offer to the queue directly,
          // waiting up to 100 ms for a free slot before trying again.
          submitted = executor.getQueue().offer(r, 100L, TimeUnit.MILLISECONDS);
       }
       catch (InterruptedException ie) {
          // be a good citizen: restore the interrupt flag and silently return
          Thread.currentThread().interrupt();
          return;
       }
     }
  }
}
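
For comparison, here is a variant of the same idea that waits with put() on the executor's queue instead of retrying in a loop, together with a small usage check. It is only a sketch: the pool sizes, the lambda-based handler, and the method name are assumptions made for the demo.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BlockingSubmitDemo {
    /** Submits taskCount tasks and returns how many of them ran on pool threads. */
    static int runOnPoolThreads(int taskCount) {
        RejectedExecutionHandler blockUntilFree = (task, executor) -> {
            if (executor.isShutdown()) {
                throw new RejectedExecutionException("executor has been shut down");
            }
            try {
                executor.getQueue().put(task); // blocks until a slot is free
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("interrupted while waiting", e);
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(2),
                blockUntilFree);
        Thread caller = Thread.currentThread();
        AtomicInteger ranOnPool = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> {
                if (Thread.currentThread() != caller) {
                    ranOnPool.incrementAndGet();
                }
            });
        }
        pool.shutdown(); // already queued tasks still run
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ranOnPool.get();
    }

    public static void main(String[] args) {
        System.out.println("tasks run on pool threads: " + runOnPoolThreads(50));
    }
}
```

Unlike CallerRunsPolicy, none of the tasks run on the submitting thread; the submitter simply waits whenever the queue is full.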
Matt
  • I don't understand much of what you just said, as I am new to the Executor family. So is the way I am doing it in my question right or not? Or could the code be improved? – arsenal May 27 '12 at 19:21
  • I was merely commenting on someone's claim that "It uses an ArrayBlockingQueue to manage the execution requests with 10 slots, so when the queue is full (after 10 threads have been enqueued), it will block the caller." This is not the case as I have explained above, unless you deliberately do some code to make this happen. Using the caller runs policy actually runs it in the current thread, this statement above implies that you would block until there's room in the queue. – Matt May 27 '12 at 19:56
2

It's creating an ExecutorService which handles the execution of a pool of threads. Both the core and maximum number of threads in the pool are 10 in this case. A thread that stays idle for 1 second (1000 ms) would normally be terminated (the keep-alive timer); however, because the core and maximum number of threads are the same, this will never happen (it always keeps 10 threads around and will never run more than 10 threads).

It uses an ArrayBlockingQueue to manage the execution requests with 10 slots, so when the queue is full (after 10 threads have been enqueued), it will block the caller.

If a task is rejected (which in this case would be due to the service shutting down, since tasks will be queued or you will be blocked when queuing a task if the queue is full), then the offered Runnable will be executed on the caller's thread.
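
To map the constructor arguments from the question onto their roles, it can help to name each one. The sketch below only builds the executor and reads its settings back; the local variable names follow the constructor's parameter names, and the class name is made up.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolParametersDemo {
    static ThreadPoolExecutor newPool() {
        int corePoolSize = 10;     // threads kept alive even when idle
        int maximumPoolSize = 10;  // upper bound on threads; same as core here
        long keepAliveTime = 1000L; // idle timeout for threads above corePoolSize
        TimeUnit unit = TimeUnit.MILLISECONDS; // unit of keepAliveTime
        // Bounded queue with 10 slots. The second argument (fair = true) makes
        // threads that are blocked on the queue get served in FIFO order.
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(10, true);
        // What to do with a task that can be neither queued nor given a thread.
        RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();
        return new ThreadPoolExecutor(
                corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newPool();
        System.out.println("core=" + pool.getCorePoolSize()
                + " max=" + pool.getMaximumPoolSize()
                + " keepAliveMs=" + pool.getKeepAliveTime(TimeUnit.MILLISECONDS)
                + " queueSlots=" + pool.getQueue().remainingCapacity()
                + " threadsStarted=" + pool.getPoolSize());
        pool.shutdown();
    }
}
```

Threads are created lazily as tasks arrive, so a freshly built pool reports zero started threads.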

Francis Upton IV
  • Thanks for the comment. What does that `true` mean in ArrayBlockingQueue? – arsenal May 26 '12 at 23:16
  • It has a fixed number of slots backed by an array and will block the caller when enqueuing if they are full. There are other sorts of queues that can be used. Look at the description of it in the Javadoc for more information. – Francis Upton IV May 26 '12 at 23:20
  • I updated my question in which I have posted my code, and I have also mentioned my problem statement, so is there any problem in that code? if yes how can I make it more accurate? Any help will be appreciated. – arsenal May 26 '12 at 23:29
  • Can you take a look in my updated question if the way I did is right or not? And is there any way we can improve that code? That will be of great help to me if you can give some sort of suggestions. – arsenal May 27 '12 at 20:15
0

Consider semaphores; they are meant for this kind of purpose. The code below uses a semaphore. I am not sure if this is what you want, but it will block when there are no more permits to acquire. Also, is the ID itself important to you?

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ThreadNewTask implements Runnable {
    private Semaphore idPool;

    public ThreadNewTask(Semaphore idPool) {
        this.idPool = idPool;
    }

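    public void run() {
//      Integer id = idPool.getExistingId();
        try {
            idPool.acquire();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return; // no permit was acquired, so there is nothing to release
        }
        try {
            someMethod(0);
        } finally {
            idPool.release(); // only reached after a successful acquire
        }
//      idPool.releaseExistingId(id);
    }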

    // Does this method need to be synchronized or not?
    private void someMethod(Integer id) {
        System.out.println("Task: " + id);
        // and do whatever other calculations you need to do in your program
    }
}

public class TestingPool {
    public static void main(String[] args) throws InterruptedException {
        int size = 10;
        int durationOfRun = 60;
        Semaphore idPool = new Semaphore(100); 
//      IdPool idPool = new IdPool();
        // create thread pool with given size
        ExecutorService service = new ThreadPoolExecutor(size, size, 500L,
                TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(size),
                new ThreadPoolExecutor.CallerRunsPolicy());

        // queue some tasks
        long startTime = System.currentTimeMillis();
        long endTime = startTime + (durationOfRun * 60 * 1000L);

        // Running it for 60 minutes
        while (System.currentTimeMillis() <= endTime) {
            service.submit(new ThreadNewTask(idPool));
        }

        // wait for termination
        service.shutdown();
        service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
    }
}
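
The permit counting that the code above relies on can be seen in a few lines. This is a small sketch using the non-blocking tryAcquire() so that it never hangs; note that a Semaphore only counts permits and does not hand out distinct ID values, so it fits only if the ID itself is not needed.

```java
import java.util.concurrent.Semaphore;

class SemaphoreDemo {
    /** Returns the results of four tryAcquire() calls on a 2-permit semaphore. */
    static boolean[] acquireSequence() {
        Semaphore permits = new Semaphore(2);
        boolean first = permits.tryAcquire();  // takes permit 1 of 2
        boolean second = permits.tryAcquire(); // takes permit 2 of 2
        boolean third = permits.tryAcquire();  // none left; acquire() would block here
        permits.release();                     // hand one permit back
        boolean afterRelease = permits.tryAcquire(); // succeeds again
        return new boolean[] { first, second, third, afterRelease };
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(acquireSequence()));
    }
}
```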
Ravi
0

Another solution is to hack the underlying queue so that its non-blocking offer delegates to the timed offer with a very large timeout (up to ~292 years, which can be considered infinite).


// helper method
private static boolean interruptibleInfiniteOffer(BlockingQueue<Runnable> q, Runnable r) {
    try {
        return q.offer(r, Long.MAX_VALUE, TimeUnit.NANOSECONDS); // infinite == ~292 years
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        return false;
    }
}

// fixed size pool with blocking (instead of rejecting) if bounded queue is full
public static ThreadPoolExecutor getFixedSizePoolWithLimitedWaitingQueue(int nThreads, int maxItemsInTheQueue) {
    BlockingQueue<Runnable> queue = maxItemsInTheQueue == 0
            ? new SynchronousQueue<>() { public boolean offer(Runnable r) { return interruptibleInfiniteOffer(this, r);} }
            : new ArrayBlockingQueue<>(maxItemsInTheQueue) { public boolean offer(Runnable r) { return interruptibleInfiniteOffer(this, r);} };
    return new ThreadPoolExecutor(nThreads, nThreads, 0, TimeUnit.MILLISECONDS, queue);
}
Alexander Pavlov