I'm trying to figure out the best approach in Java in this case (any version) to implement a single producer multiple consumer where I use ExecutorService (by preference, not need) where the producer needs to run "forever" but each time it runs, it needs to wait for everything to be fully processed, e.g. all consumer threads have terminated, the queue is empty and no items are left to be produced. The producer should only poll its data source at a fixed interval as well.
So as an example: every 30 minutes I want my producer to poll its data source for records and submit them to the queue. If the consumers are taking more than 30 minutes to process, I want the producer to wait until all the items have been processed before polling its data source again (which it would do so immediately since 30 minutes had elapsed).
Not looking for someone to write my code for me. Some basic hints/guidance would be really appreciated.
Here is a shortened example implementation I'm trying to work from. I've taken out all of my horrible attempts at solving the problem. Note that the hard coded parameters for constructing the ThreadPoolExecutor will eventually be dynamic.
import java.util.concurrent.*;
public class ItemProcessorService {
public static void main(String args[]) throws InterruptedException {
RejectedExecutionHandlerImpl rejectionHandler = new RejectedExecutionHandlerImpl();
ThreadFactory threadFactory = Executors.defaultThreadFactory();
int corePoolSize = 5,
maxPoolSize = 10,
keepAlive = 10,
queueCapacity = 1;
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAlive, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(queueCapacity), threadFactory, rejectionHandler);
for (int i = 0; i < 10; i++) {
executor.execute(new ItemConsumer());
}
executor.shutdown();
while (!executor.isTerminated()) {
}
System.out.println("Executor finished");
}
}
class ItemConsumer implements Runnable {
@Override
public void run() {
processItem();
}
private void processItem() {
try {
Thread.sleep(3000);
System.out.println(Thread.currentThread().getName() + " - processed item");
} catch (InterruptedException e) {
//e.printStackTrace();
}
}
}
class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println(r.toString() + " - rejected");
}
}