I am trying to implement a simple producer/consumer system in Java 11. I use one thread for each role, plus a global queue, as follows:

  • A global priority queue.
  • The first thread, the producer, runs an HTTP server, listens for incoming HTTP messages, and upon receiving a message, pushes it as a job to the queue (queue.size() increments).
  • The second thread, the consumer, continuously peeks at the queue. If there is a job (job != null), it submits an HTTP request somewhere and, upon successful receipt, polls the job from the queue (queue.size() decrements).

The skeleton is as below:

Main Class:

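import java.util.PriorityQueue;
import java.util.Queue;

public class Manager
{
    // Static so that the static main method can assign them
    private static Consumer consumer;
    private static Producer producer;
    // Static so both threads can reach it as Manager.queue.
    // Job must implement Comparable (or the queue needs a Comparator).
    static Queue<Job> queue = new PriorityQueue<>();

    public static void main (String args[])
    {
        consumer = new Consumer();
        producer = new Producer();
    }
}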

Producer class:

public class Producer implements Runnable
{
    public Producer()
    {
        Thread producer = new Thread(this);
        producer.start();
    }

    public void run()
    {
            //HTTP server starts, listens, and adds to the queue upon receiving a Job
            server.start();
            Manager.queue.add(new Job());
    }
}

Consumer class:

public class Consumer implements Runnable
{
    public Consumer()
    {
        Thread consumer = new Thread(this);
        consumer.start();
    }

    public void run()
    {
    // Thread.sleep(1);

        while(true)
        {
            //get an object off the queue
            Job job= Manager.queue.peek();
            //do some stuff with the object
        }
    }
}

The producer and the queue work fine. The problem is with the Consumer. The Consumer code above (with the while(true) loop) never sees the item when it peeks. But when I add a Thread.sleep(x) before the while(true) loop, even with x = 1 ms, it works and grabs the item successfully.

What is the problem? In theory, the while(true) loop shouldn't be an issue. Why can't it see and peek the item?

Tina J

2 Answers


The cause of the problem: non-synchronized reading and writing from and to a queue.

What happens here is that both threads, running on different CPU cores, may each work with their own cached copy of the queue. The producer might be adding items, and these changes probably even get propagated to RAM, but the consumer never checks RAM, since it has its own cached copy of the queue, which stays empty.

The Thread.sleep() workaround works because, on waking up, the thread has to fetch all its data from RAM, where it has probably changed.

The correct way to do it is to access the queue only while synchronized on it, as follows:

In producer:

synchronized(Manager.queue) {
     Manager.queue.add(new Job());
}

and in Consumer:

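boolean running = true; // "continue" is a reserved word in Java
while (running) {
    synchronized(Manager.queue) {
        // Queue has poll(), not pop(); poll() returns null if the queue is empty
        Job job=Manager.queue.poll();
    }
}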

And as a final touch: the while (true) busy loop is incredibly inefficient. You could block instead, using Object.wait() and Object.notify().

In producer:

synchronized(Manager.queue) {
     Manager.queue.add(new Job());
     Manager.queue.notify();
}

and in Consumer:

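boolean running = true; // "continue" is a reserved word in Java
while (running) {
    synchronized(Manager.queue) {
        while (Manager.queue.peek() == null) {
            // wait() throws InterruptedException, which the caller must handle
            Manager.queue.wait();
        }
        // Queue has poll(), not pop()
        Job job=Manager.queue.poll();
    }
}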
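
For reference, the wait()/notify() pattern above can be put together into a self-contained sketch. This is only an illustration under some assumptions: the class name WaitNotifyDemo and the method runDemo are made up for this example, Integer stands in for the question's Job type, and the producer simply adds numbers instead of running an HTTP server.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;

// Hypothetical demo class; Integer stands in for the question's Job type.
class WaitNotifyDemo {

    // Produces jobs 1..count on one thread, consumes them on another,
    // and returns the jobs in the order the consumer received them.
    static List<Integer> runDemo(int count) {
        final Queue<Integer> queue = new PriorityQueue<>();
        final List<Integer> consumed = new ArrayList<>();

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    Integer job;
                    synchronized (queue) {
                        // Re-check the condition in a loop: wait() may wake up spuriously.
                        while (queue.peek() == null) {
                            queue.wait();
                        }
                        job = queue.poll();
                    }
                    // Only the consumer thread writes to this list.
                    consumed.add(job);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread producer = new Thread(() -> {
            for (int i = 1; i <= count; i++) {
                synchronized (queue) {
                    queue.add(i);
                    queue.notify(); // wake the consumer if it is waiting
                }
            }
        });

        consumer.start();
        producer.start();
        try {
            producer.join();
            consumer.join(); // after join(), the consumer's writes are visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("demo interrupted", e);
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(runDemo(3)); // prints [1, 2, 3]
    }
}
```

Every access to the queue happens while holding its monitor, and the consumer takes the job out inside the synchronized block but processes it outside, so the producer is not blocked while a job is being handled.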
Poohl
  • How much overhead does `while(true)` really add? Is it super bad?! – Tina J Dec 14 '18 at 22:14
  • Yes, it pins one CPU core at 100%, causing your system to effectively lose a CPU core, and forces the CPU to run at full speed, causing unnecessary power consumption and heat. If you don't want to deal with `Object.notify()`, at least do `Thread.sleep(10)` to let the CPU idle most of the time. – Poohl Dec 14 '18 at 23:03
  • I have a problem here. The second `poll` never works. Only the first one works. I'm trying to print something inside your while loop, but `while (Manager.queue.peek() == null) { System.out.println("111111111111111");` doesn't print anything for second poll. How to fix it? – Tina J Dec 17 '18 at 21:39
  • I don't know what you mean by `poll`. If you're wondering why the `while` loop never loops, that's because the `notify()` and `wait()` logic should prevent this. You could in theory use an `if` if you really wanted to, but this `while` loop catches any coding errors and spontaneous wakeups that could interfere. Don't use `if`. If by `poll` you mean `pop`: maybe the new `Job` was added before the consumer looked for a new one, so no waiting was ever done and the `while` loop never executed even once. – Poohl Dec 17 '18 at 21:59
  • Yes, looks like somewhere the logic is failing. Consumer is in loop, prints("1111"), waits. Same time, Producer adds, notifies. Consumer pops. Again producer adds, notifies. But consumer doesn't go to loop to print 1111 for the second time. Why?! – Tina J Dec 17 '18 at 22:03
  • Because the condition in the `while` tells it that there already is an element ready to be `pop`ed, so there is no need to wait (so no need to enter the loop). And note: nothing here is happening "at the same time", the main purpose of synchronized is to prevent this. What is actually happening in your example is `peek() -> wait() -> add() -> peek() -> pop() -> add() -> peek() -> pop()` – Poohl Dec 17 '18 at 22:12
  • My new question just asked: https://stackoverflow.com/questions/53823767/producer-consumer-code-with-wait-notify-doesnt-work-on-second-produce – Tina J Dec 17 '18 at 22:16
  • @Poohl please update your answer to keep that `while(true)` wrapped around the `synchronized` block. I thought the `while(true)` loop must be removed. – Tina J Dec 17 '18 at 22:35
  • Ow, you mean _that_ `while(true)`, I am sorry, I thought you were talking about a dedicated `while(true)` that did `while (peek() == null) {}`. I'm so sorry for the hassle. – Poohl Dec 17 '18 at 22:42

PriorityQueue is not thread-safe whereas PriorityBlockingQueue is. As long as you aren't using any methods defined in the BlockingQueue interface, these two implementations are interchangeable. Simply changing PriorityQueue to PriorityBlockingQueue should fix your problem.
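
As a rough sketch of what that swap could look like, the example below uses a PriorityBlockingQueue shared by two threads. The class name BlockingQueueDemo and the method runDemo are made up for illustration, and Integer stands in for Job. Note that it goes one step beyond a plain swap: it calls take(), which is a BlockingQueue method that blocks while the queue is empty, so the consumer needs neither a busy loop nor explicit synchronization. Unlike the peek-then-poll flow in the question, take() removes the job before it is processed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical demo class; Integer stands in for the question's Job type.
class BlockingQueueDemo {

    // Produces jobs 1..count on one thread, consumes them on another,
    // and returns the jobs in the order the consumer received them.
    static List<Integer> runDemo(int count) {
        final BlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
        final List<Integer> consumed = new ArrayList<>();

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    // take() blocks until an element is available.
                    consumed.add(queue.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread producer = new Thread(() -> {
            for (int i = 1; i <= count; i++) {
                // The queue is unbounded, so add() never blocks.
                queue.add(i);
            }
        });

        consumer.start();
        producer.start();
        try {
            producer.join();
            consumer.join(); // after join(), the consumer's writes are visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("demo interrupted", e);
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(runDemo(3)); // prints [1, 2, 3]
    }
}
```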

Jacob G.