13

I am getting data from a queue server and I need to process it and send an acknowledgement. Something like this:

while (true) {
    queueserver.get.data
    ThreadPoolExecutor //send data to thread
    queueserver.acknowledgement 

I don't fully understand what happens in threads but I think this program gets the data, sends it the thread and then immediately acknowledges it. So even if I have a limit of each queue can only have 200 unacknowledged items, it will just pull as fast as it can receive it. This is good when I write a program on a single server, but if I'm using multiple workers then this becomes an issue because the amount of items in the thread queue are not a reflection of the work its done but instead of how fast it can get items from the queue server.

Is there anything I can do to somehow make the program wait if the thread queue is full of work?

Gray
  • 115,027
  • 24
  • 293
  • 354
Lostsoul
  • 25,013
  • 48
  • 144
  • 239

4 Answers4

27

How can I make ThreadPoolExecutor command wait if there's too much data it needs to work on?

Instead of an open-ended queue, you can use a BlockingQueue with a limit on it:

BlockingQueue<Date> queue = new ArrayBlockingQueue<Date>(200);

In terms of jobs submitted to an ExecutorService, instead of using the default ExecutorServices created using Executors, which use an unbounded queue, you can create your own:

return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
              new ArrayBlockingQueue<Runnable>(200));

Once the queue fills up, it will cause it to reject any new tasks that are submitted. You will need to set a RejectedExecutionHandler that submits to the queue. Something like:

final BlockingQueue queue = new ArrayBlockingQueue<Runnable>(200);
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(nThreads, nThreads,
           0L, TimeUnit.MILLISECONDS, queue);
// by default (unfortunately) the ThreadPoolExecutor will throw an exception
// when you submit the 201st job, to have it block you do:
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
   public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
      // this will block if the queue is full
      executor.getQueue().put(r);
      // check afterwards and throw if pool shutdown
      if (executor.isShutdown()) {
         throw new RejectedExecutionException(
              "Task " + r + " rejected from " + e);
      }
   }
});

I think it's a major miss that Java doesn't have a ThreadPoolExecutor.CallerBlocksPolicy.

Gray
  • 115,027
  • 24
  • 293
  • 354
  • Thanks Gray. Just to clarify this means that if I put 200 items in the queue, it won't allow anymore until the work from that queue goes down? and if thats the case it just waits at the command I'm using to submit to a thread until there's room to send into the blocking queue? – Lostsoul Apr 27 '12 at 15:32
  • If you use `put()` on the `BlockingQueue` @learningJava, then yes, it will block if the queue is full and then continue when the queue size decreases. – Gray Apr 27 '12 at 15:50
  • +1: Another option is to have the RejectedExecutionHandler run the task instead. i.e. `r.run();` – Peter Lawrey Apr 27 '12 at 16:37
  • 1
    Agreed @Peter although I rarely use the `CallerRunPolicy` because if you have 100 producers, you are going to grossly exceed your number of concurrent running threads. – Gray Apr 27 '12 at 16:40
3

If you want the acknowledgment when the worker starts working on the task, you can make a custom ThreadFactory that sends the acknowledgment from the thread before doing the actual work. OR you can override beforeExecute of a ThreadPoolExecutor.

If you want the acknowledgment when a new worker is freed up for a new task, I think you can initialize a ThreadPoolExecutor with a SynchronousQueue and a ThreadPoolExecutor.CallerRunsPolicy, or with your own policy where the caller blocks.

trutheality
  • 23,114
  • 6
  • 54
  • 68
0

first, i think your attitude is wrong because what you did in your pseudo code is busy waiting, you should read through the Concurrency tutorial from java toturial http://docs.oracle.com/javase/tutorial/essential/concurrency/

ignoring that, ill offer you a solution with the busy wait (which is not recommanded):

     ExecutorService e1 =  Executors.newFixedThreadPool(20);
     while (true) {
          if (!serverq.isEmpty() && !myq.isFull()) myq.enq(serverq.poll());
          if (!myq.isEmpty()) e1.execute(myq.poll());
     }

NOTES:

1.make sure your myq is synchronized, as said in the other answers. you can extend some blocking queue to make sure the synchronization is correct.

2.you implement a runnable class which does what you exepct from the server in an iteration of service, those runnables have to get myq as a parameter to the constructor and save it as global variable.

3.myq gets the runnables, that in the end of its run method, you must make sure the runnable deletes itself from myq.

Ofek Ron
  • 8,354
  • 13
  • 55
  • 103
  • The OP's `while(true)` wasn't busy waiting. Your `while(true)` **is** trying to busy wait and doesn't address the problem, which is that the executor's default queue is unbounded, so it will keep accepting new jobs independently of how many are completed. – trutheality Apr 27 '12 at 17:15
  • myp should be the ones who is bounded, you are not paying attention to whats written in the code and notes – Ofek Ron Apr 27 '12 at 18:17
0

What about having a blockingPool which will not execute more than 200 tasks and wait for a task to complete before submitting 201 task. I've achieved it using semaphore in my application. You can also change the limit by passing the value to its constructor.

Only difference here from @Gray answer is that rarely any task will get rejected in this case. Semaphore will make any 201 task to wait unless a other task gets over. Nevertheless, we have rejection handler to re-submit that task to executor in case of any rejection.

private class BlockingPool extends ThreadPoolExecutor {
    private final Semaphore semaphore;      
    public BlockingPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, int tasksAllowedInThreads){    
        super(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                executor.execute(r);
            }
        });
        semaphore = new Semaphore(tasksAllowedInThreads);
    }

    @Override
    public void execute(Runnable task){
        boolean acquired = false;
        do{
            try{
                semaphore.acquire();
                acquired = true;
            } catch (final InterruptedException e){
                // log
            }
        } while (!acquired); // run in loop to handle InterruptedException
        try{
            super.execute(task);
        } catch (final RejectedExecutionException e){
            System.out.println("Task Rejected");
            semaphore.release();
            throw e;
        }
    }    

    @Override
    protected void afterExecute(Runnable r, Throwable t){
        super.afterExecute(r, t);
        if (t != null){
            t.printStackTrace();
        }
        semaphore.release();
    }
}

Does this make sense!

Rahul Winner
  • 430
  • 3
  • 16