
In the following scenario, the finalizer thread must not finish execution until the consumer thread has processed all queue elements:

private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
private final Object queueMonitor = new Object();

// Consumer thread
while (true) {
    Object element = queue.take();
    consume(element);
    synchronized (queueMonitor) {
        queueMonitor.notifyAll();
    }
}

// Finalizer thread
synchronized (queueMonitor) {
    while (!queue.isEmpty()) {
        queueMonitor.wait();
    }
}

Elements are added to the queue over time. The consumer daemon thread runs all the time until the JVM terminates, at which point it must be allowed to complete the processing of all queued elements. Currently this is accomplished by the finalizer thread, which is a shutdown hook that should delay the killing of the consumer thread on JVM termination.

Problem:
If the finalizer thread starts after the last element has been taken from the queue but before consume() has returned, the while loop condition is already false. The wait on queueMonitor is then skipped entirely, and the finalizer completes while consume() is still running.

Research:
An ideal solution would be to peek at the queue and remove the element only after it has been consumed.
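The peek-then-remove idea can be sketched roughly as follows. This is only an illustration under some assumptions: there is a single consumer thread, `consume()` is a stand-in that sleeps briefly, and the class name `PeekThenRemoveDemo` and the helper `runDemo()` are hypothetical. Because `BlockingQueue` has no blocking `peek()`, the sketch falls back to a short sleep when the queue is empty, which is a real drawback of this approach.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

final class PeekThenRemoveDemo {
    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
    private final Object queueMonitor = new Object();
    private final AtomicInteger consumed = new AtomicInteger();

    // Hypothetical stand-in for the question's consume().
    private void consume(Object element) throws InterruptedException {
        Thread.sleep(20); // simulate some work
        consumed.incrementAndGet();
    }

    // Consumer: the element stays in the queue until consume() has returned,
    // so queue.isEmpty() cannot be true while an element is still in flight.
    private void consumerLoop() {
        try {
            while (true) {
                Object element = queue.peek();
                if (element == null) {
                    Thread.sleep(5); // no blocking peek exists, so back off briefly
                    continue;
                }
                consume(element);
                queue.remove(); // only safe with a single consumer thread
                synchronized (queueMonitor) {
                    queueMonitor.notifyAll();
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // used here only to stop the demo
        }
    }

    // Finalizer logic, same shape as in the question.
    private void awaitEmptyQueue() throws InterruptedException {
        synchronized (queueMonitor) {
            while (!queue.isEmpty()) {
                queueMonitor.wait();
            }
        }
    }

    // Runs the scenario once and returns how many elements were consumed.
    static int runDemo() {
        PeekThenRemoveDemo demo = new PeekThenRemoveDemo();
        for (int i = 0; i < 5; i++) {
            demo.queue.add("element-" + i);
        }
        Thread consumer = new Thread(demo::consumerLoop, "consumer");
        consumer.setDaemon(true);
        consumer.start();
        try {
            demo.awaitEmptyQueue();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        consumer.interrupt(); // stop the demo consumer once the queue is drained
        return demo.consumed.get();
    }

    public static void main(String[] args) {
        System.out.println("consumed = " + runDemo());
    }
}
```

With this ordering the finalizer can only observe an empty queue after `consume()` has returned for the last element.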

spongebob
  • Are you saying that you want the "finalizer" thread to do nothing until the "consumer" thread is finished? In that case, why not just do both jobs in a single thread? – Solomon Slow Aug 14 '18 at 19:37
  • What if your "consumer" thread was not a daemon? What if, instead of `while(true)`, it looped until it found a poison pill in the queue, and then it exited? Then whatever function you call to shut the application down, it could feed a poison pill into the queue, and then `join` the consumer. – Solomon Slow Aug 14 '18 at 20:08
  • @jameslarge No function is called to shut the application down. That code is part of a library, and I would prefer not to ask the client to call an API to interrupt the non-daemon threads and let the JVM terminate, although it's an option. – spongebob Aug 14 '18 at 22:10
  • Is there any reason why you do not just wrap the entire while loop in the consumer in a synchronized block locking on the `queueMonitor` to make the finalizer block until the consumer is done? I'd say the whole setup looks error prone, and I'd recommend using something like a `CountDownLatch` as suggested below. – Janus Varmarken Aug 14 '18 at 22:37
  • @JanusVarmarken Reason being that the consumer thread never finishes. It is a daemon thread that gets killed on JVM termination. – spongebob Aug 15 '18 at 13:56
  • @FrancescoMenzani "The consumer daemon thread runs all the time until the JVM terminates, _at which point it must be allowed to complete the processing of all remaining elements_" Could you do this cleanup in a [Shutdown Hook](https://stackoverflow.com/questions/264825/get-notification-on-a-java-process-termination) instead? – Janus Varmarken Aug 15 '18 at 17:14
  • @JanusVarmarken I wouldn't define it "cleanup", and yes, it could. The consumer thread would have to be interrupted by the finalizer thread. However, consumer logic would be executed by two threads, and I feel like waiting for the consumer one to finish makes for a more polished design. – spongebob Aug 15 '18 at 17:53
  • @FrancescoMenzani Ok. Why is the consumer a daemon btw? It seems a little counterintuitive to me that it's a daemon as that makes the JVM able to exit, yet that is exactly what you want to prevent (referring to the emphasized text in the quote in my previous comment). Sorry for all the questions btw :). – Janus Varmarken Aug 15 '18 at 18:14
  • @JanusVarmarken No problem. Please read carefully the second paragraph in my question, and my comments under user1373164's answer. I don't want to prevent the JVM from exiting. Moreover, I do want my library not to prevent it from exiting, hence I only use daemon threads. – spongebob Aug 15 '18 at 18:18
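The poison-pill idea raised in the comments above might look like the sketch below. It is a minimal illustration, not the asker's code: `PoisonPillDemo`, `POISON_PILL`, and `runDemo()` are hypothetical names, and `consume()` is a trivial stand-in. The comment suggested a non-daemon consumer; the sketch keeps the thread a daemon on the assumption that a shutdown hook would enqueue the pill and `join` the consumer, since daemon threads keep running while shutdown hooks execute.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

final class PoisonPillDemo {
    // Sentinel compared by identity; it is never passed to consume().
    private static final Object POISON_PILL = new Object();

    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
    private final AtomicInteger consumed = new AtomicInteger();

    // Hypothetical stand-in for the question's consume().
    private void consume(Object element) {
        consumed.incrementAndGet();
    }

    // Consumer: processes elements in FIFO order until it meets the pill.
    private void consumerLoop() {
        try {
            while (true) {
                Object element = queue.take();
                if (element == POISON_PILL) {
                    return; // everything queued before the pill has been consumed
                }
                consume(element);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Plays the role of the shutdown hook: enqueue the pill, then wait.
    private void shutDown(Thread consumer) {
        queue.add(POISON_PILL);
        try {
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Runs the scenario once and returns how many elements were consumed.
    static int runDemo() {
        PoisonPillDemo demo = new PoisonPillDemo();
        Thread consumer = new Thread(demo::consumerLoop, "consumer");
        consumer.setDaemon(true);
        consumer.start();
        for (int i = 0; i < 5; i++) {
            demo.queue.add("element-" + i);
        }
        demo.shutDown(consumer);
        return demo.consumed.get();
    }

    public static void main(String[] args) {
        System.out.println("consumed = " + runDemo());
    }
}
```

In a library, `shutDown` could presumably be registered through `Runtime.getRuntime().addShutdownHook(...)`, so the client would not need to call any API.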

1 Answer


One approach is to use a CountDownLatch: have the finalizer block on it, and have the consumer call countDown() once it has finished consuming.

Basically, don't block on the queue; block on task completion.

private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
private volatile boolean running = true;
private final CountDownLatch terminationLatch = new CountDownLatch(1);

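// Consumer thread
while (running || !queue.isEmpty()) {
    // Poll with a timeout instead of take() so the loop re-checks `running` periodically
    Object element = queue.poll(100, TimeUnit.MILLISECONDS);
    if (element == null) continue;
    consume(element);
}
// Signal the finalizer that the queue has been drained
terminationLatch.countDown();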

// Finalizer thread
running = false;
terminationLatch.await();
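For readers who want to run it, the fragments above can be assembled into a self-contained sketch along these lines. The class name `LatchShutdownDemo`, the methods `finalizerBody()` and `runDemo()`, and the trivial `consume()` are assumptions made for illustration; the shutdown hook is simulated by a plain method call so the result can be checked.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class LatchShutdownDemo {
    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
    private volatile boolean running = true;
    private final CountDownLatch terminationLatch = new CountDownLatch(1);
    private final AtomicInteger consumed = new AtomicInteger();

    // Hypothetical stand-in for the question's consume().
    private void consume(Object element) {
        consumed.incrementAndGet();
    }

    // Consumer: keeps going while running, then drains whatever is left.
    private void consumerLoop() {
        try {
            while (running || !queue.isEmpty()) {
                Object element = queue.poll(100, TimeUnit.MILLISECONDS);
                if (element == null) {
                    continue; // timed out; re-check the loop condition
                }
                consume(element);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            terminationLatch.countDown(); // always release the finalizer
        }
    }

    // What the shutdown hook would execute.
    private void finalizerBody() {
        running = false;
        try {
            terminationLatch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Runs the scenario once and returns how many elements were consumed.
    static int runDemo() {
        LatchShutdownDemo demo = new LatchShutdownDemo();
        Thread consumer = new Thread(demo::consumerLoop, "consumer");
        consumer.setDaemon(true);
        consumer.start();
        for (int i = 0; i < 5; i++) {
            demo.queue.add("element-" + i);
        }
        demo.finalizerBody(); // stands in for the JVM running the hook
        return demo.consumed.get();
    }

    public static void main(String[] args) {
        System.out.println("consumed = " + runDemo());
    }
}
```

In real use, `finalizerBody` would be registered with `Runtime.getRuntime().addShutdownHook(new Thread(demo::finalizerBody))`. Note that the hook may wait up to the poll timeout (100 ms here) after the queue is empty before the consumer notices that `running` is false.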
spongebob
user1373164
  • Refactor to use an ExecutorService: the finalizer can call shutdown and block on awaitTermination. No point reinventing the wheel. – user1373164 Aug 14 '18 at 19:48
  • Unfortunately, that's not possible. The implementation of `consume()` schedules additional tasks, and a `RejectedExecutionException` would be thrown. – spongebob Aug 14 '18 at 22:13
  • ok, in that case, I'd just use a CountDownLatch, call it `exitLatch` and initialize it with a count of 1. The consumer thread runs `while(queue.size() > 0)`, then after the while loop completes, it calls `exitLatch.countDown()`. Meanwhile all the finalizer thread has to do is `exitLatch.await(1_000,TimeUnit.MILLISECONDS)` (+ handle interrupted exceptions etc) – user1373164 Aug 15 '18 at 08:57
  • Elements are added to the queue over time. The consumer thread runs all the time until the JVM terminates, at which point it must be allowed to complete the processing of all remaining elements. – spongebob Aug 15 '18 at 14:06
  • ok, then just keep your `while(true)` - everything else still applies – user1373164 Aug 15 '18 at 14:07
  • Then the while loop never completes. Where do I call `CountDownLatch#countDown()`? – spongebob Aug 15 '18 at 14:09
  • ok... `while(!isCancelled || queue.size() > 0)`. Where isCancelled is a volatile boolean or use AtomicBoolean and provide a method on the consumer so the finalizer can request cancellation. Take a look at Cancellation and Shutdown in Java Concurrency in Practice. – user1373164 Aug 15 '18 at 14:21
  • Now there's another problem: `BlockingQueue#take()` blocks, so if the queue is empty, `CountDownLatch#countDown()` never gets called. – spongebob Aug 15 '18 at 15:41
  • use `poll(1_000, TimeUnit.MILLISECONDS);` instead of `take()`. – user1373164 Aug 15 '18 at 15:45
  • or, alternatively, interrupt the consumer thread from the finalizer thread. – user1373164 Aug 15 '18 at 15:50
  • If I chose to interrupt, could it happen while `consume()` is being executed? – spongebob Aug 15 '18 at 16:13
  • Doesn't the proposed solution allow for this unlucky ordering of events as there is no lock on the queue: 1. consumer consumes last element, 2. loop condition is checked and evaluates to false, 3. terminationLatch is counted down to zero, 4. an element is added to the queue by some producer thread, 5. finalizer progresses beyond the latch as it is now zero, but there is still an item in the queue. TL;DR: you don't prevent items from being added to the queue during the time the finalizer performs its last work. – Janus Varmarken Aug 15 '18 at 18:20
  • @JanusVarmarken It does. This is acceptable, since the application is shutting down (the finalizer thread is a shutdown hook). How would you lock the queue in order to know whether the finalizer thread is running and optionally deny producing more elements? – spongebob Aug 15 '18 at 18:28
  • @FrancescoMenzani Alright, I just thought that it violates this requirement: "[...] at which point it must be allowed to _complete the processing of all remaining elements._" So what you're really asking is how to prevent an ongoing call to `consume` from being abruptly terminated rather than how to make sure all pending elements are processed - correct? Well, if you guard interaction with the queue using a lock in all producer+consumer+finalizer threads, you could have the finalizer grab the lock until termination thereby preventing new elements from being added during that window. – Janus Varmarken Aug 15 '18 at 18:50
  • @JanusVarmarken Correct. All _queued_ elements must be processed, and the consumer thread must not die while `consume()` has not returned yet. – spongebob Aug 15 '18 at 19:13
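The interrupt-based alternative mentioned in the comments above could be sketched as follows. `Thread.interrupt()` only sets the thread's interrupt status; a blocked or subsequent `take()` then throws `InterruptedException`. Whether a running `consume()` is disturbed depends on whether it performs interruptible blocking calls itself, so this sketch assumes a non-blocking `consume()`. The names `InterruptShutdownDemo`, `finalizerBody()`, and `runDemo()` are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

final class InterruptShutdownDemo {
    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
    private final CountDownLatch terminationLatch = new CountDownLatch(1);
    private final AtomicInteger consumed = new AtomicInteger();

    // Hypothetical stand-in for consume(); assumed not to block,
    // so an interrupt cannot cut it short.
    private void consume(Object element) {
        consumed.incrementAndGet();
    }

    // Consumer: blocks in take() until interrupted, then drains the rest.
    private void consumerLoop() {
        try {
            while (true) {
                consume(queue.take()); // the interrupt surfaces in take()
            }
        } catch (InterruptedException e) {
            // Shutdown requested: process what is still queued without blocking.
            Object element;
            while ((element = queue.poll()) != null) {
                consume(element);
            }
        } finally {
            terminationLatch.countDown();
        }
    }

    // What the shutdown hook would execute.
    private void finalizerBody(Thread consumer) {
        consumer.interrupt();
        try {
            terminationLatch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Runs the scenario once and returns how many elements were consumed.
    static int runDemo() {
        InterruptShutdownDemo demo = new InterruptShutdownDemo();
        Thread consumer = new Thread(demo::consumerLoop, "consumer");
        consumer.setDaemon(true);
        consumer.start();
        for (int i = 0; i < 5; i++) {
            demo.queue.add("element-" + i);
        }
        demo.finalizerBody(consumer); // stands in for the JVM running the hook
        return demo.consumed.get();
    }

    public static void main(String[] args) {
        System.out.println("consumed = " + runDemo());
    }
}
```

Compared with the polling version, this avoids the poll timeout, but elements added by producers after the drain loop finishes are still not processed, which the thread above accepts as tolerable during shutdown.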