Say I have a large queue, something like 10,000 objects. I want to make a thread pool with 5 worker threads, each one removing an item from the queue and working on it, until the queue is empty.

My worry is that with the setup I've seen in various places, I end up creating 10,000 jobs immediately and then executing them via the 5 workers. This doesn't seem scalable: the queue already holds 10,000 items, and now I have an additional 10,000 jobs in memory (even though they're not actively being executed, this seems like a memory problem).

This seems to be what this answer is suggesting: https://stackoverflow.com/a/9916299/774359 - it's the "// now submit our jobs" part that worries me. Is it a problem that I'm effectively dumping the queue into jobs?

Here's a brief example of what I have so far:

In Main():

ExecutorService executor = Executors.newFixedThreadPool(5);
while(!hugeQueue.isEmpty()) {
    String work = hugeQueue.remove();
    System.out.println("Creating job for " + work);
    Runnable worker = new Worker(work);
    executor.execute(worker);
}

In the Worker class:

public Worker(String itemFromQueue) { this.itemFromQueue = itemFromQueue; }

@Override
public void run() {
     System.out.println("Working on " + this.itemFromQueue);
     //Do actual work
}

When hugeQueue contains 10,000 numbers, I see all 10,000 "Creating job" messages, followed by all 10,000 "Working on" messages. I think it would be better if only 5 jobs were created at once and then worked on; as a thread frees up, it creates another job and works on it. That way, there are never 10,000 jobs in memory. How would I accomplish that? Am I thinking about this architecture correctly?


Edited to include updated information based on an answer:

@seneque's code didn't compile straightaway, so I made some minor changes - unfortunately, the output of this is just the creation of the workers, and none of the actual work.

In Main():

int numOfThreads = 5;
BlockingQueue<Integer> hugeQueue = new LinkedBlockingQueue<>();
for(int x = 0; x < 1000; x++) { hugeQueue.add(x); }

ExecutorService executor = Executors.newFixedThreadPool(numOfThreads);
LongRunningWorker longRunningWorker = new LongRunningWorker();

for( int i = 0; i < numOfThreads ; i++ ) {
    System.out.println("Created worker #" + i);
    executor.submit(longRunningWorker);
}
System.out.println("Done");

In LongRunningWorker:

public class LongRunningWorker implements Runnable {
    BlockingQueue<Integer> workQueue;
    void spiderExmaple(BlockingQueue<Integer> workQueue) {
        this.workQueue = workQueue;
    }

    @Override
    public void run() {
        try {
            while(workQueue.poll(3, TimeUnit.SECONDS) != null) {
                Integer work = workQueue.remove();
                System.out.println("Working on " + work);
                new Worker(work).run();
            }
        } catch (InterruptedException e) { e.printStackTrace(); }
    }
}

In Worker:

public class Worker implements Runnable{
    Integer work;
    Worker(Integer x) { this.work = x; }

    @Override
    public void run() {
        System.out.println("Finished work on " + this.work);

    }
}
Jake
  • A ThreadPoolExecutor (which is what Executors.newFixedThreadPool(5) creates) has an internal queue. So here you are taking from one queue to put into another queue, which will be read by 5 threads – seneque Jan 06 '17 at 13:25
  • @seneque Right - which would mean that, for my own queue of 10,000 items, I would effectively be making a second queue of the same size, correct? The objects are different, but my question is whether this is a workable solution given the double memory requirement – Jake Jan 06 '17 at 13:27
  • Instead of that, if hugeQueue is a blocking queue, you can give your 5 threads a reference to the queue and have them poll from it. – seneque Jan 06 '17 at 13:31
  • @seneque Do you mind writing up an example of that for me? I'd gladly accept your answer - it sounds like what I'm referring to at the end of my question, but I'm not sure how to accomplish that. – Jake Jan 06 '17 at 13:32
  • Otherwise, when creating the ThreadPoolExecutor, you can define a queue with limited size and a rejection policy like ThreadPoolExecutor.CallerRunsPolicy – seneque Jan 06 '17 at 13:33
  • @Jake, if you are using Java 8, use https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executors.html#newWorkStealingPool-- – Ravindra babu Jan 09 '17 at 08:09
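
For reference, the bounded-queue idea from the comments above might look roughly like the sketch below. It is only an illustration under assumptions: the class name BoundedPoolDemo, the runAll and process helpers, and the backlog size of 10 are all made up for this example. With CallerRunsPolicy, once the pool's internal queue is full, execute() runs the job on the submitting thread, which slows the producer down so that only a small number of jobs exist at any time.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedPoolDemo {

    // Hypothetical placeholder for the real per-item work.
    static void process(String work) {
        // Do actual work
    }

    // Submits every item, but keeps at most `backlog` jobs waiting inside the pool.
    static int runAll(Queue<String> hugeQueue, int numThreads, int backlog)
            throws InterruptedException {
        AtomicInteger processed = new AtomicInteger();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                numThreads, numThreads,                      // fixed-size pool
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(backlog),           // bounded internal queue
                new ThreadPoolExecutor.CallerRunsPolicy());  // full queue: caller runs the job

        while (!hugeQueue.isEmpty()) {
            String work = hugeQueue.remove();
            executor.execute(() -> {
                process(work);
                processed.incrementAndGet();
            });
        }

        executor.shutdown();                                 // already submitted jobs still run
        executor.awaitTermination(1, TimeUnit.MINUTES);
        return processed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        Queue<String> hugeQueue = new ArrayDeque<>();
        for (int x = 0; x < 10_000; x++) {
            hugeQueue.add("item-" + x);
        }
        System.out.println("Processed " + runAll(hugeQueue, 5, 10) + " items");
    }
}
```

The trade-off is that the submitting thread occasionally does work itself, so it is not a pure producer; whether that is acceptable depends on the application.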

1 Answer

One solution is to have your five threads poll the queue directly.

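BlockingQueue<String> hugeQueue = ...
ExecutorService executor = Executors.newFixedThreadPool(5);
// One worker instance is shared; each of the 5 threads runs its own polling loop.
LongRunningWorker longRunningWorker = new LongRunningWorker(hugeQueue);
for (int i = 0; i < 5; i++) {
    executor.submit(longRunningWorker);
}
// Lets the pool threads exit once every worker has returned.
executor.shutdown();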

Then the LongRunningWorker is defined like:

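import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class LongRunningWorker implements Runnable {
    final BlockingQueue<String> workQueue;

    LongRunningWorker(BlockingQueue<String> workQueue) {
        this.workQueue = workQueue;
    }

    @Override
    public void run() {
        try {
            String work;
            // poll() removes and returns the head, or returns null if the
            // queue stays empty for 3 seconds, which ends this worker.
            while ((work = workQueue.poll(3, TimeUnit.SECONDS)) != null) {
                try {
                    new Worker(work).run();
                } catch (Exception e) {
                    // One failed item should not stop the worker.
                    e.printStackTrace();
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag and exit
        }
    }
}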
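
To try the polling approach end to end, here is a minimal self-contained sketch. It makes a few assumptions that are not part of the answer: the class name PollingWorkersDemo and the drain helper are invented for the example, the items are Integers, a counter stands in for the real work, and because the queue is filled before the workers start it uses the non-blocking poll() instead of the 3-second timed poll.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PollingWorkersDemo {

    // Drains the queue with a fixed number of workers and returns how many items were handled.
    static int drain(BlockingQueue<Integer> queue, int numThreads) throws InterruptedException {
        AtomicInteger processed = new AtomicInteger();
        ExecutorService executor = Executors.newFixedThreadPool(numThreads);

        // Only numThreads jobs are ever created; each one loops until the queue is empty.
        Runnable worker = () -> {
            Integer work;
            // Non-blocking poll(): returns null as soon as the queue is empty.
            while ((work = queue.poll()) != null) {
                // Real work on `work` would go here.
                processed.incrementAndGet();
            }
        };

        for (int i = 0; i < numThreads; i++) {
            executor.execute(worker);
        }

        executor.shutdown();                                  // no new jobs; running ones finish
        if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
            executor.shutdownNow();                           // give up after the timeout
        }
        return processed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> hugeQueue = new LinkedBlockingQueue<>();
        for (int x = 0; x < 10_000; x++) {
            hugeQueue.add(x);
        }
        System.out.println("Processed " + drain(hugeQueue, 5) + " items");
    }
}
```

If items keep arriving while the workers run, the timed poll from the answer is the safer choice, since a plain poll() would let a worker exit the moment the queue is briefly empty.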
seneque
  • Perfect, exactly what I was looking for. Thanks! – Jake Jan 06 '17 at 13:45
  • It looks like the code doesn't quite work, after checking it out - any advice? It only shows the fact that it's created the workers, but the work itself never begins. – Jake Jan 07 '17 at 21:16
  • new Worker(work).run(); <-- so it is supposed to do the work – seneque Jan 09 '17 at 12:01
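
A likely explanation for "the work never begins" in the edited code in the question: LongRunningWorker is created with new LongRunningWorker(), and spiderExmaple is an ordinary method that is never called, so workQueue stays null and run() throws a NullPointerException right away. Because the worker is passed to submit(), the exception is stored in the returned Future rather than printed. Separately, calling poll(...) and then remove() takes two items per iteration, so every other item would be skipped. The sketch below only demonstrates the hidden-exception part; the class and method names are made up for the illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SwallowedExceptionDemo {

    // Runs the task via submit() and returns the exception it threw, or null if it succeeded.
    static Throwable failureOf(Runnable task) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<?> future = executor.submit(task);
            try {
                future.get();              // the stored exception only surfaces here
                return null;
            } catch (ExecutionException e) {
                return e.getCause();
            }
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Never assigned a real queue, mirroring the field in the edited question.
        BlockingQueue<Integer> workQueue = null;
        Throwable failure = failureOf(() -> workQueue.poll());
        System.out.println("Task failed with: " + failure);
    }
}
```

Passing the queue through a constructor, as in the answer, and using the value returned by poll(...) instead of calling remove() afterwards should address both problems.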