0

I am writing a customized ThreadPoolExecutor with extra features as:-

  1. If number of threads are more than core pool size but less than max pool size and queue is not full and there are no ideal threads then create a new thread for a task.

  2. If there are ideal threads and as task comes assign that task to queue rather than adding it to the queue.

  3. If all threads (upto max pool size are busy) then as new task come add them to the queue using reject method of RejectionHandler

I have overridden execute method of ThreadPoolExecutor version java 1.5.

The new code is as follows:-

 public void execute(Runnable command) {
        System.out.println(" Active Count: "+getActiveCount()+" PoolSize: "+getPoolSize()+" Idle Count: "+(getPoolSize()-getActiveCount())+" Queue Size: "+getQueue().size()); 
         if (command == null)
             throw new NullPointerException();
         for (;;) {
             if (runState != RUNNING) {
                 reject(command);
                 return;
             }
             if (poolSize < corePoolSize && addIfUnderCorePoolSize(command)) {
                 return;
             }
            if (runState == RUNNING && (getPoolSize()-getActiveCount() != 0) && workQueue.offer(command)) {
                 return;
             }
             int status = addIfUnderMaximumPoolSize(command);
             if (status > 0)      // created new thread
                 return;
             if (status == 0) {   // failed to create thread
                 reject(command);
                 return;
             }

             if (workQueue.offer(command))
                 return;
             // Retry if created a new thread but it is busy with another task
         }
     }

The legacy code is as below:-

 public void execute(Runnable command) {
          if (command == null)
              throw new NullPointerException();
          for (;;) {
             if (runState != RUNNING) {
                  reject(command);
                  return;
              }
              if (poolSize < corePoolSize && addIfUnderCorePoolSize(command))
                  return;
              if (workQueue.offer(command))
                  return;
              int status = addIfUnderMaximumPoolSize(command);
              if (status > 0)      // created new thread
                  return;
             if (status == 0) {   // failed to create thread
                 reject(command);
                  return;
             }
            // Retry if created a new thread but it is busy with another task
          }
      }

The problem which is now getting generated is that its not creating new threads when threads are idle but its not even allocating tasks to those threads else it is adding them to the queue which is not desired as we don't want the task to wait but process it asap even if it requires new thread creation but waiting is not allowed for a task.

PLEASE HELP ME IN RESOLVING THIS ISSUE. Thanks.

Gray
  • 115,027
  • 24
  • 293
  • 354
Scientist
  • 1,458
  • 2
  • 15
  • 31
  • Isn't that the default behaviour of [CachedThreadPool](http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/Executors.html#newCachedThreadPool()) ? – Fildor Oct 10 '13 at 17:53
  • No, This is not cached thread pool we have created a queue of greater capacity. – Scientist Oct 10 '13 at 18:11
  • Why would you do this by overriding execute? You can do what you want by creating a ThreadPool, using the [Constructor](http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html#ThreadPoolExecutor(int,int,long,java.util.concurrent.TimeUnit,java.util.concurrent.BlockingQueue)) and giving it core- and maxpoolsize along with an unbounded queue. Then it will have exactly the behaviour you describe. – Fildor Oct 10 '13 at 20:38

2 Answers2

0

If I understand the question, I believe that I've found a solution to the default behavior of the ThreadPoolExecutor that I show in my answer here:

How to get the ThreadPoolExecutor to increase threads to max before queueing?

Basically you LinkedBlockingQueue to have it always return false for queue.offer(...) which will add an additional threads to the pool, if necessary. If the pool is already at max threads and they all are busy, the RejectedExecutionHandler will be called. It is the handler which then does the put(...) into the queue.

See my code there.

Community
  • 1
  • 1
Gray
  • 115,027
  • 24
  • 293
  • 354
-1

As much as i have understand of the three functionality you described, I think using ExectuorService would do much more than what you are currently trying to do: An Executor that provides methods to manage termination and methods that can produce a Future for tracking progress of one or more asynchronous tasks, specially with:

1.catched thread pool: allows createing as many threads it needs to execute the task in parrallel. The old available threads will be reused for the new tasks and Fixed thread pool.

2.Fixed Thread Pool : provides a pool with fixed number of threads. If a thread is not available for the task, the task is put in queue waiting for an other task to ends.

Check out this article for detail explanation and nice example.

Sage
  • 15,290
  • 3
  • 33
  • 38
  • No i dont want to use cached pool and fixed pool as i have placed a upper bound on max pool size and also i want to say that both of them internally implement ThreadPoolExecutor. – Scientist Oct 10 '13 at 17:57
  • @TusharSrivastava, your first condition with max pool size and core pool size is pretty ambiguous to me. Why do you need such bound. Fixed thread pool allows setting a fixed number of thread at a time which is same as `max pool size` to me. can you plz explain a little more in depth ? – Sage Oct 10 '13 at 18:20