I have a blocking queue with multiple producers and a single consumer (one is enough for post-processing the items).

The producers are started on a schedule: the scheduled job submits tasks to an executor pool, and the pool's workers then add items to the queue.

My question is: how should I start the consumer thread?

For now I have an @EventListener (Spring Boot) that, on startup, submits a method to a single-thread executor; that method serves the queue in an infinite while loop. Maybe a better solution exists for this case; it looks like a pretty common pattern for consuming a queue.

Sonique
  • Why not simply have your producers feed a consumer executor directly? – teppic Oct 31 '17 at 09:34
  • I have multiple consumers to smooth out IO and network service latency, which fluctuates widely, and for me it is much easier to manage one consumer than several. But yes, I think your approach may also be good. Do you mean that after getting a result, the producers wait until the (one) consumer takes it? I thought about that, but decided on a queue because I can implement some logic (and stat monitoring) based on how full the queue is (yeah, maybe not the best approach, that's why I ask questions here :D) – Sonique Oct 31 '17 at 09:55
  • Or do you mean implementing an internal queue in the consumer? – Sonique Oct 31 '17 at 09:56
  • Check if this post is useful : https://stackoverflow.com/questions/2332537/producer-consumer-threads-using-a-queue/37767243#37767243 – Ravindra babu Oct 31 '17 at 10:30
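
The alternative raised in the first comment, having producers hand work straight to a consumer executor, could look roughly like the sketch below. It assumes plain Java without Spring; the class name DirectExecutorDemo, the "item-N" results, and the pool sizes are made up for illustration. A ThreadPoolExecutor with exactly one thread acts as the single consumer, and its internal work queue replaces the explicit BlockingQueue.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DirectExecutorDemo {

  // Hypothetical demo: three producers hand their results to a one-thread consumer executor.
  static List<String> runDemo() {
    List<String> processed = new CopyOnWriteArrayList<>();

    // Exactly one worker thread, so post-processing stays single-threaded.
    // The executor's own LinkedBlockingQueue buffers pending work.
    ThreadPoolExecutor consumer = new ThreadPoolExecutor(
        1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    ExecutorService producers = Executors.newFixedThreadPool(3);

    try {
      for (int i = 0; i < 3; i++) {
        String result = "item-" + i; // stands in for the output of real producer work
        // Each producer task submits a post-processing step instead of calling queue.put().
        producers.execute(() -> consumer.execute(() -> processed.add(result)));
      }

      producers.shutdown();
      producers.awaitTermination(5, TimeUnit.SECONDS);

      // consumer.getQueue().size() reports the current backlog, usable for monitoring.
      consumer.shutdown(); // unlike shutdownNow(), lets already queued steps finish
      consumer.awaitTermination(5, TimeUnit.SECONDS);
      return processed;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("demo interrupted", e);
    }
  }

  public static void main(String[] args) {
    System.out.println(runDemo());
  }
}
```

With several producers the order in which items reach the consumer is not deterministic. The backlog is still observable through getQueue(), so logic based on how full the queue is remains possible with this layout.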

1 Answer

Your approach is perfectly fine. Here is my personal pattern for such cases.

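import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.PostConstruct; // jakarta.annotation.PostConstruct on Spring Boot 3+
import javax.annotation.PreDestroy;    // jakarta.annotation.PreDestroy on Spring Boot 3+
import org.springframework.stereotype.Component;

@Component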
public class ConsumerTask implements Runnable {

  private final ExecutorService executorService;
  private final BlockingQueue<Object> queue;

  // use dependency injection if needed
  public ConsumerTask(BlockingQueue<Object> queue) {
    executorService = Executors.newSingleThreadExecutor();
    this.queue = queue;
  }

  @PostConstruct
  public void init() {
    executorService.execute(this);
  }

  @PreDestroy
  public void destroy() {
    // unlike shutdown(), shutdownNow() interrupts running tasks
    executorService.shutdownNow();
  }

  @Override
  public void run() {
    try {
      while (true) {
        Object o = queue.take();
        // process o
      }
    } catch (InterruptedException e) {
      // we were interrupted by shutdownNow(), restore interrupted status and exit
      Thread.currentThread().interrupt();
    }
  }
}
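
To try the same lifecycle outside a Spring context, a minimal plain-Java sketch follows. It assumes start() and stop() stand in for the @PostConstruct and @PreDestroy callbacks; QueueConsumerDemo, Consumer, and the upper-casing "processing" step are hypothetical names chosen for illustration. It also catches runtime exceptions per item so that one bad item does not end the loop, which the code above does not do.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class QueueConsumerDemo {

  // Hypothetical stand-in for the Spring component, with explicit start/stop methods.
  static final class Consumer implements Runnable {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final BlockingQueue<String> queue;
    private final List<String> processed = new CopyOnWriteArrayList<>();

    Consumer(BlockingQueue<String> queue) {
      this.queue = queue;
    }

    // Plays the role of the @PostConstruct method.
    void start() {
      executor.execute(this);
    }

    // Plays the role of the @PreDestroy method; returns true if the worker exited in time.
    boolean stop() throws InterruptedException {
      executor.shutdownNow(); // interrupts the thread blocked in take()
      return executor.awaitTermination(1, TimeUnit.SECONDS);
    }

    List<String> processed() {
      return processed;
    }

    @Override
    public void run() {
      try {
        while (true) {
          String item = queue.take(); // blocks until an item is available
          try {
            processed.add(item.toUpperCase()); // placeholder for real post-processing
          } catch (RuntimeException e) {
            // keep consuming even if one item fails
            System.err.println("Failed to process " + item + ": " + e);
          }
        }
      } catch (InterruptedException e) {
        // interrupted by shutdownNow(): restore the flag and leave the loop
        Thread.currentThread().interrupt();
      }
    }
  }

  static List<String> runDemo() {
    try {
      BlockingQueue<String> queue = new LinkedBlockingQueue<>();
      Consumer consumer = new Consumer(queue);
      consumer.start();

      for (String s : List.of("a", "b", "c")) {
        queue.put(s);
      }

      // Wait (up to 5 s) until the consumer has drained the three items.
      long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
      while (consumer.processed().size() < 3 && System.nanoTime() < deadline) {
        Thread.sleep(10);
      }

      if (!consumer.stop()) {
        throw new IllegalStateException("consumer did not stop in time");
      }
      return consumer.processed();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("demo interrupted", e);
    }
  }

  public static void main(String[] args) {
    System.out.println(runDemo());
  }
}
```

Calling awaitTermination after shutdownNow is optional, but it confirms that the consumer thread has actually left the loop before the application continues shutting down.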
Mikita Harbacheuski