1

I wrote a producer/consumer based program using Java's BlockingQueue. I'm trying to find a way to stop the consumer if all producers are done. There are multiple producers, but only one consumer.

I found several solutions for the "one producer, many consumers" scenario, e.g. using a "done paket / poison pill" (see this discussion), but my scenario is just the opposite.

Are there any best practice solutions?

Community
  • 1
  • 1
alapeno
  • 2,724
  • 6
  • 36
  • 53

3 Answers3

2

The best-practice system is to use a count-down latch. Whether this works for you is more interesting.....

Perhaps each producer should register and deregister with the consumer, and when all producers are deregistered (and the queue is empty) then the consumer can terminate too.

rolfl
  • 17,539
  • 7
  • 42
  • 76
0

Presumably your producers are working in different threads in the same VM, and that they exit when done. I would make another thread that calls join() on all the producers in a loop, and when it exist that loop (because all the producer threads have ended) it then notifies the consumer that it's time to exit. This has to run in another thread because the join() calls will block. Incidentally, rolfl's suggestion of using a count down latch would have the problem, if I understand it correctly.

Alternately, if the producers are Callables, then the consumer can call isDone() and isCanceled() on their Futures in the loop, which won't bock, so it can be used right in the consumer thread.

dj_segfault
  • 11,957
  • 4
  • 29
  • 37
0

You could use something like the following, i use registerProducer() and unregisterProducer() for keeping track of the producers. Another possible solution could make use of WeakReferences.

It's worth to mention that this solution will not consume the events that have already been queued when the consumer is shut down, so some events may be lost when shutting down.

You would have to drain the queue if the consumer gets interrupt and then process them.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class TestConsumerShutdown {

  private static interface SomeEvent {
    String getName();
  }

  private static class Consumer implements Runnable {

    private final BlockingQueue<SomeEvent> queue = new ArrayBlockingQueue<>(10);
    private final ExecutorService consumerExecutor = Executors.newSingleThreadExecutor();
    private final AtomicBoolean isRunning = new AtomicBoolean();
    private final AtomicInteger numberProducers = new AtomicInteger(0);


    public void startConsumer() {
      consumerExecutor.execute(this);
    }

    public void stopConsumer() {
      consumerExecutor.shutdownNow();
      try {
        consumerExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }

    public void registerProducer() {
      numberProducers.incrementAndGet();
    }

    public void unregisterProducer() {
      if (numberProducers.decrementAndGet() < 1) {
        stopConsumer();
      }
    }

    public void produceEvent(SomeEvent event) throws InterruptedException {
      queue.put(event);
    }

    @Override
    public void run() {
      if (isRunning.compareAndSet(false, true)) {
        try {
          while (!Thread.currentThread().isInterrupted()) {
            SomeEvent event = queue.take();
            System.out.println(event.getName());
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } finally {
          System.out.println("Consumer stopped.");
          isRunning.set(false);
        }
      }
    }
  }

  public static void main(String[] args) {
    final Consumer consumer = new Consumer();
    consumer.startConsumer();

    final Runnable producerRunnable = new Runnable() {

      @Override
      public void run() {
        final String name = Thread.currentThread().getName();   
        consumer.registerProducer();
        try {
          for (int i = 0; i < 10; i++) {
            consumer.produceEvent(new SomeEvent() {



              @Override
              public String getName() {
                return name;
              }
            });
          }
          System.out.println("Produver " + name + " stopped.");
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } finally {
          consumer.unregisterProducer();
        }

      }
    };
    List<Thread> producers = new ArrayList<>();
    producers.add(new Thread(producerRunnable, "producer-1"));
    producers.add(new Thread(producerRunnable, "producer-2"));
    producers.add(new Thread(producerRunnable, "producer-3"));

    for (Thread t : producers) {
      t.start();
    }

  }
}
Ortwin Angermeier
  • 5,957
  • 2
  • 34
  • 34