
This is a follow-up question from my previous question asked here.

I am using a PriorityBlockingQueue now. I changed my producer to the following:

synchronized(Manager.queue) {
    Manager.queue.add(new Job());
    Manager.queue.notify();
}

And changed Consumer to the following. Full code skeleton is here:

//my consumer thread run()
public void run() {
    synchronized(Manager.queue) {
        while (Manager.queue.peek() == null) {
            System.out.println("111111111111111");
            try {
                Manager.queue.wait();
            } catch (InterruptedException e) {
            }
        }
        Job job = Manager.queue.peek();
        if (job != null) {
            submitJob(job);
            if (job.SubmissionFailed.equals("false")) {
                // successful submission. Remove from queue. Add to another.
                Manager.queue.poll();
                Manager.submissionQueue.put(job.uniqueid, job);
            }
        }
    }
}

My code works only the first time (first produce and first consume); it doesn't work the second time. I guess the wait/notify logic fails somewhere. The producer pushes new jobs to the queue, but the consumer doesn't peek any more items. In fact, it doesn't even enter the while loop, and 111111111111111 is no longer printed.

What is the problem? How can I fix it?

Tina J
  • Is your consumer logic located inside the `while(true)` loop (from the original question), or something like it, in your consumer thread? I feel like it isn't, which would explain why it only runs once. – Mark Dec 17 '18 at 22:19
  • No, I removed the original `while(true)`, and replaced it with `synchronized(Manager.queue) { while (Manager.queue.peek() == null) { Manager.queue.wait(); }`. Should I add this inside the `while(true)` without removing it?! – Tina J Dec 17 '18 at 22:24
  • So now that you are using a PriorityBlockingQueue, you no longer need to "synchronize" (lock) the queue to produce/consume in a safe manner - that's what the BlockingQueue gets you. You also do not need "wait/notify" anymore, as similar (but much easier to use) functionality is part of the BlockingQueue's API. You should take a look at the documentation of that class: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/PriorityBlockingQueue.html. To consume, you just need to call `take()`; it will block until an item is available. – dan.m was user2321368 Dec 17 '18 at 22:26
  • @dan.mwasuser2321368 The code as is isn't set up correctly for that. After peek returns null, it jumps out of the entire while loop. That is what happens after the first result has been dealt with and no new items have been added, and no new items can be added by other threads in the meantime, since it's in a synchronized block. – Mark Dec 17 '18 at 22:29
  • Oh, I just put all my consumer code inside the original `while(true)` and now it works. I thought the whole point was to remove that `while(true)` loop! – Tina J Dec 17 '18 at 22:29
  • @Mark - the code isn't set up for that because the OP was given some poor advice in the previous answer. If you are going to use a blocking data structure, you should take advantage of everything it offers. – dan.m was user2321368 Dec 17 '18 at 22:33
  • @dan.mwasuser2321368 True, I wrote my answer as response to your original comment, your comment is good now, I'll still leave mine since it contains info why the code does not work – Mark Dec 17 '18 at 22:43

1 Answer


You could simplify all this code to just:

In the Producer:

Manager.queue.add(new Job());

and in the Consumer:

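while (true) {
    try {
        Job job = Manager.queue.take(); // blocks until a job is available
        submitJob(job); // or do something else with the Job
        // To process more than one job, put your code here and remove the break.
        break;
    } catch (InterruptedException ex) {
        // Usually there is nothing to do here: the loop simply retries take(),
        // unless you interrupted the thread yourself in order to stop it.
    }
}
// Alternatively, put your code here; then keep the break and wrap
// everything in a surrounding while loop.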

When using a PriorityBlockingQueue, you don't need any synchronized statements, since the locking already happens inside the PriorityBlockingQueue. According to the documentation, take() waits for an element to be added if necessary and then removes and returns it. See https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/PriorityBlockingQueue.html#take() for reference.
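To make the blocking behaviour of take() concrete, here is a minimal, self-contained sketch. The `Job` class, its `time` field, and the `consume` helper are hypothetical stand-ins (the real `Job` and `Manager` classes are not shown in the question); the only assumption taken from the thread is that jobs are ordered by time, earliest first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

class TakeLoopSketch {
    // Hypothetical stand-in for the asker's Job; ordered by time, earliest first.
    static final class Job implements Comparable<Job> {
        final long time;
        Job(long time) { this.time = time; }
        @Override
        public int compareTo(Job other) { return Long.compare(time, other.time); }
    }

    // Takes `count` jobs and returns their times in the order they were taken.
    static List<Long> consume(PriorityBlockingQueue<Job> queue, int count) {
        List<Long> processed = new ArrayList<>();
        try {
            while (processed.size() < count) {
                Job job = queue.take(); // waits whenever the queue is empty
                processed.add(job.time);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible and stop
        }
        return processed;
    }

    public static void main(String[] args) throws InterruptedException {
        PriorityBlockingQueue<Job> queue = new PriorityBlockingQueue<>();
        Thread consumer = new Thread(() -> System.out.println(consume(queue, 3)));
        consumer.start();
        for (long t : new long[] {30, 10, 20}) {
            queue.add(new Job(t)); // no synchronized block or notify() needed
        }
        consumer.join();
    }
}
```

Because the consumer runs concurrently with the producer here, the order it prints depends on timing; when all jobs are already queued before the first take(), they come out earliest first.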

And for the InterruptedException you might want to take a look here: Handling InterruptedException in Java
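As an illustration of one common convention (not necessarily the one the linked post recommends), the sketch below treats an interrupt as a request to stop: it restores the interrupt flag in the catch block and lets the loop condition end the consumer. The class and method names are hypothetical, and a plain `LinkedBlockingQueue<String>` stands in for the job queue.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

class InterruptSketch {
    // Takes items until the thread is interrupted; returns how many were handled.
    static int consumeUntilInterrupted(BlockingQueue<String> queue) {
        int handled = 0;
        while (!Thread.currentThread().isInterrupted()) {
            try {
                queue.take();
                handled++;
            } catch (InterruptedException e) {
                // take() cleared the flag when it threw; set it again so the
                // loop condition (and any caller) can see the interrupt.
                Thread.currentThread().interrupt();
            }
        }
        return handled;
    }

    // Runs a consumer over two items, then interrupts it once the queue is empty.
    static int runDemo() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("a");
        queue.add("b");
        AtomicInteger handled = new AtomicInteger();
        Thread consumer = new Thread(() -> handled.set(consumeUntilInterrupted(queue)));
        consumer.start();
        try {
            while (!queue.isEmpty()) {
                Thread.sleep(10); // wait until both items have been taken
            }
            consumer.interrupt(); // ask the consumer to stop
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handled.get();
    }

    public static void main(String[] args) {
        System.out.println("handled " + runDemo()); // prints "handled 2"
    }
}
```

If the consumer should instead keep running after an interrupt it did not cause, the catch block can be left empty so the loop retries take(), as in the answer's snippet.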

Edit: added missing try{} catch()

Poohl
  • `while(true)` must be kept in consumer. Right?! – Tina J Dec 17 '18 at 22:36
  • `take()` removes the item. I shouldn't remove it at first; only after I know the submission was successful should I remove it. So I need to `peek()` first, and then `poll()`/`take()` it inside my `if`. – Tina J Dec 17 '18 at 22:39
  • `while(true)` needs to be kept if you want to process multiple items, yes. Also, unless there is another reason not shown here to keep the item in the queue, don't `peek()` and `poll()` any more often than necessary, since each of these methods needs to synchronize, which takes really long compared to a simple "look at variable" operation. Rather, store the `Job` in a temporary variable. – Poohl Dec 17 '18 at 22:48
  • I need to process the jobs in the queue one by one. Only after I know the HTTP response is successful (my job processing is a REST request) do I need to remove the job from the queue. Right now I don't know why I get an exception if I produce quickly. Any reasons?! – Tina J Dec 17 '18 at 22:55
  • Is it an `InterruptedException`? If so this is more or less random, you can simply catch it and retry the `take()`. I'll update my answer – Poohl Dec 17 '18 at 22:57
  • No. It is related to my HTTP client library. Looks like the response json gets null if I push too fast :-/ – Tina J Dec 17 '18 at 23:00
  • Well, you get the `try {} catch () {}` anyway. – Poohl Dec 17 '18 at 23:05
  • @TinaJ - Testing for `.peek()` inside a `while(true)` loop is (as explained in the other question) a very expensive operation, as it essentially dedicates one of your cores to continuously checking whether an item has arrived. Do you really need a priority queue (i.e. is there any reason you cannot process these requests FIFO)? If you don't need a priority queue, a better solution would be to use a double-ended queue, like a LinkedBlockingDeque: take the item off the queue, attempt the transaction, and then put it back at the "head" with `.addFirst()` if the transaction fails. – dan.m was user2321368 Dec 17 '18 at 23:08
  • Yes, I need to compare the items based on `job.time`, and process the job which has the earliest time. Looks like currently my code has some Java semantics errors when I produce frequently. Apparently when I call `submitJob(job)` in my code above, the new `job` object doesn't have the modified field somehow! Maybe I need to return that object. – Tina J Dec 17 '18 at 23:11
  • If something doesn't get modified even though there is a statement that clearly does that, it's usually some kind of missing synchronization. – Poohl Dec 17 '18 at 23:14
  • No worries. It was the server's problem (doesn't allow same job to be submitted twice frequently!). BTW, what is that `break;` in your code?! If I break out of the while then I only consume one object! – Tina J Dec 17 '18 at 23:47
  • That's why you should remove it if your code is there. As stated, if you want your code to be after the while, enclosed by another while handling the consumption of multiple threads, you need the `break`. This is just an option I included, since some folks consider it better style to have as little indentation as possible. – Poohl Dec 17 '18 at 23:53
  • @Poohl I have similar problems with `HashTable`. I thought it is already thread-safe and doesn't need `synchronized` keyword. But I get anomalies. – Tina J Dec 26 '18 at 17:35
  • @TinaJ `HashTable` is not thread safe, as are most java standard implementations, unless explicitly stated otherwise. See my answer to your previous question; the same principle can be applied to make anything thread-safe, though it might not always be the best solution (I don't know a better one here). – Poohl Dec 26 '18 at 18:28
  • Thanks. How about `ConcurrentHashMap`? – Tina J Dec 26 '18 at 18:40
  • @tinaj Well, that sounds like a concurrent version of a `HashMap`; if this exists, then use it. I don't know a thread-safe version for 99% of all standard implementations. I can tell you how to make something thread-safe and I can point you to Google... – Poohl Dec 26 '18 at 18:42