Say I have a large queue, something like 10,000 objects. I want to make a threadpool with 5 worker threads, each one removing an item from the queue and working on it, until the queue is empty.
My worry is that with the setup I've seen in various places, I end up creating 10,000 jobs immediately and then executing them via the 5 workers. I feel like this isn't really scalable - the queue already has 10,000 items, and now I have an additional 10,000 jobs sitting in the executor's queue (even though they're not actively being executed, this seems like a memory problem).
This seems to be what this answer is suggesting: https://stackoverflow.com/a/9916299/774359 - it's the "// now submit our jobs" part that worries me. Is it a problem that I'm effectively dumping the queue into jobs?
Here's a brief example of what I have so far:
In Main():
ExecutorService executor = Executors.newFixedThreadPool(5);
while (!hugeQueue.isEmpty()) {
    String work = hugeQueue.remove();
    System.out.println("Creating job for " + work);
    Runnable worker = new Worker(work);
    executor.execute(worker);
}
In the Worker class:
public Worker(String itemFromQueue) { this.itemFromQueue = itemFromQueue; }
@Override
public void run() {
    System.out.println("Working on " + this.itemFromQueue);
    // Do actual work
}
When hugeQueue contains 10,000 numbers, I see all 10,000 "Creating job" messages, followed by all 10,000 "Working on" messages. I think it would be better if only 5 jobs were created at once and then worked on - as a thread frees up, it creates another job and works on it. That way, there are never 10,000 jobs waiting in memory. How would I accomplish that? Am I thinking about this architecture correctly?
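To make the idea concrete, here is a minimal sketch of one way the number of pending jobs could be capped. It swaps Executors.newFixedThreadPool for a ThreadPoolExecutor with a small bounded ArrayBlockingQueue and CallerRunsPolicy, so when the pool is saturated the submitting thread runs the job itself instead of queueing more. The class name BoundedSubmitSketch, the drain helper, and the counter are made up for illustration, and the "work" is just a counter increment.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedSubmitSketch {

    // Hypothetical helper: drains hugeQueue with a fixed number of threads
    // and returns how many items were processed.
    static int drain(Queue<String> hugeQueue, int threads) throws InterruptedException {
        AtomicInteger processed = new AtomicInteger();

        // At most 'threads' jobs run and at most 'threads' more wait in the queue.
        // When both are full, CallerRunsPolicy makes the submitting thread run
        // the job itself, which slows down job creation.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(threads),
                new ThreadPoolExecutor.CallerRunsPolicy());

        while (!hugeQueue.isEmpty()) {
            String work = hugeQueue.remove();
            executor.execute(() -> {
                // Do actual work on 'work' here; this sketch only counts it.
                processed.incrementAndGet();
            });
        }

        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);
        return processed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        Queue<String> hugeQueue = new ArrayDeque<>();
        for (int i = 0; i < 10_000; i++) {
            hugeQueue.add("item-" + i);
        }
        System.out.println("Processed " + drain(hugeQueue, 5));
    }
}
```

With this arrangement only a handful of job objects exist at any moment, while the rest of the items stay in hugeQueue until there is room for them.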
Edited to include updated information based on an answer:
@seneque's code didn't compile straightaway, so I made some minor changes - unfortunately, the output of this is just the creation of the workers, and none of the actual work.
In Main():
int numOfThreads = 5;
BlockingQueue<Integer> hugeQueue = new LinkedBlockingQueue<>();
for(int x = 0; x < 1000; x++) { hugeQueue.add(x); }
ExecutorService executor = Executors.newFixedThreadPool(numOfThreads);
LongRunningWorker longRunningWorker = new LongRunningWorker();
for (int i = 0; i < numOfThreads; i++) {
    System.out.println("Created worker #" + i);
    executor.submit(longRunningWorker);
}
System.out.println("Done");
In LongRunningWorker:
public class LongRunningWorker implements Runnable {
    BlockingQueue<Integer> workQueue;
    void spiderExmaple(BlockingQueue<Integer> workQueue) {
        this.workQueue = workQueue;
    }
    @Override
    public void run() {
        try {
            while(workQueue.poll(3, TimeUnit.SECONDS) != null) {
                Integer work = workQueue.remove();
                System.out.println("Working on " + work);
                new Worker(work).run();
            }
        } catch (InterruptedException e) { e.printStackTrace(); }
    }
}
In Worker:
public class Worker implements Runnable {
    Integer work;
    Worker(Integer x) { this.work = x; }
    @Override
    public void run() {
        System.out.println("Finished work on " + this.work);
    }
}
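For comparison, here is a hedged sketch of how the pull-from-the-queue version might be made to do the work. Two things in the code above look like likely culprits: spiderExmaple is an ordinary method that is never called, so workQueue stays null, and an exception thrown inside a task passed to submit() is captured in the returned Future rather than printed. Also, poll() already removes the head of the queue, so calling remove() afterwards takes a second item. The sketch passes the queue in through a constructor and uses the value poll() returns. The class name QueueDrainSketch, the drain helper, the counter, and the short poll timeout are assumptions for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class QueueDrainSketch {

    static class LongRunningWorker implements Runnable {
        private final BlockingQueue<Integer> workQueue;
        private final AtomicInteger processed;

        // The queue is handed over in a real constructor, so it is never null.
        LongRunningWorker(BlockingQueue<Integer> workQueue, AtomicInteger processed) {
            this.workQueue = workQueue;
            this.processed = processed;
        }

        @Override
        public void run() {
            try {
                Integer work;
                // poll() removes and returns the head, or null after the timeout.
                while ((work = workQueue.poll(100, TimeUnit.MILLISECONDS)) != null) {
                    // Do actual work on 'work' here; this sketch only counts it.
                    processed.incrementAndGet();
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag and let the worker exit.
                Thread.currentThread().interrupt();
            }
        }
    }

    // Hypothetical helper: starts numOfThreads workers that share one queue
    // and returns how many items they processed in total.
    static int drain(BlockingQueue<Integer> hugeQueue, int numOfThreads)
            throws InterruptedException {
        AtomicInteger processed = new AtomicInteger();
        ExecutorService executor = Executors.newFixedThreadPool(numOfThreads);
        for (int i = 0; i < numOfThreads; i++) {
            executor.submit(new LongRunningWorker(hugeQueue, processed));
        }
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);
        return processed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> hugeQueue = new LinkedBlockingQueue<>();
        for (int x = 0; x < 1000; x++) {
            hugeQueue.add(x);
        }
        System.out.println("Processed " + drain(hugeQueue, 5));
    }
}
```

Here only 5 job objects are ever created, one per thread, and each keeps pulling items until the queue has been empty for the length of the timeout.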