32

I’m using java.util.concurrent.BlockingQueue in a very simple producer-consumer scenario. E.g. this pseudo code depicts the consumer part:

class QueueConsumer implements Runnable {

    @Override
    public void run() {
        while(true)
        {
            try {
                ComplexObject complexObject = myBlockingQueue.take();
                //do something with the complex object
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

So far so good. In the javadoc of the blocking queue I read:

A BlockingQueue does not intrinsically support any kind of "close" or "shutdown" operation to indicate that no more items will be added. The needs and usage of such features tend to be implementation-dependent. For example, a common tactic is for producers to insert special end-of-stream or poison objects, that are interpreted accordingly when taken by consumers.

Unfortunately because of the generics in use and the nature of ComplexObject it’s not trivial to push a "poison object" into the queue. So this "common tactic" is not really convenient in my scenario.

My question is: what other good tactics/patterns can I use to "close" the queue?

Thank you!

Lachezar Balev
  • 11,498
  • 9
  • 49
  • 72
  • 2
    Have you considered using an [ExecutorService](http://download.oracle.com/javase/6/docs/api/java/util/concurrent/ExecutorService.html) to handle your whole processing scenario, or is that not an option? If it is, I'll write up an answer with an example if you want. – Rob Hruska Mar 21 '11 at 14:28
  • 1
    @Rob Hruska I would write that up! It's a good alternative. – jdmichal Mar 21 '11 at 14:33
  • @Rob Hruska yes, please write such an example. It would definitely be a valueable reference! – Lachezar Balev Mar 21 '11 at 14:35
  • @jdmichal - I've added an example answer below. FWIW, `java.util.concurrent` is a really great package, and often underused; a lot of people don't even know it exists, or what it's capable of. – Rob Hruska Mar 21 '11 at 15:07
  • you could just construct a public static poison pill once in the consumer class and just check if argument == this pill, no? – aiguy Mar 22 '16 at 15:38
  • I just found this page, which (no offense to other contributors!) is somewhat outdated. I elected to use `Option`, with an empty Option as the poison pill. – AbuNassar Aug 29 '16 at 19:03

10 Answers10

21

If you have a handle to the consumer thread, you can interrupt it. With the code you gave, that will kill the consumer. I would not expect the producer to have this; it would probably have to callback to the program controller somehow to let it know it's done. Then the controller would interrupt the consumer thread.

You can always finish doing work before obeying the interrupt. For instance:

class QueueConsumer implements Runnable {
    @Override
    public void run() {
        while(!(Thread.currentThread().isInterrupted())) {
            try {
                final ComplexObject complexObject = myBlockingQueue.take();
                this.process(complexObject);

            } catch (InterruptedException e) {
                // Set interrupted flag.
                Thread.currentThread().interrupt();
            }
        }

        // Thread is getting ready to die, but first,
        // drain remaining elements on the queue and process them.
        final LinkedList<ComplexObject> remainingObjects;
        myBlockingQueue.drainTo(remainingObjects);
        for(ComplexObject complexObject : remainingObjects) {
            this.process(complexObject);
        }
    }

    private void process(final ComplexObject complexObject) {
        // Do something with the complex object.
    }
}

I would actually prefer that to somehow poisoning the queue anyway. If you want to kill the thread, ask the thread to kill itself.

(It's nice to see someone handling InterruptedException properly.)


There seems to be some contention about the handling of interruptions here. First, I would like everyone to read this article.

Now, with the understanding that no one actually read that, here's the deal. A thread will only receive an InterruptedException if it was currently blocking at the time of interrupt. In this case, Thread.interrupted() will return false. If it was not blocking, it will NOT receive this exception, and instead Thread.interrupted() will return true. Therefore, your loop guard should absolutely, no matter what, check Thread.interrupted(), or otherwise risk missing an interruption to the thread.

So, since you are checking Thread.interrupted() no matter what, and you are forced to catch InterruptedException (and should be dealing with it even if you weren't forced to), you now have two code areas which handle the same event, thread interruption. One way to handle this is normalize them into one condition, meaning either the boolean state check can throw the exception, or the exception can set the boolean state. I choose the later.


Edit: Note that the static Thread#interrupted method clears the the interrupted status of the current thread.

Aleksandr Kravets
  • 5,750
  • 7
  • 53
  • 72
jdmichal
  • 10,984
  • 4
  • 43
  • 42
  • What will happen if the queue is still full and I interrupt the consumer? – Lachezar Balev Mar 21 '11 at 13:46
  • @lucho It would still die. That's a good point. See answer for update. – jdmichal Mar 21 '11 at 13:55
  • @jdmichal Thank you! That really seems the alternative tactics to the poison object and it answers my question. – Lachezar Balev Mar 21 '11 at 14:01
  • @lucho I updated the code. Previous code was not handling the interrupt correctly at all. – jdmichal Mar 21 '11 at 14:09
  • @jdmichal Ok. I saw the update. Indeed seems better now. BTW I decided to implement a poison object but this discussion had a high educational value for me. – Lachezar Balev Mar 21 '11 at 14:15
  • FYI, these are really 2 different "end" scenarios. you use "poison pill" if you want the consumer to finish _whenever it is done processing the data currently in the queue_. you use a thread interrupt when you want the consumer to finish _now, regardless of what work is left in the queue_. – jtahlborn Mar 21 '11 at 14:20
  • Thread interrupts are not absolute. If they were, they would simply end the thread immediately with no feedback. Completing current processing, or in this case currently queued processing, is perfectly permissible. – jdmichal Mar 21 '11 at 14:26
  • 3
    Caveat here: how do you know there isn't another source of thread interrupt that doesn't intend to *stop* the thread? – Jason S Mar 21 '11 at 14:54
  • 2
    (e.g. debugger breakpoints or whatever) -- if you want to use this approach, you really should have a threadsafe boolean that signals an explicit intention to stop the consumer, and rearchitect the loop so that if this boolean is not set, then it propagates the interrupt properly. There's also not much point in calling `Thread.interrupt()` if you're immediately going to call `Thread.interrupted()` -- stop mucking with thread status and just use a local boolean. – Jason S Mar 21 '11 at 14:58
  • not sure if these comments were addressed at mine or not. i'm aware of how interrupts work. yes, you need some other signal (e.g. boolean) combined with interrupt in order to get a robust impl. i was attempting to highlight the different scenarios (e.g. stop when you're done vs. stop right now-ish). – jtahlborn Mar 21 '11 at 16:55
  • @Jason S: It will stop the thread, after the thread is finished what it is doing. There is nothing that states that a thread interruption need immediately end the thread; I do not understand where this perception comes from. If your program has the need to separate two types of exit requests, for instance "stop after you process current jobs" vs "stop now", then yes it will obviously need to differentiate between those two states. As for the rest of your comments, I believe I have sufficiently answered them with an addendum to my answer. – jdmichal Mar 21 '11 at 18:56
  • @Chinasaur `Thread.interrupted()` or `Thread.currentThread().isInterrupted()` both return the same flag. The only difference is that `Thread.interrupted()` clears the flag, but since the code above does not care about the flag after the loop guard, this behavioural difference is meaningless. – jdmichal Sep 10 '12 at 17:10
  • Fair, but the non-clearing behavior is more intuitive. Might be nice to mention the clearing behavior in the answer for the unwary. – Chinasaur Sep 10 '12 at 18:49
13

Another idea for making this simple:

class ComplexObject implements QueueableComplexObject
{
    /* the meat of your complex object is here as before, just need to
     * add the following line and the "implements" clause above
     */
    @Override public ComplexObject asComplexObject() { return this; }
}

enum NullComplexObject implements QueueableComplexObject
{
    INSTANCE;

    @Override public ComplexObject asComplexObject() { return null; }
}

interface QueueableComplexObject
{
    public ComplexObject asComplexObject();
}

Then use BlockingQueue<QueueableComplexObject> as the queue. When you wish to end the queue's processing, do queue.offer(NullComplexObject.INSTANCE). On the consumer side, do

boolean ok = true;
while (ok)
{
    ComplexObject obj = queue.take().asComplexObject();
    if (obj == null)
        ok = false;
    else
        process(obj);
}

/* interrupt handling elided: implement this as you see fit,
 * depending on whether you watch to swallow interrupts or propagate them
 * as in your original post
 */

No instanceof required, and you don't have to construct a fake ComplexObject which may be expensive/difficult depending on its implementation.

Jason S
  • 184,598
  • 164
  • 608
  • 970
  • It's annoying that "a BlockingQueue does not accept null elements...null is used as a sentinel value to indicate failure of poll operations." A null value is an ideal "poison" element. Failure of poll operations should be handled via exceptions. – Greg Brown Apr 01 '22 at 22:26
9

An alternative would be to wrap the processing you're doing with an ExecutorService, and let the ExecutorService itself control whether or not jobs get added to the queue.

Basically, you take advantage of ExecutorService.shutdown(), which when called disallows any more tasks from being processed by the executor.

I'm not sure how you're currently submitting tasks to the QueueConsumer in your example. I've made the assumption that you have some sort of submit() method, and used a similar method in the example.

import java.util.concurrent.*;

class QueueConsumer {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    public void shutdown() {
        executor.shutdown(); // gracefully shuts down the executor
    }

    // 'Result' is a class you'll have to write yourself, if you want.
    // If you don't need to provide a result, you can just Runnable
    // instead of Callable.
    public Future<Result> submit(final ComplexObject complexObject) {
        if(executor.isShutdown()) {
            // handle submitted tasks after the executor has been told to shutdown
        }

        return executor.submit(new Callable<Result>() {
            @Override
            public Result call() {
                return process(complexObject);
            }
        });
    }

    private Result process(final ComplexObject complexObject) {
        // Do something with the complex object.
    }
}

This example is just an off-the-cuff illustration of what the java.util.concurrent package offers; there are probably some optimizations that could be made to it (e.g., QueueConsumer as its own class probably isn't even necessary; you could just provide the ExecutorService to whatever producers are submitting the tasks).

Dig through the java.util.concurrent package (starting at some of the links above). You might find that it gives you a lot of great options for what you're trying to do, and you don't even have to worry about regulating the work queue.

Rob Hruska
  • 118,520
  • 32
  • 167
  • 192
  • This is a great idea, for instance, you can regulate the work queue to have a size 50 and "max backlog" of 200 before it starts "pushing back" on the producer, like this: ` threadPool = new ThreadPoolExecutor(50, 50, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(2000)); ` Then you can submit Runnable's or Callable's without flooding it. Then when you're done, join on the thread pool: http://stackoverflow.com/questions/1250643/how-to-wait-for-all-threads-to-finish-using-executorservice (however handling failures in child threads is still up to you). GL. – rogerdpack Apr 08 '15 at 19:31
  • The problem with the above, at least when I tried something similar, is that with a simple shutdown the queue's take() method which blocks does not stop and the program hangs. With shutdown() the blocking take() just dies but does not throw InterruptedException – idipous Nov 14 '15 at 23:35
7

Another possibility for making a poison object: Make it be a particular instance of the class. This way, you do not have to muck around subtypes or screw up your generic.

Drawback: This won't work if there's some sort of serialization barrier between the producer and consumer.

public class ComplexObject
{
    public static final POISON_INSTANCE = new ComplexObject();

    public ComplexObject(whatever arguments) {
    }

    // Empty constructor for creating poison instance.
    private ComplexObject() {
    }
}

class QueueConsumer implements Runnable {
    @Override
    public void run() {
        while(!(Thread.currentThread().interrupted())) {
            try {
                final ComplexObject complexObject = myBlockingQueue.take();
                if (complexObject == ComplexObject.POISON_INSTANCE)
                    return;

                // Process complex object.

            } catch (InterruptedException e) {
                // Set interrupted flag.
                Thread.currentThread().interrupt();
            }
        }
    }
}
jdmichal
  • 10,984
  • 4
  • 43
  • 42
4

You can wrap your generic object into a dataobject. On this dataobject you can add additional data like the poison object status. The dataobject is a class with 2 fields. T complexObject; and boolean poison;.

Your consumer takes the data objects from the queue. If a poison object is returned, you close the consumer, else you unwrap the generic and call 'process(complexObject)'.

I'm using a java.util.concurrent.LinkedBlockingDeque<E> so that you can add object at the end of the queue and take them from the front. That way your object will be handled in order, but more important it's safe to close the queue after you run into the poison object.

To support multiple consumers, I add the poison object back onto the queue when I run into it.

public final class Data<T> {
    private boolean poison = false;
    private T complexObject;

    public Data() {
        this.poison = true;
    }

    public Data(T complexObject) {
        this.complexObject = complexObject;
    }

    public boolean isPoison() {
        return poison;
    }

    public T getComplexObject() {
        return complexObject;
    }
}
public class Consumer <T> implements Runnable {

    @Override
    public final void run() {
        Data<T> data;
        try {
            while (!(data = queue.takeFirst()).isPoison()) {
                process(data.getComplexObject());
            }
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        // add the poison object back so other consumers can stop too.
        queue.addLast(line);
    }
}
Dorus
  • 7,276
  • 1
  • 30
  • 36
3

Is it possible to extend ComplexObject and mock out the non-trivial creation functionality? Essentially you're ending up with a shell object but you can do then do instance of to see if is the end of queue object.

Chris Knight
  • 24,333
  • 24
  • 88
  • 134
  • Well, it's a bit ugly to extend it though it is not impossible. That is the reason that motivated me to ask for other good practices. – Lachezar Balev Mar 21 '11 at 13:49
  • @lucho: It might make it easier to do so, if it's possible to separate out the interface from an implementation class, then make a dummy implementation for the end-of-queue object. – Jason S Mar 21 '11 at 13:52
  • Maybe this will be my final solution but nevertheless I was interested in the alternatives... – Lachezar Balev Mar 21 '11 at 13:55
1

It seems reasonable to me to implement a close-able BlockingQueue:

import java.util.concurrent.BlockingQueue;

public interface CloseableBlockingQueue<E> extends BlockingQueue<E> {
    /** Returns <tt>true</tt> if this queue is closed, <tt>false</tt> otherwise. */
    public boolean isClosed();

    /** Closes this queue; elements cannot be added to a closed queue. **/
    public void close();
}

It would be quite straight forward to implement this with the following behaviours (cf. the methods summary table):

  • Insert:

    • Throws exception, Special value:

      Behaves like a full Queue, caller's responsibility to test isClosed().

    • Blocks:

      Throws IllegalStateException if and when closed.

    • Times out:

      Returns false if and when closed, caller's responsibility to test isClosed().

  • Remove:

    • Throws exception, Special value:

      Behaves like a empty Queue, caller's responsibility to test isClosed().

    • Blocks:

      Throws NoSuchElementException if and when closed.

    • Times out:

      Returns null if and when closed, caller's responsibility to test isClosed().

  • Examine

    No change.

I did this by editing the source, find it at github.com.

Corin
  • 2,417
  • 26
  • 23
  • Hi, very nice implementation. I have two questions: License: Is your license valid? It seems you have taken it from Sun proprietary sources and released und GPLv2 - is it safe to use your code in OS projects? Closable: I have two threads, one is waiting in #tryClose and the other one is waiting for the lock of #permitClose - it seems they're deadlocked. Could you check the sources for me? I can't find any mistake but still my threads are locked. – Jan Aug 30 '12 at 12:27
  • Thanks for the interest Jan, the Sun code has GPLv2 license so I haven't change that license. As for your deadlock, could you please provide sample code which displays the problem (and open an issue on Github :)... I don't have lots of free time but I'd like to know if there's problems in the code. – Corin Oct 13 '12 at 23:56
  • well thanks, I indeed had a bug in my testcase. your implementation works fine. Regarding the Oracle sources, I just didn't find the GPL headerin my local sources from the JDK but if your're sure about that, it is fine, of course. – Jan Oct 15 '12 at 13:15
  • Another question: How would I write the consumer part that takes alles elements until the queue is emtpy and closed? Is it possible without catching the closed exception? – Jan Jan 23 '13 at 11:13
1

Today I solved this problem using a wrapper object. Since the ComplexObject is too complex to subclass I wrapped the ComplexObject into ComplexObjectWrapper object. Then used ComplexObjectWrapper as the generic type.

public class ComplexObjectWrapper {
ComplexObject obj;
}

public class EndOfQueue extends ComplexObjectWrapper{}

Now instead of BlockingQueue<ComplexObject> I did BlockingQueue<ComplexObjectWrapper>

Since I had control of both the Consumer and Producer this solution worked for me.

ArnabRaxit
  • 11
  • 2
0

I have used this system:

ConsumerClass
private boolean queueIsNotEmpty = true;//with setter
...
do {
    ...
    sharedQueue.drainTo(docs);
    ...
} while (queueIsNotEmpty || sharedQueue.isEmpty());

When producer finish, I set on consumerObject, queueIsNotEmpty field to false

Anders R. Bystrup
  • 15,729
  • 10
  • 59
  • 55
daliborn
  • 103
  • 1
  • 6
  • Other threads won't see the flag changing right away. `queueIsNotEmpty` needs to be a volatile, or an AtomicBoolean or protected by a mutex. – Jazzwave06 Jun 18 '16 at 16:25
0

In this situation, you generally have to ditch the generics and make the queue hold type Object. then, you just need check for your "poison" Object before casting to the actual type.

jtahlborn
  • 52,909
  • 5
  • 76
  • 118