
I'm using RabbitMQ, which uses a LinkedBlockingQueue by default for its consumer. It has a blocking nextDelivery() method, which basically calls take() on the queue.

But it doesn't catch the interrupt if it was interrupted just before calling that method.

if (Thread.interrupted()) {
    throw new InterruptedException();
}
// If interrupted here, call below will not throw interrupted exception
rabbit.nextDelivery();

It only works if the interrupt hits while it is waiting (which is what the Javadoc says too) or if it hits before the if block.

Throws:
    InterruptedException - if interrupted while waiting

I actually have a case where the interrupt hits exactly where I marked. It works if I put a sleep at the beginning or in between, but it's still not safe to assume it will always work.

Is there an alternative BlockingQueue implementation that addresses this issue? I don't know whether the interrupt is consumed or not. Maybe there is a race condition in the static method, where it returns false but somehow clears the flag?

Edit: It is not about the Thread.interrupted() call or about not restoring the flag. If you comment out the if block, the same issue occurs. The queue method doesn't throw InterruptedException as soon as it is entered; it just blocks.

Gray
acheron55
  • If the `LinkedBlockingQueue.take()` doesn't throw then the interrupt bit on the `Thread` is not set. I've edited my answer a bit if it helps. – Gray Jan 07 '14 at 23:23
  • Any chance `RabbitMQ` is using its own `LinkedBlockingQueue` and not the one in `java.util.concurrent`? – Gray Jan 07 '14 at 23:24
  • I just called `System.out.println(Thread.currentThread().isInterrupted());` just before rabbit call, it printed true(which is not clearing the bit), and then rabbit just blocks again. So you are right it is probably about rabbit, I'm looking into it – acheron55 Jan 07 '14 at 23:26
  • Rather than trying to suss out the internal operations of Rabbit's nextDelivery() call (who knows or cares what queue implementation it is using), you should tell us what you are trying to do that you think you need to be able to intercept the interrupt state of the thread. – Will Hartung Jan 07 '14 at 23:29
  • If that is the case then the `rabbit` call is clearing the interrupt status _before_ the `LinkedBlockingQueue.take()` call is made. I can't find `nextDelivery()` in their source code. What type is `rabbit`? That's not your code? – Gray Jan 07 '14 at 23:30
  • Turns out `DeclareOk queueDeclare = channel.queueDeclare(queueName, true, false, false, null);` this call clears the bit(and many other methods used to initialize stuff). It is here: `com.rabbitmq.client.impl.ChannelN`. That rabbit variable was of type my class wrapping real rabbit code, it was creating queue if it didn't exist, like singleton, then calling delivery – acheron55 Jan 07 '14 at 23:42

1 Answer


But it doesn't catch the interrupt if it was interrupted just before calling that method.

So if I'm understanding your question, something is swallowing an InterruptedException or clearing the interrupt flag. The following code always throws InterruptedException for me.

Thread.currentThread().interrupt();
new LinkedBlockingQueue<String>().take();
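
As a runnable version of the two lines above, here is a minimal sketch. The class and method names are made up for illustration. It sets the interrupt flag on the current thread, calls take() on an empty queue, and reports whether InterruptedException was thrown and what the flag looks like afterwards.

```java
import java.util.concurrent.LinkedBlockingQueue;

public class PreInterruptedTake {

    // Returns true if take() throws when the interrupt flag is already set.
    static boolean takeThrowsWhenPreInterrupted() {
        Thread.currentThread().interrupt();          // set the flag before calling take()
        try {
            new LinkedBlockingQueue<String>().take(); // empty queue: would block if the flag were ignored
            return false;
        } catch (InterruptedException e) {
            return true;
        }
    }

    // Returns the state of the interrupt flag right after the exception was caught.
    static boolean flagSetAfterException() {
        Thread.currentThread().interrupt();
        try {
            new LinkedBlockingQueue<String>().take();
        } catch (InterruptedException e) {
            // nothing to do here, we only inspect the flag below
        }
        return Thread.currentThread().isInterrupted();
    }

    public static void main(String[] args) {
        System.out.println("take() threw: " + takeThrowsWhenPreInterrupted());
        System.out.println("flag still set afterwards: " + flagSetAfterException());
    }
}
```

If the first value is ever false with a plain java.util.concurrent.LinkedBlockingQueue, the flag was most likely cleared somewhere between the interrupt and the take() call.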

It is important to realize that when the InterruptedException is thrown, the interrupt flag is cleared. You should always do something like:

try {
    // other methods where this is needed are Object.wait(...), Thread.join(...)
    Thread.sleep(100);
} catch (InterruptedException ie) {
    // re-interrupt the thread
    Thread.currentThread().interrupt();
    // deal with the interrupt by returning or something
    ...
}

See: Why invoke Thread.currentThread.interrupt() when catch any InterruptException?

What often happens is that third-party code does not propagate the interrupt status because of bad code. Then you are often SOL, since the interrupt will have been swallowed and the take() method will not throw.
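
To illustrate the swallowing scenario, here is a hedged sketch. badLibraryCall() is a hypothetical stand-in for third-party code and is not taken from RabbitMQ. It catches InterruptedException without restoring the flag, so a later blocking queue call no longer sees the interrupt. The sketch uses poll() with a timeout instead of take() so that it terminates.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class SwallowedInterruptDemo {

    // Hypothetical library method that swallows the interrupt (assumption, for illustration only).
    static void badLibraryCall() {
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            // Swallowed: the flag was cleared when the exception was thrown
            // and is not restored with Thread.currentThread().interrupt().
        }
    }

    // Returns true if the queue call still notices the interrupt after the library call.
    static boolean pollThrowsAfterSwallow() {
        Thread.currentThread().interrupt();   // interrupt arrives before the library call
        badLibraryCall();                     // the library consumes it
        try {
            // Timed poll so the demo ends; take() would block indefinitely here.
            new LinkedBlockingQueue<String>().poll(50, TimeUnit.MILLISECONDS);
            return false;
        } catch (InterruptedException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("queue call threw: " + pollThrowsAfterSwallow());
    }
}
```

Printing Thread.currentThread().isInterrupted() before and after each suspect call is a simple way to find which call clears the flag.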

Also, it is important to realize that Thread.interrupted() clears the interrupt flag. Typically you want to use Thread.currentThread().isInterrupted() to test for the status of the interrupt flag.
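
The difference between the two checks can be seen in a short sketch (the class name is made up for illustration): isInterrupted() only reads the flag, while the static Thread.interrupted() reads it and clears it.

```java
public class InterruptFlagCheck {

    // Returns what each check observes, in order, on an interrupted thread.
    static boolean[] observe() {
        Thread.currentThread().interrupt();
        boolean first = Thread.currentThread().isInterrupted();  // reads the flag, leaves it set
        boolean second = Thread.interrupted();                   // reads the flag and clears it
        boolean third = Thread.currentThread().isInterrupted();  // flag is now clear
        return new boolean[] { first, second, third };
    }

    public static void main(String[] args) {
        boolean[] r = observe();
        System.out.println("isInterrupted(): " + r[0]);
        System.out.println("Thread.interrupted(): " + r[1]);
        System.out.println("isInterrupted() afterwards: " + r[2]);
    }
}
```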

Is there an alternative BlockingQueue implementation that addresses this issue?

I'm not sure there is an issue.

Gray
  • Are you sure catching it clears the flag? I'd have expected it to just not ever be set. There also isn't much reason to re-interrupt if you're returning out of the main worker loop of the thread, that makes sense from inside a nested method that needs to pass the interrupt along to the caller. – millimoose Jan 07 '14 at 22:53
  • Catching it clears the flag, yes @millimoose. http://stackoverflow.com/questions/4906799/why-invoke-thread-currentthread-interrupt-when-catch-any-interruptexception – Gray Jan 07 '14 at 22:54
  • I suppose my question is whether the act of catching the exception clears the flag, or if it's cleared when the exception is thrown. Then again I now realise the distinction is pretty academic since client code can't observe the difference. – millimoose Jan 07 '14 at 23:00
  • Oh I see. Yes @millimoose. It is when it is thrown, not caught. I'll change my post. – Gray Jan 07 '14 at 23:02
  • It never executes catch block, because interrupt happens right after that. My throw code is not executed either. So not setting flag again is not the cause here – acheron55 Jan 07 '14 at 23:02
  • I suspect @acheron55 that to get the interrupt to happen right before the `take()` call, the OP is putting a `Thread.sleep(...)` method which eats the interrupt. – Gray Jan 07 '14 at 23:05
  • No, I don't put a sleep, of course. I run in debug mode (it happens if I run normally too), where I just wait on the line before take() and send the interrupt from another thread; then the current thread just waits in take() and doesn't throw anything. I even commented out the `if` block so there is only the `rabbit.nextDelivery();` call, which doesn't handle the interrupt correctly – acheron55 Jan 07 '14 at 23:13
  • Then I wonder if this is some sort of debugger artifact @acheron55. Because calling `Thread.currentThread().interrupt(); new LinkedBlockingQueue().take();` always throws for me. – Gray Jan 07 '14 at 23:16