I have a blocking queue implementation. One scheduler puts a random number on the queue once per second, and I have implemented another scheduler with a pool of 10 threads to invoke take() on the message queue.

The important part of the scenario I am trying to implement is that, after taking a single item from the queue, the thread waits for 20 seconds (Thread.sleep). My understanding was that the other 9 threads in the pool would start working in parallel while the first thread waits (each of them also waiting 20 seconds after taking an item), but that is not the case. The other threads in the pool do not seem to start at all. I'm new to BlockingQueues and any help would be really appreciated.

My code is as follows.

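import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class BlockingQueueImpl {

public Queue<Integer> messageQueue = new ConcurrentLinkedDeque<>();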

private void putNumber(Integer number){
   try{
       System.out.println("putting number to the queue: " + number);
       messageQueue.add(number);
       System.out.println("size of the queue: " +messageQueue.size());

   } catch (Exception e){
       e.printStackTrace();
   }
}

private void getNumber(){

}


private class RunnableGetImpl implements Runnable {

    @Override
    public void run() {
        try{
            Integer num = messageQueue.poll();
            System.out.println("Polling from queue, number - "+ num);
            if(num!=null){
                System.out.println("Sleeping thread for 20 sec, active threads: " + Thread.activeCount());
                Thread.sleep(20000);
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}


private class RunnablePutImpl implements Runnable {

    @Override
    public void run() {
        Random rand = new Random();
        int n = rand.nextInt(100);
        n += 1;
        putNumber(n);


    }


}

public static void main(String[] args){

    BlockingQueueImpl blockingQueue = new BlockingQueueImpl();


    ScheduledExecutorService executor1 = Executors.newScheduledThreadPool(1);
    executor1.scheduleAtFixedRate(blockingQueue.new RunnablePutImpl(), 0, 1000, TimeUnit.MILLISECONDS);

    ScheduledExecutorService executor2 = Executors.newScheduledThreadPool(20);
    executor2.scheduleAtFixedRate(blockingQueue.new RunnableGetImpl(), 0, 100, TimeUnit.MILLISECONDS);

}

}

1 Answer

From the Javadoc of ScheduledThreadPoolExecutor.scheduleAtFixedRate:

If any execution of this task takes longer than its period, then subsequent executions may start late, but will not concurrently execute.

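In other words, a single scheduled task never runs on more than one thread at a time, no matter how many threads the pool has. Therefore you will need to start (schedule) as many workers as you want running in parallel.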
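To make the quoted Javadoc sentence concrete, here is a minimal sketch. The class name, the helper maxConcurrent and the timings (a 50 ms period with a task that sleeps 200 ms) are assumptions chosen for illustration, not taken from the question. It counts how many task bodies are ever running at once when one task is scheduled versus several.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class FixedRateOverlapDemo {

    /**
     * Hypothetical helper: schedules `tasks` separate fixed-rate tasks on one pool
     * and returns the highest number of task bodies observed running at once.
     */
    static int maxConcurrent(int tasks, int poolSize) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(poolSize);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();

        Runnable slowTask = () -> {
            int now = running.incrementAndGet();
            peak.accumulateAndGet(now, Math::max);
            try {
                Thread.sleep(200); // deliberately longer than the 50 ms period
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                running.decrementAndGet();
            }
        };

        // Each scheduleAtFixedRate call creates an independent periodic task.
        for (int i = 0; i < tasks; i++) {
            pool.scheduleAtFixedRate(slowTask, 0, 50, TimeUnit.MILLISECONDS);
        }

        try {
            Thread.sleep(1000); // let the schedule run for about a second
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("1 task,  4 threads: peak = " + maxConcurrent(1, 4));
        System.out.println("4 tasks, 4 threads: peak = " + maxConcurrent(4, 4));
    }
}
```

With a single scheduled task the peak stays at 1 even though three pool threads are idle. Scheduling four tasks lets the pool run them side by side.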

In search of a better solution, please be aware that you are not actually using a BlockingQueue.

You don't implement java.util.concurrent.BlockingQueue, nor are you using an implementation of it. ConcurrentLinkedDeque is a thread-safe but non-blocking collection: it implements Deque (and therefore Queue), but not BlockingQueue.

ConcurrentLinkedDeque.poll() won't block and will simply return null if the queue is empty.

These are the JavaDocs for the BlockingQueue interface: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/BlockingQueue.html

Use put() to supply a value to the queue. The operation will block if the BlockingQueue has reached its maximum capacity. Use take() to remove an element. This will block if the queue is empty.
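The difference between the non-blocking and blocking calls can be sketched as follows. The class and method names are made up for this illustration, and LinkedBlockingQueue is just one of several BlockingQueue implementations you could pick.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class BlockingVsPollingDemo {

    /** Non-blocking: poll() on an empty ConcurrentLinkedDeque returns null immediately. */
    static Integer pollEmptyDeque() {
        return new ConcurrentLinkedDeque<Integer>().poll();
    }

    /** Blocking: take() waits for a producer. Returns how long it waited, in milliseconds. */
    static long millisBlockedInTake(long producerDelayMs) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(producerDelayMs);
                queue.put(42);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        long start = System.nanoTime();
        producer.start();
        try {
            queue.take(); // parks this thread until the producer calls put()
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    /** Bounded queue: a timed offer() gives up when the queue stays full. */
    static boolean offerToFullQueue() {
        BlockingQueue<Integer> bounded = new LinkedBlockingQueue<>(1);
        bounded.add(1); // fills the only slot
        try {
            // put(2) would block indefinitely here; the timed offer returns false instead
            return bounded.offer(2, 100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("poll() on empty deque: " + pollEmptyDeque());
        System.out.println("take() blocked for about " + millisBlockedInTake(300) + " ms");
        System.out.println("timed offer() on full queue: " + offerToFullQueue());
    }
}
```

The timed offer() is used in the last helper only so that the demo terminates. A plain put() on the full queue would wait until some consumer removed an element.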

Using these classes properly avoids busy polling: a consumer thread waits inside take() until a value is available, instead of repeatedly calling poll() and getting null back.

More detail is available in this answer to a similar question: How to use ConcurrentLinkedQueue?

Update: Example code with multiple producers/consumers

The following example code is reproduced from https://riptutorial.com/java/example/13011/multiple-producer-consumer-example-with-shared-global-queue with which I share no affiliation:

The code below showcases a program with multiple producers and consumers. Both producer and consumer threads share the same global queue.

import java.util.concurrent.*;
import java.util.Random;

public class ProducerConsumerWithES {
    public static void main(String args[]) {
        BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>();
         
        ExecutorService pes = Executors.newFixedThreadPool(2);
        ExecutorService ces = Executors.newFixedThreadPool(2);
          
        pes.submit(new Producer(sharedQueue, 1));
        pes.submit(new Producer(sharedQueue, 2));
        ces.submit(new Consumer(sharedQueue, 1));
        ces.submit(new Consumer(sharedQueue, 2));
         
        pes.shutdown();
        ces.shutdown();
    }
}

/* Each producer continuously writes a stream of integers to a queue
that is shared between all producers and consumers */

class Producer implements Runnable {
    private final BlockingQueue<Integer> sharedQueue;
    private int threadNo;
    private Random random = new Random();
    public Producer(BlockingQueue<Integer> sharedQueue,int threadNo) {
        this.threadNo = threadNo;
        this.sharedQueue = sharedQueue;
    }
    @Override
    public void run() {
        // Producer produces a continuous stream of numbers, one every 200 milliseconds
        while (true) {
            try {
                int number = random.nextInt(1000);
                System.out.println("Produced:" + number + ":by thread:"+ threadNo);
                sharedQueue.put(number);
                Thread.sleep(200);
            } catch (Exception err) {
                err.printStackTrace();
            }
        }
    }
}
/* Different consumers consume data from shared queue, which is shared by both producer and consumer threads */
class Consumer implements Runnable {
    private final BlockingQueue<Integer> sharedQueue;
    private int threadNo;
    public Consumer (BlockingQueue<Integer> sharedQueue,int threadNo) {
        this.sharedQueue = sharedQueue;
        this.threadNo = threadNo;
    }
    @Override
    public void run() {
        // Consumer consumes numbers generated from Producer threads continuously
        while(true){
            try {
                int num = sharedQueue.take();
                System.out.println("Consumed: "+ num + ":by thread:"+threadNo);
            } catch (Exception err) {
               err.printStackTrace();
            }
        }
    }   
}

Output:

Produced:69:by thread:2
Produced:553:by thread:1
Consumed: 69:by thread:1
Consumed: 553:by thread:2
Produced:41:by thread:2
Produced:796:by thread:1
Consumed: 41:by thread:1
Consumed: 796:by thread:2
Produced:728:by thread:2
Consumed: 728:by thread:1

Note how multiple producers and consumers are added to the pool - you need as many as you would want to potentially work in parallel. This is the crucial thing your code is missing - multiple workers. The scheduler will schedule them, but it won't magically multiply the single instance that you have asked it to schedule.

Obviously you'll want to adapt the number of producers and consumers according to your requirements.
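Adapted to the scenario in the question (one producer on a fixed schedule, several consumers that each stay busy for a while after taking an item), a rough sketch could look like this. The class name, the run helper and its parameters are hypothetical, and the timings in main are scaled down so the demo finishes quickly. The question's values would be 10 consumers, 1000 ms between items and 20000 ms of work.

```java
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

class SlowConsumersDemo {

    /** Runs the demo and returns the names of the consumer threads that handled an item. */
    static Set<String> run(int consumers, long produceEveryMs, long workMs, long runForMs) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        Set<String> threadsThatWorked = ConcurrentHashMap.newKeySet();

        // One producer, scheduled at a fixed rate, puts a random number from 1 to 100.
        ScheduledExecutorService producer = Executors.newSingleThreadScheduledExecutor();
        producer.scheduleAtFixedRate(
                () -> queue.offer(ThreadLocalRandom.current().nextInt(1, 101)),
                0, produceEveryMs, TimeUnit.MILLISECONDS);

        // Each consumer loops: block in take(), then simulate slow work.
        Runnable consumer = () -> {
            String name = Thread.currentThread().getName();
            try {
                while (true) {
                    Integer num = queue.take();
                    threadsThatWorked.add(name);
                    System.out.println(name + " took " + num + ", busy for " + workMs + " ms");
                    Thread.sleep(workMs);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // shutdownNow() ends the loop
            }
        };

        // Submit one long-running worker per pool thread.
        ExecutorService pool = Executors.newFixedThreadPool(consumers);
        for (int i = 0; i < consumers; i++) {
            pool.execute(consumer);
        }

        try {
            Thread.sleep(runForMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            producer.shutdownNow();
            pool.shutdownNow();
        }
        return threadsThatWorked;
    }

    public static void main(String[] args) {
        Set<String> workers = run(10, 100, 2_000, 1_500);
        System.out.println("consumers that handled an item: " + workers.size());
    }
}
```

While one consumer sleeps, the next item wakes a different consumer that is waiting in take(), so a slow worker does not hold up the others.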

Thomas Timbul
  • I see. So what would be the way to achieve what I am trying to do? In my scenario, the scheduled task is a blocking action that takes some time to complete. Is there another implementation I can use to work on the queue concurrently with a thread pool, without blocking all the other threads just because one of them is blocking? – Harinda Samarasekara Nov 19 '20 at 13:08