Let's say that in Java I have a bounded BlockingQueue and I want to process items off that queue with at most N threads. The number N should be adjustable at runtime. The solution should never take more than N items off the queue at a time, so that the queue keeps its role of providing back pressure. How would I achieve this?

I got a long way with http://jcip.net/listings/BoundedExecutor.java, but that doesn't allow the bound to be resized.

Freek
  • https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.html – JB Nizet Jul 07 '18 at 16:46
  • You might find https://stackoverflow.com/questions/1800317/impossible-to-make-a-cached-thread-pool-with-a-size-limit useful. It discusses creating a dynamic thread pool with a maximum number of threads. – Tomboyo Jul 07 '18 at 16:54

1 Answer

Maybe you can extend the class http://jcip.net/listings/BoundedExecutor.java with a method that adds more permits to the Semaphore (you can use `release(int permits)` for that) and back it with a cached thread pool (`Executors.newCachedThreadPool()`), which should give you the flexibility you need.
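
A minimal sketch of that idea, not a definitive implementation. The names `ResizableBoundedExecutor`, `ResizableSemaphore`, `setBound`, `getBound` and `availablePermits` are made up for illustration and do not appear in the JCiP listing. It assumes the bound is tracked inside the class and that shrinking is done via `Semaphore.reducePermits(int)`, which is `protected` and therefore needs a small subclass to be callable:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;

/** Hypothetical resizable variant of the JCiP BoundedExecutor. */
class ResizableBoundedExecutor {

    /** reducePermits(int) is protected in Semaphore, so a subclass exposes it. */
    private static final class ResizableSemaphore extends Semaphore {
        ResizableSemaphore(int permits) {
            super(permits);
        }

        void reduce(int reduction) {
            super.reducePermits(reduction);
        }
    }

    private final ExecutorService exec;
    private final ResizableSemaphore semaphore;
    private int bound; // current N, guarded by "this"

    ResizableBoundedExecutor(ExecutorService exec, int bound) {
        this.exec = exec;
        this.semaphore = new ResizableSemaphore(bound);
        this.bound = bound;
    }

    /** Blocks until a permit is free, then hands the task to the pool. */
    void submitTask(Runnable command) throws InterruptedException {
        semaphore.acquire();
        try {
            exec.execute(() -> {
                try {
                    command.run();
                } finally {
                    semaphore.release(); // give the permit back when the task ends
                }
            });
        } catch (RejectedExecutionException e) {
            semaphore.release(); // the task never started, so return the permit
            throw e;
        }
    }

    /** Grows or shrinks N; tasks that are already running are not interrupted. */
    synchronized void setBound(int newBound) {
        if (newBound < 1) {
            throw new IllegalArgumentException("bound must be at least 1");
        }
        int delta = newBound - bound;
        if (delta > 0) {
            semaphore.release(delta);
        } else if (delta < 0) {
            // Does not block; the available count may go negative until
            // enough running tasks have finished.
            semaphore.reduce(-delta);
        }
        bound = newBound;
    }

    synchronized int getBound() {
        return bound;
    }

    int availablePermits() {
        return semaphore.availablePermits();
    }
}
```

To keep no more than N items off the queue, one option is to let the submitted task itself call `queue.take()`, so that the permit is acquired before an item is removed from the queue.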

Neerav Vadodaria
  • This was what I thought about first, but when I researched it I somehow came to the conclusion there was a problem with `reducePermits()`. Now that I'm looking at it again, I'm not too sure; I will check it out again. One drawback of this solution is that you need to keep track of `N` yourself, but I guess that shouldn't be too much of a problem. – Freek Jul 13 '18 at 06:11
  • @Freek could you highlight what problem you see with `reducePermits()`? Would be interesting to know. – Neerav Vadodaria Jul 13 '18 at 07:40
  • @Freek regarding the tracking of N, maybe you can keep N as state in the BoundedExecutor class itself and provide a getter and a method which can either increase or decrease N? – Neerav Vadodaria Jul 13 '18 at 07:42
  • I was/am afraid that threads already waiting to `acquire` would still be executed, and that it could therefore take a while before the actual number of threads would be reduced. However, the docs state "This method differs from acquire in that it does not block waiting for permits to become available.", so that's probably not the case. – Freek Jul 13 '18 at 08:26
  • Finally got around to testing this. It works quite nicely, good enough for my use case. There's one issue I encountered, which has to do with BoundedExecutor: since the semaphore is released inside the submitted task itself, there's often 1 more thread running than specified. This is because the thread isn't free yet as far as the Executor is concerned, while the semaphore permit _is_ already released. To prevent needless Thread creation, I give the Threads a suitable keepalive. – Freek Jul 22 '18 at 21:18
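
The keepalive mentioned in the last comment could be set up roughly as follows. This is only a sketch under assumptions: the helper name `KeepAlivePools.newPool` is hypothetical, and the pool mirrors the shape of a cached thread pool (no core threads, direct hand-off through a `SynchronousQueue`) with a caller-chosen idle timeout. The comment does not say which value was actually used.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class KeepAlivePools {

    /**
     * Hypothetical helper: a cached-style pool whose idle threads linger for
     * keepAliveSeconds, so a thread that has just released its permit can be
     * reused instead of a new one being created.
     */
    static ThreadPoolExecutor newPool(long keepAliveSeconds) {
        return new ThreadPoolExecutor(
                0, Integer.MAX_VALUE,               // no core threads, no hard maximum
                keepAliveSeconds, TimeUnit.SECONDS, // idle time before a thread exits
                new SynchronousQueue<Runnable>());  // hand tasks directly to a thread
    }
}
```

The actual limit on concurrency still comes from the semaphore; the pool itself is left unbounded so that the occasional extra thread does not cause a rejection.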