0

I'm trying to figure out the best approach in Java (any version) for implementing a single-producer, multiple-consumer setup using ExecutorService (by preference, not need). The producer needs to run "forever", but after each run it must wait until everything has been fully processed, i.e. all consumer threads have terminated, the queue is empty and no items are left to be produced. The producer should also poll its data source only at a fixed interval.

So as an example: every 30 minutes I want my producer to poll its data source for records and submit them to the queue. If the consumers take more than 30 minutes to process them, I want the producer to wait until all the items have been processed before polling its data source again (which it would then do immediately, since the 30 minutes have already elapsed).

Not looking for someone to write my code for me. Some basic hints/guidance would be really appreciated.

Here is a shortened example implementation I'm trying to work from. I've taken out all of my horrible attempts at solving the problem. Note that the hard coded parameters for constructing the ThreadPoolExecutor will eventually be dynamic.

import java.util.concurrent.*;
public class ItemProcessorService {
    public static void main(String args[]) throws InterruptedException {
        RejectedExecutionHandlerImpl rejectionHandler = new RejectedExecutionHandlerImpl();
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        int corePoolSize = 5,
                maxPoolSize = 10,
                keepAlive = 10,
                queueCapacity = 1;
        ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAlive, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(queueCapacity), threadFactory, rejectionHandler);

        for (int i = 0; i < 10; i++) {
            executor.execute(new ItemConsumer());
        }
        executor.shutdown();
        while (!executor.isTerminated()) {
        }
        System.out.println("Executor finished");
    }
}

class ItemConsumer implements Runnable {
    @Override
    public void run() {
        processItem();
    }

    private void processItem() {
        try {
            Thread.sleep(3000);
            System.out.println(Thread.currentThread().getName() + " - processed item");
        } catch (InterruptedException e) {
            //e.printStackTrace();
        }
    }
}

class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println(r.toString() + " - rejected");
    }
}
Jason
  • You should use `executor.awaitTermination(...)` instead of doing a busy wait in a loop. – isnot2bad Aug 22 '14 at 23:08
  • Thanks. That made me uncomfortable to write. I found it in an example and didn't quite explore executors features fully. I figured there was a decent await. – Jason Aug 25 '14 at 19:13
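
For reference, the `awaitTermination` suggestion from the comments could look roughly like the sketch below. It is not the asker's code: the class name `AwaitTerminationDemo`, the method `runAndWait`, the pool size of 4 and the one-minute timeout are assumptions chosen purely for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class AwaitTerminationDemo {

    /** Runs taskCount trivial tasks, blocks until the pool has terminated, and returns how many tasks ran. */
    static int runAndWait(int taskCount) {
        ExecutorService executor = Executors.newFixedThreadPool(4); // pool size is an assumption
        AtomicInteger processed = new AtomicInteger();

        for (int i = 0; i < taskCount; i++) {
            executor.execute(processed::incrementAndGet); // stand-in for processing one item
        }

        executor.shutdown(); // accept no new tasks; already submitted ones still run
        try {
            // Blocks without spinning until all tasks are done or the timeout elapses.
            if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
                executor.shutdownNow(); // timed out: interrupt whatever is still running
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println("Processed " + runAndWait(10) + " items");
    }
}
```

Unlike the empty `while (!executor.isTerminated())` loop, the waiting thread is parked and does not burn CPU while the consumers work.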

3 Answers

1

Go for a thread pool executor with future tasks. It will make your producer wait until all the threads have completed. Below is an example:

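    for (int job = 0; job < yourList.size(); job++) {
        // take() blocks until the next submitted task has completed
        Future<String[]> future = Service.getCompletionService().take();

        if (future.isDone()) { // always true for a future returned by take()
            // handle the result of future.get() here
        }
    }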
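
The fragment above leaves out how the completion service is created and fed. A self-contained sketch of the same idea might look like this; `CompletionServiceDemo`, `processAll`, the pool size and the upper-casing "work" are hypothetical placeholders, not part of the original answer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CompletionServiceDemo {

    /** Submits one task per item and blocks until every result has been collected. */
    static List<String> processAll(List<String> items) {
        ExecutorService pool = Executors.newFixedThreadPool(4); // pool size is an assumption
        CompletionService<String> completion = new ExecutorCompletionService<>(pool);
        List<String> results = new ArrayList<>();
        try {
            for (String item : items) {
                completion.submit(() -> item.toUpperCase()); // stand-in for real work
            }
            for (int job = 0; job < items.size(); job++) {
                // take() blocks until some task has finished, so get() returns immediately
                results.add(completion.take().get());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        } catch (ExecutionException e) {
            throw new IllegalStateException("A consumer task failed", e.getCause());
        } finally {
            pool.shutdown();
        }
        return results; // in completion order, not submission order
    }

    public static void main(String[] args) {
        System.out.println(processAll(Arrays.asList("a", "b", "c")));
    }
}
```

Because `take()` is called once per submitted task, the method only returns after every consumer has finished, which is the "wait for everything" behaviour the question asks for.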
Darshan Mehta
0

Use a self-scheduling producer (see also here) combined with a CountDownLatch. I know you do not need the code, but I think in this case it does not hurt to have some code to experiment with:

import java.util.*;
import java.util.concurrent.*;

public class WaitForConsumers {

public static void main(String[] args) {

    try {
        new WaitForConsumers().demo();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

public static final int NUMBER_OF_SLEEP_TASKS = 100;

public void demo() throws Exception {

    ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    Producer p = new Producer(scheduler, 1000L);
    p.start();
    try {
        Thread.sleep(3000);
        p.stop();
        System.out.println("Stopped.");
    } finally {
        scheduler.shutdownNow();
    }
}

public static List<Long> produce() {
    List<Long> items = new ArrayList<Long>();
    for (int i = 0; i < NUMBER_OF_SLEEP_TASKS; i++) items.add(i * 1L);
    return items;
}

public static void consume(List<Runnable> tasks, CountDownLatch consumersRunning) throws Exception {

    ExecutorService executor = Executors.newCachedThreadPool();
    for (Runnable r : tasks) executor.execute(r);
    try {
        consumersRunning.await();
    } finally {
        executor.shutdownNow();
    }
}

class Producer implements Runnable {

    ScheduledExecutorService scheduler;
    long frequencyMs;
    volatile boolean stop;
    ScheduledFuture<?> producerTask;

    Producer(ScheduledExecutorService scheduler, long frequencyMs) {
        this.scheduler = scheduler;
        this.frequencyMs = frequencyMs;
    }

    void start() {
        scheduler.execute(this);
    }

    void stop() {

        System.out.println("Stopping producer.");
        stop = true;
        if (producerTask != null) {
            producerTask.cancel(true);
        }
    }

    @Override public void run() {

        long startTime = System.currentTimeMillis();
        List<Long> items = produce();
        CountDownLatch consumersRunning = new CountDownLatch(items.size());
        List<Runnable> tasks = wrap(items, consumersRunning);
        try {
            System.out.println("Consuming " + tasks.size() + " tasks.");
            consume(tasks, consumersRunning);
            System.out.println("Consumed tasks.");
        } catch (Exception e) {
            e.printStackTrace();
            stop = true;
        } finally {
            if (stop) {
                System.out.println("Producer stopping.");
            } else {
                long waitTime = frequencyMs - (System.currentTimeMillis() - startTime);
                if (waitTime < 1L) {
                    scheduler.execute(this);
                } else {
                    System.out.println("Next run in " + waitTime + " ms.");
                    producerTask = scheduler.schedule(this, waitTime, TimeUnit.MILLISECONDS);
                }
            }
        }
    }

    List<Runnable> wrap(List<Long> items, final CountDownLatch consumersRunning) {

        List<Runnable> tasks = new ArrayList<Runnable>();
        for (Long l : items) {
            tasks.add(new SleepTask(l, consumersRunning));
        }
        return tasks;
    }
} // class Producer

class SleepTask implements Runnable {

    long sleepTime;
    CountDownLatch consumersRunning;

    public SleepTask(long sleepTime, CountDownLatch consumersRunning) {
        this.sleepTime = sleepTime;
        this.consumersRunning = consumersRunning;
    }

    @Override public void run() {

        try {
            Thread.sleep(sleepTime);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumersRunning.countDown();
        }
    }
} // class SleepTask
}
vanOekel
0

You can solve this problem nicely by creating an ExecutorService for every run of the producer. The producer creates it, shuts it down, and also waits for its termination.

For scheduling the producer, use either ScheduledExecutorService.scheduleWithFixedDelay(...) or ScheduledExecutorService.scheduleAtFixedRate(...), depending on what you want:

  • scheduleWithFixedDelay(...) will keep the specified amount of time between two runs, so no matter how long a run lasts, the next will follow after the specified amount of time:

    ...|XXXXXX|.............|XXXXX|.............|XXXXXXXXX|.............|X
               <--- fix --->       <--- fix --->           <--- fix --->

  • scheduleAtFixedRate(...) tries to keep the scheduling rate, so if a producer needs longer, the time between two runs will be reduced, but two runs will never overlap:

    ...|XXXXXXXX|...|XXXXX|......|XXXXXXXXXXXXXXX||XXX|....|XX
       <--- fix ---><--- fix ---><--- fix ---><--- fix ---><-
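
The difference between the two scheduling modes can be measured with a small experiment. The sketch below is only an illustration: `FixedDelayVsFixedRate`, `shortestPauseMillis` and the 100 ms / 150 ms timings are made up for the demo. It runs a task that takes longer than the period and reports the shortest pause between the end of one run and the start of the next.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class FixedDelayVsFixedRate {

    /**
     * Runs a task three times and returns the shortest pause, in milliseconds,
     * between the end of one run and the start of the next.
     */
    static long shortestPauseMillis(boolean fixedRate, long periodMs, long runMs) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        List<Long> starts = new ArrayList<>();
        List<Long> ends = new ArrayList<>();
        CountDownLatch threeRuns = new CountDownLatch(3);

        Runnable task = () -> {
            if (threeRuns.getCount() == 0) {
                return; // measurement finished, ignore any extra run
            }
            starts.add(System.nanoTime());
            try {
                Thread.sleep(runMs); // simulated work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            ends.add(System.nanoTime());
            threeRuns.countDown();
        };

        try {
            if (fixedRate) {
                scheduler.scheduleAtFixedRate(task, 0, periodMs, TimeUnit.MILLISECONDS);
            } else {
                scheduler.scheduleWithFixedDelay(task, 0, periodMs, TimeUnit.MILLISECONDS);
            }
            threeRuns.await(); // wait for three complete runs
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }

        long shortest = Long.MAX_VALUE;
        for (int i = 0; i + 1 < starts.size(); i++) {
            shortest = Math.min(shortest, starts.get(i + 1) - ends.get(i));
        }
        return TimeUnit.NANOSECONDS.toMillis(shortest);
    }

    public static void main(String[] args) {
        // Each run (150 ms) takes longer than the period (100 ms).
        System.out.println("fixed delay: " + shortestPauseMillis(false, 100, 150) + " ms pause");
        System.out.println("fixed rate:  " + shortestPauseMillis(true, 100, 150) + " ms pause");
    }
}
```

With fixed delay the pause should never be shorter than the configured period; with fixed rate an overdue run starts almost immediately after the previous one ends, which matches the behaviour asked for in the question.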

A simple example of what the producer might look like:

public class Producer implements Runnable {
    public void run() {
        ExecutorService executor = ... // create new executor

        // queue items
        for (Object item : itemSource) {
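            executor.execute(new Consumer(item)); // ExecutorService has no schedule(Runnable)
        }

        // shut down the executor and wait until all consumers have finished
        executor.shutdown();
        try {
            executor.awaitTermination(2, TimeUnit.HOURS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }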
    }
}

ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

scheduler.scheduleWithFixedDelay(new Producer(), 30, 30, TimeUnit.MINUTES);
// or
scheduler.scheduleAtFixedRate(new Producer(), 30, 30, TimeUnit.MINUTES);

Of course you will have to do proper exception handling, as any exception in a producer run that is not caught will stop the scheduler from re-scheduling it.
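
To see the effect of an uncaught exception, consider the following sketch. All names (`ProducerExceptionDemo`, `runsUntilFailure`, `survivesFailure`), the 10 ms delay and the simulated failure on the second run are assumptions made for the demo.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ProducerExceptionDemo {

    /** Schedules a task that throws on its second run; returns how many runs were started. */
    static int runsUntilFailure() {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        AtomicInteger runs = new AtomicInteger();
        ScheduledFuture<?> future = scheduler.scheduleWithFixedDelay(() -> {
            if (runs.incrementAndGet() == 2) {
                throw new IllegalStateException("data source unavailable"); // simulated failure
            }
        }, 0, 10, TimeUnit.MILLISECONDS);
        try {
            future.get(); // a periodic task never completes normally, so get() only returns by throwing
        } catch (ExecutionException e) {
            // the uncaught exception ended the schedule; no third run will happen
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return runs.get();
    }

    /** Same failure, but caught inside the task; returns true if five runs happened. */
    static boolean survivesFailure() {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch fiveRuns = new CountDownLatch(5);
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                if (runs.incrementAndGet() == 2) {
                    throw new IllegalStateException("data source unavailable");
                }
            } catch (RuntimeException e) {
                // log the problem and let the next scheduled run try again
            } finally {
                fiveRuns.countDown();
            }
        }, 0, 10, TimeUnit.MILLISECONDS);
        try {
            return fiveRuns.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("Runs before the schedule died: " + runsUntilFailure());
        System.out.println("Guarded task kept running: " + survivesFailure());
    }
}
```

Wrapping the whole body of `run()` in a try/catch, as in the second method, is one simple way to keep a long-lived producer alive across transient failures.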

Please note that creating an executor might be expensive, so this approach is only appropriate if consuming the items is far more expensive than creating the executor (which, in your situation, seems to be the case).

isnot2bad
  • This seems the most elegant solution. I am currently trying to figure out why my thread pool (x) value (pool-x-thread-n) keeps increasing. Is a handle on the pool being kept even though I'm doing a shutdown? Should I edit my question to show the current state of my code? – Jason Aug 26 '14 at 20:39
  • The default thread factory uses an internal static counter `poolNumber` that is increased for every instance. As every executor has its own instance, every time you create an executor, the number will increase. Nothing abnormal. – isnot2bad Aug 26 '14 at 21:39
  • So Thread-Pool-99999-thread-1 and so on should not concern me? It's not representative of multiple pools still in existence, just a number of pools each time I've instantiated an executor? – Jason Aug 26 '14 at 22:39
  • Look at the source code of Executors$DefaultThreadFactory! It is just a counter that gets incremented every time an executor is created. And in debugging mode you can check the number of threads. – isnot2bad Aug 27 '14 at 06:59
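
The naming behaviour discussed in these comments is easy to reproduce. In the sketch below (class and method names are invented for illustration), each call creates a fresh executor with the default thread factory, runs one task, and shuts the executor down again; the pool number in the thread name still grows from call to call.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class PoolNumberDemo {

    /** Creates a new single-thread pool, runs one task in it, shuts it down, and returns the worker's name. */
    static String workerNameOfNewPool() {
        ExecutorService pool = Executors.newFixedThreadPool(1); // uses the default thread factory
        Callable<String> nameTask = () -> Thread.currentThread().getName();
        try {
            return pool.submit(nameTask).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the task", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Task failed", e.getCause());
        } finally {
            pool.shutdown(); // this pool is gone afterwards; only the counter remembers it
        }
    }

    /** Extracts N from a default thread name of the form pool-N-thread-M. */
    static int poolNumber(String threadName) {
        return Integer.parseInt(threadName.split("-")[1]);
    }

    public static void main(String[] args) {
        String first = workerNameOfNewPool();
        String second = workerNameOfNewPool();
        System.out.println(first + " then " + second);
        System.out.println("Pool number increased: " + (poolNumber(second) > poolNumber(first)));
    }
}
```

A high pool number therefore only says how many default thread factories have been created so far in the JVM, not how many pools are still alive.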