0

I have a series of task that accept data then when finished processing move it to a different task.

Each task has its own Executor Service initialized as

 int workerSize = Runtime.getRuntime().availableProcessors() * 2;
 executorService = new ThreadPoolExecutor(workerSize, workerSize,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),
                                          new SongKongThreadFactory(threadGroup));

The tasks work on files each file get put on taskA queue, then when processed by taskA it may be put on taskB's queue or straight onto taskC queue.

One aim of this pipeline approach is that however many files you have the application doesnt use too much memory because only some files are being processed at any time, however each tasks has different throughput levels so Im finding that the task B queue is building up because files are processed more quickly by task A then task B. And these queues are using memory

So I changed executor servide defn to specify a amx size of the queue as follows:

executorService = new ThreadPoolExecutor(workerSize, workerSize,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(100),
                                              new SongKongThreadFactory(threadGroup));

My idea was that if task A tried to submit the 101 item then it would just wait until there was a space, so processing on task A would temporarily sleep and memory usage would be reduced. But instead the task just gets rejected with RejectedExecutionException .Ive since checked all the available ThreadExecutor policys and none just allow the calling process to wait.

So i think I must be approaching this in the wrong way, how should I be doing this ?

I'm currently using Java 7 and expect to move to Java 8 when released.

Paul Taylor
  • 13,411
  • 42
  • 184
  • 351
  • Does this help: http://stackoverflow.com/questions/2001086/how-to-make-threadpoolexecutors-submit-method-block-if-it-is-saturated – Harald Feb 22 '14 at 19:02
  • @Harold yes thanks, I think using CallerRunsPolicy so caller cant do anything because busy processing the failed submission itself should work, didn't understand the reasoning of CallerRunsPolicy when I looked at it before. My only concern if if the task that gets run by caller get stuck for some reason and prevents additional tasks being submitted. – Paul Taylor Feb 22 '14 at 19:20

1 Answers1

0

To temporarily pause the caller... just don't return to the caller. Wait locally until the queue is available and you can complete the task. The most efficient way to do so is, in fact, called wait(). See, among others, Difference between wait() and sleep()

Community
  • 1
  • 1
keshlam
  • 7,931
  • 2
  • 19
  • 33
  • but I dont know when the queue is available again, what am I waiting on – Paul Taylor Feb 22 '14 at 19:11
  • Modify the queue so it announces to the wait()ing threads when it has become available. Or you can sleep and retry, but then you'll wait longer than you have to and/or spend some cycles rechecking whether space has become available yet. ***OR...*** use one of the threadsafe Queue implementations already provided by Java, which encapsulate the wait-until-space-available behavior. – keshlam Feb 22 '14 at 19:19
  • Right I'll just use CallerRunsPolicy, somewhat simpler. – Paul Taylor Feb 22 '14 at 19:21