72

I have a single thread producer which creates some task objects which are then added into an ArrayBlockingQueue (which is of fixed size).

I also start a multi-threaded consumer. This is built as a fixed thread pool (Executors.newFixedThreadPool(threadCount);). I then submit some ConsumerWorker instances to this threadPool, each ConsumerWorker having a reference to the above-mentioned ArrayBlockingQueue instance.

Each such Worker will do a take() on the queue and deal with the task.

My issue is: what's the best way to let a Worker know when there won't be any more work to do? In other words, how do I tell the Workers that the producer has finished adding to the queue and that, from this point on, each worker should stop when it sees that the queue is empty?

What I've got now is a setup where my Producer is initialized with a callback which is triggered when it finishes its job (of adding stuff to the queue). I also keep a list of all the ConsumerWorkers I've created and submitted to the ThreadPool. When the Producer callback tells me that the producer is done, I can tell this to each of the workers. At this point they should simply keep checking whether the queue is empty, and when it becomes empty they should stop, thus allowing me to gracefully shut down the ExecutorService thread pool. It's something like this:

public class ConsumerWorker implements Runnable{

private BlockingQueue<Produced> inputQueue;
private volatile boolean isRunning = true;

public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
    this.inputQueue = inputQueue;
}

@Override
public void run() {
    //worker loop keeps taking an element from the queue as long as the producer is still running or as
    //long as the queue is not empty:
    while(isRunning || !inputQueue.isEmpty()) {
        System.out.println("Consumer "+Thread.currentThread().getName()+" START");
        try {
            Object queueElement = inputQueue.take();
            //process queueElement
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

//this is used to signal from the main thread that the producer has finished adding stuff to the queue
public void setRunning(boolean isRunning) {
    this.isRunning = isRunning;
}

}

The problem here is that I have an obvious race condition where sometimes the producer will finish, signal it, and the ConsumerWorkers will stop BEFORE consuming everything in the queue.

My question is what's the best way to synchronize this so that it all works ok? Should I synchronize the whole part where it checks if the producer is running plus if the queue is empty plus take something from the queue in one block (on the queue object)? Should I just synchronize the update of the isRunning boolean on the ConsumerWorker instance? Any other suggestion?

UPDATE, HERE'S THE WORKING IMPLEMENTATION THAT I'VE ENDED UP USING:

public class ConsumerWorker implements Runnable{

private BlockingQueue<Produced> inputQueue;

private final static Produced POISON = new Produced(-1); 

public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
    this.inputQueue = inputQueue;
}

@Override
public void run() {
    //worker loop keeps taking elements from the queue until it takes
    //the poison pill:
    while(true) {
        System.out.println("Consumer "+Thread.currentThread().getName()+" START");
        try {
            Produced queueElement = inputQueue.take();
            Thread.sleep(new Random().nextInt(100));
            if(queueElement==POISON) {
                break;
            }
            //process queueElement
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("Consumer "+Thread.currentThread().getName()+" END");
    }
}

//this is used to signal from the main thread that the producer has finished adding stuff to the queue
public void stopRunning() {
    try {
        inputQueue.put(POISON);
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

}

This was inspired heavily by JohnVint's answer below, with only some minor modifications.

=== Update due to @vendhan's comment.

Thank you for your observation. You are right: the first snippet of code in this question has (amongst other issues) the problem that the condition while(isRunning || !inputQueue.isEmpty()) doesn't really make sense.

In my actual final implementation of this, I do something which is closer to your suggestion of replacing "||" (or) with "&&" (and), in the sense that each worker (consumer) now only checks whether the element it has taken from the queue is a poison pill, and if so stops (so theoretically we can say that the worker has to be running AND the queue must not be empty).
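
For illustration, here is a minimal sketch of that poison-pill arrangement, assuming the producer knows how many workers exist and enqueues one pill per worker after the last real task. The class name `PoisonPerConsumerDemo`, the shape of `Produced`, the queue capacity, and the timeout are all hypothetical choices made for the example, not the code I actually run.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PoisonPerConsumerDemo {
    /** Stand-in for the question's Produced type (assumed shape). */
    static final class Produced {
        final int id;
        Produced(int id) { this.id = id; }
    }

    // Sentinel compared by identity (==), so no real task can be mistaken for it.
    static final Produced POISON = new Produced(-1);

    /** Runs one producer and `consumers` workers; returns how many real tasks were processed. */
    static int run(int consumers, int tasks) {
        BlockingQueue<Produced> queue = new ArrayBlockingQueue<>(8);
        AtomicInteger processed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(consumers);
        for (int i = 0; i < consumers; i++) {
            pool.execute(() -> {
                try {
                    while (true) {
                        Produced item = queue.take();
                        if (item == POISON) {
                            return;                  // this worker's own pill: stop
                        }
                        processed.incrementAndGet(); // placeholder for real processing
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // keep the interrupt status and exit
                }
            });
        }
        try {
            for (int i = 0; i < tasks; i++) {
                queue.put(new Produced(i));          // blocks while the queue is full
            }
            for (int i = 0; i < consumers; i++) {
                queue.put(POISON);                   // one pill per worker, after all real tasks
            }
            pool.shutdown();
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not stop in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println(run(4, 100));
    }
}
```

Because the queue is FIFO and the pills go in last, every real task is dequeued before any worker sees a pill, and each worker consumes exactly one pill.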

Shivan Dragon
  • 1
    An executorService already has a queue, so you don't need another one. You can start the whole executor service with a shutdown(). – Peter Lawrey Jan 23 '12 at 16:30
  • @PeterLawrey I'm sorry but I don't understand your comment... – Shivan Dragon Jan 23 '12 at 16:55
  • 8
    As an ExecutorService has a queue already, you can just add tasks to it and you won't need an additional queue, nor will you have to work out how to stop them as this is already implemented. – Peter Lawrey Jan 23 '12 at 18:50
  • Right you are, but I wanted to avoid having to deal with all those Callable and Runnable objects; I just wanted a queue containing my actual business data. I will, however, check whether doing so results in a faster implementation than doing it with the blocking queue. – Shivan Dragon Jan 23 '12 at 19:01
  • @PeterLawrey In your comment _You can **start** the whole executor service with a shutdown()_ did you mean _stop_? – user454322 Aug 04 '12 at 16:09
  • @user454322 You are right, shutdown stops the ExecutorService – Peter Lawrey Aug 04 '12 at 16:14
  • @ShivanDragon : What is the 'sleep' in your solution for ? – 2020 Apr 22 '13 at 17:38
  • 1
    @ShivanDragon: I raised a linked [question](http://stackoverflow.com/questions/16134608/is-there-really-a-race-condition-in-this-multi-threaded-java-code), where people implied that your OP should have the condition && and not ||. If you feel your OP should be edited to change the || to an &&, please do so. – 2020 Apr 22 '13 at 17:41
  • @Shivan Dragon, Question: Why you are not putting back the POISON to the queue? if there are multiple consumers, others will never get this poison and never stop – yuris Jan 14 '14 at 11:08
  • @yuris: yes, you're right. There's a part of this code that's missing: since in this case I already know how many consumers I have, when I want them all to stop (when the producer doesn't have anything more to produce) the producer adds a number of poison pill elements to the queue equal to the number of consumer-workers. This way each one takes its own individual poison pill. There are other ways to do this (like peeking first and only taking if the element is NOT the poison pill; if it is, you (the consumer-worker) don't take it, but simply stop/die). – Shivan Dragon Jan 14 '14 at 13:30
  • @Shivan Dragon, it's clear now, thanks. For the peek approach, I think it is problematic when the queue is empty... peek() on empty queue = null, so it will go to take() and wait. And when producer thread will put POISON, the take() will consume it... – yuris Feb 04 '14 at 15:49

6 Answers

87

You should continue to take() from the queue. You can use a poison pill to tell the worker to stop. For example:

private final Object POISON_PILL = new Object();

@Override
public void run() {
    //worker loop keeps taking an element from the queue as long as the producer is still running or as
    //long as the queue is not empty:
    while(isRunning) {
        System.out.println("Consumer "+Thread.currentThread().getName()+" START");
        try {
            Object queueElement = inputQueue.take();
            if(queueElement == POISON_PILL) {
                 inputQueue.add(POISON_PILL);//notify other threads to stop
                 return;
            }
            //process queueElement
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

//this is used to signal from the main thread that the producer has finished adding stuff to the queue
public void finish() {
    //you can also clear here if you wanted
    isRunning = false;
    inputQueue.add(POISON_PILL);
}
John Vint
  • 45
    +1 for my tech dictionary just got one term richer with "poison pill" – Kiril Jan 23 '12 at 16:22
  • 2
    I've tried this and it works OK, except for one little modification: I must call inputQueue.put(POISON_PILL), not offer(), because if I call offer() and the queue is at full capacity at that moment (i.e. the Workers were really lazy), it will not add the POISON PILL element. Is this correct, or am I saying stupid things? – Shivan Dragon Jan 23 '12 at 16:24
  • @AndreiBodnarescu You're right, it isn't offer. I thought I got all of them but only fixed one :) – John Vint Jan 23 '12 at 16:32
  • @AndreiBodnarescu Also, if you have tried this and are asking a question, I am assuming this didn't suffice? – John Vint Jan 23 '12 at 16:33
  • @JohnVint: well, you've just made more modifications to your answer than I think are needed. Since each worker adds a poison pill via the finish() call, you don't need to re-add the poison pill in the main loop once you stumble upon it; it's OK to consume it, as the other workers will each still have their own poison pill. Also, the method I'm using here is "put", not "add" nor "offer". – Shivan Dragon Jan 23 '12 at 16:41
  • @JohnVint: allrighty, I'm gonna go with your answer because it's the best in here, but I will however update my question to specify the actual implementation I've chosen to use. Thanks a lot for the help! – Shivan Dragon Jan 23 '12 at 16:46
  • Do you think the current design might burn and waste a lot of CPU? For example, if the queue is empty and waiting for elements, won't all threads stay busy, since there is no break for IO? – Wild Goat Nov 16 '15 at 19:25
  • why not `peek` for the poison pill instead of taking it then adding it back for other consumers? What if another consumer does a take just after you take out the poison pill? The queue will be empty (I guess the other consumer will then wait and get the poison pill eventually, but still...) – Rhubarb Jun 17 '16 at 08:44
  • @Rhubarb It's a very good point. I looked into the code, at the least, and the only side effect is acquiring the take lock twice instead of once per `take`. It's a reasonable approach though. – John Vint Jun 17 '16 at 12:39
  • And yes, if the other consumer does a take and it's empty then they will wait until the other thread notifies with the same object. – John Vint Jun 17 '16 at 12:42
  • Why that funny name - where most people would call that message "Done" or "Shutdown" or "Close" - and only few would come up with "poisoned pill" ? – BitTickler Jan 07 '19 at 15:06
14

I'd send the workers a special work packet to signal that they should shut down:

public class ConsumerWorker implements Runnable{

private static final Produced DONE = new Produced();

private BlockingQueue<Produced> inputQueue;

public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
    this.inputQueue = inputQueue;
}

@Override
public void run() {
    for (;;) {
        try {
            Produced item = inputQueue.take();
            if (item == DONE) {
                inputQueue.add(item); // keep in the queue so all workers stop
                break;
            }
            // process `item`
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

}

To stop the workers, simply add ConsumerWorker.DONE to the queue.
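
A self-contained sketch of this single-pill variant is below, where each worker re-enqueues the sentinel before exiting so the next worker sees it too. The class name `SharedPillDemo`, the queue capacity, and the timeout are hypothetical. It uses `put` rather than `add` on the assumption that the queue is bounded, since `add` throws if the queue happens to be full.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SharedPillDemo {
    // Single shared sentinel, compared by identity.
    static final Object DONE = new Object();

    /** Returns the number of real items processed by `workers` threads. */
    static int run(int workers, int tasks) {
        BlockingQueue<Object> queue = new ArrayBlockingQueue<>(4);
        AtomicInteger processed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int i = 0; i < workers; i++) {
            pool.execute(() -> {
                try {
                    for (;;) {
                        Object item = queue.take();
                        if (item == DONE) {
                            queue.put(DONE);         // hand the pill on to the next worker
                            return;
                        }
                        processed.incrementAndGet(); // placeholder for real processing
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        try {
            for (int i = 0; i < tasks; i++) {
                queue.put("task-" + i);
            }
            queue.put(DONE);                         // one pill is enough for all workers
            pool.shutdown();
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not stop in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println(run(3, 50));
    }
}
```

One consequence of this design is that the pill is still sitting in the queue after the last worker exits, so the queue should be cleared before it is reused.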

NPE
1

Couldn't we do this with a CountDownLatch whose count is the number of records the producer emits? Each consumer calls countDown() after processing a record, and await() returns once all records have been processed. At that point you can stop all your consumers, since every record has been handled.
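
A rough sketch of the latch idea follows. It assumes the total number of records is known before consumption starts, and it stops the idle workers with `shutdownNow()`, which interrupts their blocked `take()` calls. The class name `LatchDemo` and the sizes are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class LatchDemo {
    /** Returns the number of records processed; `records` must be known up front. */
    static int run(int workers, int records) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(8);
        CountDownLatch remaining = new CountDownLatch(records);
        AtomicInteger processed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int i = 0; i < workers; i++) {
            pool.execute(() -> {
                try {
                    while (true) {
                        queue.take();                // blocks until a record arrives
                        processed.incrementAndGet(); // placeholder for real processing
                        remaining.countDown();       // one record fully handled
                    }
                } catch (InterruptedException e) {
                    // Interrupted by shutdownNow() once the latch has opened: just exit.
                    Thread.currentThread().interrupt();
                }
            });
        }
        try {
            for (int i = 0; i < records; i++) {
                queue.put(i);
            }
            remaining.await();                       // every record has been processed
            pool.shutdownNow();                      // wake the workers blocked in take()
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not stop in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println(run(4, 100));
    }
}
```

Interrupting is safe here only because the latch guarantees that no record is still in flight when `shutdownNow()` is called.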

Arpan Das
1

In the code block where you attempt to retrieve an element from the queue, use poll(time, unit) instead of take().

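try {
    Object queueElement = inputQueue.poll(timeout, unit);
    if (queueElement != null) {
        //process queueElement
    } else if (!isRunning && inputQueue.isEmpty()) {
        return; //poll timed out, the producer is done and the queue is drained
    }
} catch (InterruptedException e) {
    if (!isRunning && inputQueue.isEmpty())
        return;
}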

By specifying an appropriate timeout, you ensure that threads won't block forever after this unfortunate sequence of events:

  1. isRunning is true
  2. The queue becomes empty, so threads enter a blocked wait (if using take())
  3. isRunning is set to false
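
As a sketch of how the timed poll avoids that sequence, the worker below treats a `null` from `poll` as a cue to re-check the "producer done" flag and only exits when the flag is set and the queue is empty. The class name `PollTimeoutDemo`, the 20 ms timeout, and the use of an `AtomicBoolean` in place of the `isRunning` field are assumptions for the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class PollTimeoutDemo {
    /** Returns the number of items processed by `workers` threads. */
    static int run(int workers, int tasks) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(8);
        AtomicBoolean producerDone = new AtomicBoolean(false);
        AtomicInteger processed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int i = 0; i < workers; i++) {
            pool.execute(() -> {
                try {
                    while (true) {
                        // poll returns null if nothing arrives within the timeout
                        Integer item = queue.poll(20, TimeUnit.MILLISECONDS);
                        if (item != null) {
                            processed.incrementAndGet(); // placeholder for real processing
                        } else if (producerDone.get() && queue.isEmpty()) {
                            return;                  // nothing left and nothing more coming
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        try {
            for (int i = 0; i < tasks; i++) {
                queue.put(i);
            }
            producerDone.set(true);                  // set only after the final put
            pool.shutdown();
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not stop in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println(run(4, 100));
    }
}
```

The flag is set strictly after the last `put`, so once a worker reads it as true, an empty queue really does mean there is no more work. The cost is that idle workers wake up once per timeout.
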
Bhaskar
  • Yes, that's one of the alternatives I've tried, but it has some disadvantages: first, I make a lot of unnecessary calls to poll(); second, when doing this (!isRunning && queue.isEmpty()) check plus taking stuff from the queue, I have to wrap them all in a synchronized block, and that's redundant since the BlockingQueue already takes care of all that by itself. – Shivan Dragon Jan 23 '12 at 16:54
  • To avoid the first point, why don't you just arrange for the thread which sets `isRunning` to false to send an interrupt to the threads waiting on the `take()` call? The catch block still works the same way. This does not need separate synchronization, unless you plan to set `isRunning` back to true from false. – Bhaskar Jan 23 '12 at 17:13
  • I've tried that too, sending interrupts to the threads. The problem is that if your threads are started by an ExecutorService (like a FixedThreadPool), then when you call executorService.shutdownNow() all your threads receive an InterruptedException, which makes them stop midway through the task (since they're now rigged to treat InterruptedException as a stop signal). Besides, communicating via thrown exceptions like that is not very efficient. – Shivan Dragon Jan 24 '12 at 08:32
0

I had to use a multi-threaded producer and a multi-threaded consumer. I ended up with a Scheduler -- N Producers -- M Consumers scheme, where each adjacent pair communicates via a queue (two queues in total). The Scheduler fills the first queue with requests to produce data, and then adds N "poison pills" to it. There is a counter of active producers (an atomic int), and the producer that receives the last poison pill sends M poison pills to the consumer queue.
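
A condensed sketch of that scheme is below, with the calling thread playing the Scheduler. The class name `TwoStagePipelineDemo`, the `STOP` sentinel, the queue capacities, and the string payloads are hypothetical stand-ins for whatever the real system uses.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class TwoStagePipelineDemo {
    // Shared sentinel for both queues, compared by identity.
    static final Object STOP = new Object();

    /** Returns the number of data items consumed. */
    static int run(int producers, int consumers, int requests) {
        BlockingQueue<Object> requestQueue = new ArrayBlockingQueue<>(8);
        BlockingQueue<Object> dataQueue = new ArrayBlockingQueue<>(8);
        AtomicInteger activeProducers = new AtomicInteger(producers);
        AtomicInteger consumed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(producers + consumers);

        for (int p = 0; p < producers; p++) {
            pool.execute(() -> {
                try {
                    while (true) {
                        Object request = requestQueue.take();
                        if (request == STOP) {
                            // The last producer to stop releases the consumers.
                            if (activeProducers.decrementAndGet() == 0) {
                                for (int c = 0; c < consumers; c++) {
                                    dataQueue.put(STOP);
                                }
                            }
                            return;
                        }
                        dataQueue.put("data for request " + request);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        for (int c = 0; c < consumers; c++) {
            pool.execute(() -> {
                try {
                    while (dataQueue.take() != STOP) {
                        consumed.incrementAndGet();  // placeholder for real processing
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        try {
            // Scheduler role: requests first, then one pill per producer.
            for (int i = 0; i < requests; i++) {
                requestQueue.put(i);
            }
            for (int p = 0; p < producers; p++) {
                requestQueue.put(STOP);
            }
            pool.shutdown();
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pipeline did not stop in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return consumed.get();
    }

    public static void main(String[] args) {
        System.out.println(run(3, 2, 100));
    }
}
```

A producer only decrements the counter after its own last `put`, so when the counter reaches zero every data item is already in the consumer queue ahead of the consumer pills.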

18446744073709551615
0

There are a number of strategies you could use, but one simple one is to have a subclass of task that signals the end of the job. The producer doesn't send this signal directly. Instead, it enqueues an instance of this task subclass. When one of your consumers pulls off this task and executes it, that causes the signal to be sent.
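
One possible reading of this, sketched under several assumptions: tasks implement a small `Task` interface, the end-of-job task wraps a callback, and the callback used here is `shutdownNow()` on the worker pool so that workers blocked in `take()` are released. The names `EndTaskDemo`, `Task`, and `EndOfWork` are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class EndTaskDemo {
    /** Hypothetical unit of work pulled off the queue by consumers. */
    interface Task {
        void execute();
    }

    /** Task subclass whose execution sends the "job finished" signal. */
    static final class EndOfWork implements Task {
        private final Runnable signal;
        EndOfWork(Runnable signal) { this.signal = signal; }
        @Override public void execute() { signal.run(); }
    }

    /** Returns the number of ordinary tasks executed. */
    static int run(int workers, int tasks) {
        BlockingQueue<Task> queue = new ArrayBlockingQueue<>(8);
        AtomicInteger processed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int i = 0; i < workers; i++) {
            pool.execute(() -> {
                try {
                    while (true) {
                        queue.take().execute();      // ordinary task or the end signal
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // released by the end signal
                }
            });
        }
        try {
            for (int i = 0; i < tasks; i++) {
                queue.put(processed::incrementAndGet); // placeholder for real work
            }
            // Enqueued last: whichever consumer runs it interrupts all workers.
            queue.put(new EndOfWork(pool::shutdownNow));
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not stop in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println(run(4, 100));
    }
}
```

This sketch relies on the ordinary tasks not blocking: a worker that is interrupted while still running an earlier task finishes it and only notices the interrupt on its next `take()`.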

Kaelin Colclasure