7

I'm using a thread that is continuously reading from a queue.

Something like:

public void run() {
    Object obj;
    while(true) {
        synchronized(objectsQueue) {
            if(objectsQueue.isEmpty()) {
                try {
                    objectsQueue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                obj = objectsQueue.poll();
            }
        }

        // Do something with the Object obj
    }
}

What is the best way to stop this thread?

I see two options:

1 - Since Thread.stop() is deprecated, I can implement a stopThisThread() method that uses an atomic check-condition variable.

2 - Send a Death Event object or something like that to the queue. When the thread fetches a death event, it exits.

I prefer the 1st way, however, I don't know when to call the stopThisThread() method, as something might be on it's way to the queue and the stop signal can arrive first (not desirable).

Any suggestions?

AplusKminus
halfwarp
  • 3
    Note, your `wait` should be in a `while` loop, not an `if`, to protect against spurious wakeups. – Tom Hawtin - tackline Apr 27 '10 at 11:54
  • To all those that depend on interrupt: Just wish to say that life is never that simple and there are times when one calls interrupt on a non interruptible "activity" and that activity takes absolutely no notice of the interrupt. This from experience. Despite that, I do not know of a better way... Please take a careful view of the API: http://java.sun.com/javase/6/docs/api/index.html?java/lang/Thread.html check out the interrupt() method – Yaneeve Apr 27 '10 at 12:13
  • @halfwarp: +1 and +1 to Stephen C's answer. Your *"death event"* is actually called a *"poison pill"*: you simply enqueue a special object (the poison pill) that means "stop the thread" once you dequeue it. That way anything that *"might be on it's way to the queue"* [sic] is still processed before the poison pill. – SyntaxT3rr0r Apr 27 '10 at 12:31
  • @Yaneeve: that's why you'd have to call `isInterrupted()` in all code that's supposed to handle interruptions. But that doesn't mean that it's wrong and/or impossible to use. In fact it's the only way to tell a Thread that's blocking in an I/O call to check for some condition. – Joachim Sauer Apr 27 '10 at 12:36
  • @Joachim: As I said, from experience, sometimes you just don't have that privilege. If, as in my case the code being interrupted is being run by a third party library, and that library does not "feel like" checking the interrupted status then things become a bit complicated. To be less vague, I had a problem to cancel out a CORBA request sent that had yet to timeout... – Yaneeve Apr 27 '10 at 12:56
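The spurious-wakeup point raised in the comments can be sketched as follows. This is only a minimal illustration, not the asker's actual class: `GuardedQueue`, `put` and `take` are hypothetical names, and the only change relative to the question's code is that the wait sits in a `while` loop.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical wrapper, for illustration only.
class GuardedQueue<T> {
    private final Queue<T> items = new ArrayDeque<>();

    public void put(T item) {
        synchronized (items) {
            items.add(item);
            items.notifyAll(); // wake any thread waiting in take()
        }
    }

    public T take() throws InterruptedException {
        synchronized (items) {
            // Re-check the condition after every wakeup: wait() may return
            // without a matching notify (a spurious wakeup).
            while (items.isEmpty()) {
                items.wait();
            }
            return items.poll();
        }
    }
}
```

Because `take()` declares `InterruptedException` instead of swallowing it, the caller decides whether an interrupt means "stop".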

6 Answers

6

The DeathEvent (or, as it is often called, "poison pill") approach works well if you need to complete all of the work on the queue before shutting down. The problem is that this could take a long time.
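For reference, a minimal sketch of the poison-pill idea, assuming a `BlockingQueue` and a sentinel object compared by identity. `PoisonPillDemo`, `POISON_PILL` and `runWorker` are illustrative names, not part of any library:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class PoisonPillDemo {
    // Sentinel compared by identity; never a real work item.
    static final Object POISON_PILL = new Object();

    // Feeds 'work' to a worker thread, then the pill; returns what was processed.
    static List<Object> runWorker(List<Object> work) throws InterruptedException {
        BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
        List<Object> processed = new ArrayList<>();

        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    Object obj = queue.take();
                    if (obj == POISON_PILL) {
                        return; // everything enqueued before the pill is done
                    }
                    processed.add(obj); // stand-in for real processing
                }
            } catch (InterruptedException e) {
                // Interrupted: stop early.
            }
        });
        worker.start();

        for (Object item : work) {
            queue.put(item);
        }
        queue.put(POISON_PILL);
        worker.join(); // join() makes the worker's writes visible here
        return processed;
    }
}
```

Items enqueued after the pill are never processed, so with several producers the pill has to be sent only after all of them have finished.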

If you want to stop as soon as possible, I suggest you do this:

BlockingQueue<O> queue = ...

...

public void run() {
   try {
       // The following test is necessary to get fast interrupts.  If
       // it is replaced with 'true', the queue will be drained before
       // the interrupt is noticed.  (Thanks Tim)
       while (!Thread.interrupted()) {
           O obj = queue.take();
           doSomething(obj);
       }
   } catch (InterruptedException ex) {
       // We are done.
   }
}

To stop the thread t that was instantiated with that run method, simply call t.interrupt();.

If you compare the code above with other answers, you will notice how using a BlockingQueue and Thread.interrupt() simplifies the solution.

I would also claim that an extra stop flag is unnecessary and, in the big picture, potentially harmful. A well-behaved worker thread should respect an interrupt. An unexpected interrupt simply means that the worker is being run in a context that the original programmer did not anticipate. The best thing is for the worker to do what it is told to do ... i.e. it should stop ... whether or not this fits with the original programmer's conception.
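To see the interrupt-based stop end to end, here is a small self-contained sketch. `InterruptDemo` and `stopsOnInterrupt` are made-up names, and the 5 second join timeout is an arbitrary choice for the demo:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class InterruptDemo {
    // Starts a worker, interrupts it, and reports whether it exited.
    static boolean stopsOnInterrupt() throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Thread t = new Thread(() -> {
            try {
                while (!Thread.interrupted()) {
                    queue.take(); // blocks while the queue is empty
                }
            } catch (InterruptedException ex) {
                // We are done.
            }
        });
        t.start();
        queue.put("work");
        t.interrupt();  // request the stop
        t.join(5000);   // wait up to 5 s for the worker to exit
        return !t.isAlive();
    }
}
```

The worker exits either through the loop test or through the `InterruptedException` thrown by `take()`, whichever notices the interrupt first.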

Stephen C
  • @Stephen C: +1... In addition to that, in JCIP it is recommended that if you get an unexpected interrupt you should restore the interrupted status using an *if (interrupted) { Thread.currentThread().interrupt(); }* construct in a *finally* block. Hmmmm, so after all maybe the better way is to both quit upon receiving an unexpected interrupt and restore the interrupted status, and then *also* allow for a "clean" stop using some *stopThisThread()* method, or? – SyntaxT3rr0r Apr 27 '10 at 12:39
  • @WizardOfOdds - I don't follow you. 1) There is no point in the `run` method calling `interrupt()` to restore the flag. The thread that called it is about to die. 2) Nothing you just said explains why stop by interrupt is bad or why a "clean" stop mechanism is better. (And a JCIP recommendation is *not* an explanation.) – Stephen C Apr 27 '10 at 13:19
  • @Stephen C: JCIP page 142, there's your "explanation". Btw I didn't pretend to have any explanation and ended up my comment with a question mark '?' in case you didn't notice :) – SyntaxT3rr0r Apr 27 '10 at 13:46
  • @WizardOfOdds - notwithstanding what JCIP might say, it clearly does not apply in this case. – Stephen C Apr 27 '10 at 15:27
  • 1
    @Stephen C: You should check the interrupt flag in the conditional of your while loop. According to the javadoc, a BlockingQueue implementation only has to throw an InterruptedException if it is waiting. If the queue is non-empty, it doesn't have to wait. Strictly adhering to the javadoc, your run method isn't guaranteed to exit quickly... or at all really. – Tim Bender Apr 28 '10 at 01:45
  • @Tim - while the javadoc doesn't explicitly state this, I think that a BlockingQueue implementation *has to* throw InterruptedException if the interrupted flag is already set. If it doesn't, there is an unfixable race condition where the flag gets set after the call to `Thread.isInterrupted()` and before the call to `take()`. – Stephen C Apr 28 '10 at 04:01
  • @Stephen C, I want to clarify. It is possible that so long as the queue is never empty, InterruptedException will never be thrown. If you look at the PriorityBlockingQueue and SynchronousQueue implementation you will see that both are allowed to return a value from take() before performing an interruptable blocking action. – Tim Bender Apr 28 '10 at 05:08
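The "restore the interrupted status" idiom discussed in these comments matters mostly in code that does not own the thread. A hedged sketch, where `RestoreInterrupt` and `takeOrNull` are hypothetical names for a helper that cannot declare `InterruptedException`:

```java
import java.util.concurrent.BlockingQueue;

class RestoreInterrupt {
    // Hypothetical helper: returns null if interrupted while waiting.
    static <T> T takeOrNull(BlockingQueue<T> queue) {
        try {
            return queue.take();
        } catch (InterruptedException e) {
            // Restore the flag so code further up the call stack can see it.
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

In a `run` method that returns right after catching the exception, restoring the flag changes nothing observable, which is the point made in the reply above.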
1

In your reader thread, have a boolean variable stop. When you want this thread to stop, set it to true and interrupt the thread. Within the reader thread, when it is safe (when you don't have an unprocessed object), check the status of the stop variable and return out of the loop if it is set, as shown below.

public class ReaderThread extends Thread {
    // volatile so the write in stopSoon() is visible to run()
    private volatile boolean stop = false;

    public void stopSoon() {
        stop = true;
        this.interrupt();
    }

    @Override
    public void run() {
        Object obj;
        while (true) {
            if (stop) {
                return;
            }
            synchronized (objectsQueue) {
                // 'while', not 'if', to guard against spurious wakeups
                while (objectsQueue.isEmpty()) {
                    try {
                        objectsQueue.wait();
                    } catch (InterruptedException e) {
                        // Interrupted by stopSoon(); fall through to the stop check.
                    }
                    if (stop) {
                        return;
                    }
                }
                obj = objectsQueue.poll();
            }
            // Do something with the Object obj
        }
    }
}

public class OtherClass {
    ReaderThread reader;

    private void start() {
        reader = ...;
        reader.start();
    }

    private void stop() throws InterruptedException {
        reader.stopSoon();
        reader.join();     // Wait for the thread to stop if necessary.
    }
}
nsayer
David Waters
  • This is how I've classically coded a queue worker. The significant difference between this approach and the accepted answer is that the "poison pill" method ensures the queue is fully processed before exiting, rather than abandoned. Which of those approaches is more correct depends on the problem space. You might also consider adding a join() after the interrupt(), which makes stopSoon() block until the thread actually dies. – nsayer Jun 28 '16 at 15:49
1

Why not use a scheduler which you simply can stop when required? The standard scheduler supports repeated scheduling which also waits for the worker thread to finish before rescheduling a new run.

ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
service.scheduleWithFixedDelay(myThread, 1, 10, TimeUnit.SECONDS);

This sample runs your task after an initial delay of 1 second and then with a fixed delay of 10 seconds: when one run finishes, the next one starts 10 seconds later. And instead of having to reinvent the wheel, you get

service.shutdown()

The while(true) loop is no longer necessary.

ScheduledExecutorService Javadoc

dube
  • 1
    `service.shutdown()` is similar to the poison-pill approach in the sense that no more tasks will be accepted, executing tasks will continue running, and previously accepted tasks will be executed. In contrast, `service.shutdownNow()` will try to stop executing tasks, and previously accepted tasks won't be executed. – user454322 Aug 08 '12 at 09:25
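A small sketch of the scheduler-based variant with an orderly shutdown. `SchedulerShutdownDemo` and `runAndShutdown` are illustrative names, and the millisecond delays are shortened for the demo. One caveat worth checking against the Javadoc: for periodic tasks, `ScheduledThreadPoolExecutor` by default cancels further runs once `shutdown()` has been called.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SchedulerShutdownDemo {
    // Runs a periodic task briefly, shuts down, and reports success.
    static boolean runAndShutdown() throws InterruptedException {
        ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();

        // Stand-in for one pass of "process what is in the queue".
        service.scheduleWithFixedDelay(() -> runs.incrementAndGet(),
                0, 10, TimeUnit.MILLISECONDS);

        Thread.sleep(100);   // let a few runs happen
        service.shutdown();  // a run in progress finishes; no new work is accepted
        boolean terminated = service.awaitTermination(5, TimeUnit.SECONDS);
        return terminated && runs.get() > 0;
    }
}
```

Swapping `shutdown()` for `shutdownNow()` would additionally interrupt a run that is in progress.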
0

Approach 1 is the preferred one.

Simply set a volatile stop field to true and call interrupt() on the running thread. This will force any I/O methods that wait to return with an InterruptedException (and if your library is written correctly this will be handled gracefully).

Joachim Sauer
  • 2
    Could the down-voter please comment on what's wrong with this answer? – Joachim Sauer Apr 27 '10 at 12:34
  • 1
    I just down voted. My reasoning is that it is unnecessary to use a separate stop field. Concurrency is a difficult topic. While there is nothing technically wrong with your answer, it also doesn't fit a best practice and I think it is in the interest of the community to limit exposure of answers which work but don't fit a best practice. – Tim Bender Apr 28 '10 at 01:50
  • @TimBender I respectfully disagree. Using a stop() method on a queue worker object that simply sets a boolean field and interrupts the thread is by any community consensus a "best practice." – nsayer Jun 28 '16 at 15:56
  • @nsayer a lot of people cross the street wherever they please, that doesn't mean it is best practice to do so rather than use the cross walk. A lot of people use an extra boolean, that doesn't mean it is best practice to do so rather than use the interrupt state of the thread. – Tim Bender Jul 02 '16 at 02:57
0

I think your two cases actually exhibit the same potential behavior. For the second case consider Thread A adds the DeathEvent after which Thread B adds a FooEvent. When your job Thread receives the DeathEvent there is still a FooEvent behind it, which is the same scenario you are describing in Option 1, unless you try to clear the queue before returning, but then you are essentially keeping the thread alive, when what you are trying to do is stop it.

I agree with you that the first option is more desirable. A potential solution would depend on how your queue is populated. If it is a part of your work thread class, you could have your stopThisThread() method set a flag that makes the enqueuing call return an appropriate value (or throw an exception), e.g.:

class MyThread extends Thread{
  // volatile: written by the stopping thread, read by the worker and by enqueuers
  volatile boolean running = true;

  public void run(){
    while(running){
      try{
        //process queue...
      }catch(InterruptedException e){
        ...
      }
    }
  }

  public void stopThisThread(){
    running = false;
    interrupt();
  }

  public boolean enqueue(Object o){
    if(!running){
       return false;
       // or: throw new ThreadNotRunningException();
    }
    queue.add(o);
    return true;
  }
}

It would then be the responsibility of the object attempting to enqueue the Event to deal with it appropriately, but at the least it will know that the event is not in the queue, and will not be processed.

M. Jessup
  • Your approach is wrong unless you define the running variable as volatile. It is also not recommended to extend Thread but rather to implement Callable or Runnable, so you won't have the interrupt function in your scope – Roman Apr 27 '10 at 12:36
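One way the same idea could look as a `Runnable` with a `volatile` flag. This is a sketch under assumptions: `QueueWorker`, `runner` and `stop` are hypothetical names, and the worker remembers its own thread so that `stop()` still has something to interrupt.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class QueueWorker implements Runnable {
    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
    private volatile boolean running = true;
    private volatile Thread runner; // set when run() starts

    @Override
    public void run() {
        runner = Thread.currentThread();
        try {
            while (running) {
                Object o = queue.take();
                // process o ...
            }
        } catch (InterruptedException e) {
            // Interrupted by stop(): fall through and exit.
        }
    }

    public void stop() {
        running = false;
        Thread t = runner;
        if (t != null) {
            t.interrupt(); // wake the worker if it is blocked in take()
        }
    }

    // Returns false once stop() has been called.
    public boolean enqueue(Object o) {
        if (!running) {
            return false;
        }
        return queue.offer(o);
    }
}
```

The check in `enqueue` is not atomic with `stop()`, so an item offered at the same moment as the stop request can still end up in the queue unprocessed; closing that gap would need a lock around both operations.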
0

I usually put a flag in the class that contains the thread, and loop on it in the thread code (NOTE: instead of while(true) I use while(flag)).

Then create a method in the class to set the flag to false:

private volatile boolean flag = true;

public void stopThread()
{
   flag = false;
}

public void run() {
    Object obj;
    while(flag) {
        synchronized(objectsQueue) {
            if(objectsQueue.isEmpty()) {
                try {
                    objectsQueue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                obj = objectsQueue.poll();
            }
        }

        // Do something with the Object obj
    }
}
SyntaxT3rr0r
Romain Hippeau
  • That doesn't look thread safe to me. The `flag` variable is being read and written in different threads without proper synchronization. – Stephen C Apr 27 '10 at 11:56
  • 2
    @Romain Hippeau: I made your boolean flag *volatile*, otherwise, as Stephen C pointed out, your code ain't properly synchronized. – SyntaxT3rr0r Apr 27 '10 at 12:35