
I've been all over the internet and the Java docs regarding this one; I can't seem to figure out what it is about do while loops I'm not understanding. Here's the background: I have some message handler code that takes some JSON formatted data from a REST endpoint, parses it into a runnable task, then adds this task to a linked blocking queue for processing by the worker thread. Meanwhile, on the worker thread, I have this do while loop to process the message tasks:

do {
    PublicTask currentTask = pubMsgQ.poll();
    currentTask.run();
} while(pubMsgQ.size() > 0);

pubMsgQ is a LinkedBlockingQueue<PublicTask> (PublicTask implements the Runnable interface). I can't see any problems with this loop (obviously, or else I wouldn't be here), but this is how it behaves during execution: Upon entering the do block, pubMsgQ is polled and returns the runnable task as expected. The task is then run successfully with expected results, but then we get to the while statement. Now, according to the Java docs, poll() should return and remove the head of the queue, so I should expect that pubMsgQ.size() will return 0, right? Wrong I guess, because somehow the while statement passes and the program enters the do block again; of course this time pubMsgQ.poll() returns null (as I would have expected it should) and the program crashes with NullPointerException. What? Please explain like I'm five...

EDIT: I decided to leave my original post as is above; because I think I actually explain the undesired behavior of that specific piece of the code quite succinctly (the loop is being executed twice while I'm fairly certain there is no way the loop should be executing twice). However, I realize that probably doesn't give enough context for that loop's existence and purpose in the first place, so here is the complete breakdown for what I am actually trying to accomplish with this code as I am sure there is a better way to implement this altogether anyways.

What this loop is actually a part of is a message handler class which implements the MessageHandler interface belonging to my Client Endpoint class [correction from my previous post; I had said the messages coming in were JSON formatted strings from a REST endpoint. This is technically not true: they are JSON formatted strings being received through a web socket connection. Note that while I am using the Spring framework, this is not a STOMP client; I am only using the built-in javax WebSocketContainer as this is more lightweight and easier for me to implement]. When a new message comes in onMessage() is called, which passes the JSON string to the MessageHandler; so here is the code for the entire MessageHandler class:

public class MessageHandler implements com.innotech.gofish.AutoBrokerClient.MessageHandler {
    private LinkedBlockingQueue<PublicTask> pubMsgQ = new LinkedBlockingQueue<PublicTask>();
    private LinkedBlockingQueue<AuthenticatedTask> authMsgQ = new LinkedBlockingQueue<AuthenticatedTask>();
    private MessageLooper workerThread;
    private CyclicBarrier latch = new CyclicBarrier(2);
    private boolean running = false;
    private final boolean authenticated;
    
    public MessageHandler(boolean authenticated) {
        this.authenticated = authenticated;
    }

    @Override
    public void handleMessage(String msg) {
        try {
            //Create new Task and submit it to the message queue:
            if(authenticated) {
                AuthenticatedTask msgTsk = new AuthenticatedTask(msg);
                authMsgQ.put(msgTsk);
            } else {
                PublicTask msgTsk = new PublicTask(msg);
                pubMsgQ.put(msgTsk);
            }
            //Check status of worker thread:
            if(!running) {
                workerThread = new MessageLooper();
                running = true;
                workerThread.start();
            } else if(running && !workerThread.active) {
                latch.await();
                latch.reset();
            }
        } catch(InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
    
    private class MessageLooper extends Thread {
        boolean active = false;
        
        public MessageLooper() {
            
        }
        
        @Override
        public synchronized void run() {
            while(running) {
                active = true;
                if(authenticated) {
                    do {
                        AuthenticatedTask currentTask = authMsgQ.poll();
                        currentTask.run();
                        if(GoFishApplication.halt) {
                            GoFishApplication.reset();
                        }
                    } while(authMsgQ.size() > 0);
                } else {
                    do {
                        PublicTask currentTask = pubMsgQ.poll();
                        currentTask.run();
                    } while(pubMsgQ.size() > 0);
                }
                try {
                    active = false;
                    latch.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        }
    }

}

You can probably see where I'm going with this... what this jury-rigged code is trying to do is act as a facsimile of the Looper class provided by the Android Development Kit. The actual desired behavior is that, as messages are received, the handleMessage() method adds them to the queue, and the messages are processed separately on the worker thread as long as there are messages to process. If there are no more messages to process, the worker thread waits until it is notified by the handler that more messages have been received, at which point it resumes processing those messages until the queue is once again empty. Rinse and repeat until the user stops the program.

Of course, the closest thing the JDK provides to this is the ThreadPoolExecutor (which I know is probably the actual proper way to implement this), but for the life of me I couldn't figure out how to use it for this exact case. Finally, as a quick aside so I can be sure to explain everything fully: the reason there are two queues (and a public and an authenticated handler) is that there are two web socket connections. One is an authenticated channel for sending/receiving private messages; the other is unauthenticated and used only to send/receive public messages. There should be no interference, however, given that the authenticated status is final and set at construction, and each Client Endpoint is passed its own Handler, which is instantiated at the time of server connection.

  • Welcome to Stack Overflow. Please take the [tour] to learn how Stack Overflow works and read [ask] on how to improve the quality of your question. Then [edit] your question to include your source code as a working [mcve], which can be compiled and tested by others. – Progman Jan 01 '22 at 16:20
  • If your queue is being accessed by multiple threads, then the queue's contents may change between when you check its size and when you poll it. – khelwood Jan 01 '22 at 16:26
  • Are you certain that you're not just calling the do/while twice? It will fail as described if the queue is empty to start with (that is, you should use `while`, not `do/while`). – tgdavies Jan 01 '22 at 22:34

1 Answer


You appear to have a number of concurrency / threading bugs in your code.

Assumptions:

It looks like there could be multiple MessageHandler objects, each with its own pair of queues and (supposedly) at most one MessageLooper thread. It also looks as if a given MessageHandler could be used by multiple request worker threads.

If that is the case, then one problem is that MessageHandler is not thread-safe. Specifically, the handleMessage method accesses and updates fields of the MessageHandler instance without doing any synchronization.

Some of the fields are initialized during object creation and then never changed. They are probably OK. (But you should declare them as final to be sure!) But some of the variables are supposed to change during operation, and they must be handled correctly.

One section that rings particular alarm bells is this:

    if (!running) {
        workerThread = new MessageLooper();
        running = true;
        workerThread.start();
    } else if (running && !workerThread.active) {
        latch.await();
        latch.reset();
    }

Since this is not synchronized, and the variables are not volatile:

  • There are race conditions if two threads call this code simultaneously; e.g. between testing running and assigning true to it.
  • If one thread sets running to true, there are no guarantees that a second thread will see the new value.

The net result is that you could potentially get two or more MessageLooper threads for a given set of queues. That breaks your assumptions in the MessageLooper code.
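
One way to close that race, as a minimal sketch only: make the "check running, then start the worker" step atomic by doing it while holding a single lock. The class GuardedStartSketch, its lock field, the workersStarted counter and the demo method are all hypothetical names introduced for illustration; the empty worker body stands in for the real message loop.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for MessageHandler; only the start-up check is shown.
class GuardedStartSketch {
    private final Object lock = new Object();
    private boolean running = false;   // guarded by lock
    private Thread workerThread;       // guarded by lock
    final AtomicInteger workersStarted = new AtomicInteger(); // for the demo only

    void ensureWorkerStarted() {
        synchronized (lock) {
            // Test and assignment happen under the same lock, so only one
            // caller can ever observe running == false.
            if (!running) {
                running = true;
                workersStarted.incrementAndGet();
                workerThread = new Thread(() -> { /* message loop would go here */ });
                workerThread.start();
            }
        }
    }

    // Calls ensureWorkerStarted() from several threads and reports how many workers were created.
    static int demo(int callers) {
        GuardedStartSketch handler = new GuardedStartSketch();
        Thread[] threads = new Thread[callers];
        for (int i = 0; i < callers; i++) {
            threads[i] = new Thread(handler::ensureWorkerStarted);
            threads[i].start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handler.workersStarted.get();
    }

    public static void main(String[] args) {
        System.out.println("workers started: " + demo(8)); // prints "workers started: 1"
    }
}
```

Because the flag is only ever read and written inside the synchronized block, it does not also need to be volatile in this sketch.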


Looking at the MessageLooper code, I see that you have declared the run method as synchronized. Unfortunately, that doesn't help. The problem is that the run method will be synchronizing on this ... which is the specific instance of MessageLooper. Nothing else in the code shown synchronizes on that object, so the thread simply acquires the lock once when run starts and releases it once when run returns. In short, the synchronized modifier is not protecting anything here.

(For Java synchronized methods and synchronized blocks to work properly, 1) the threads involved need to synchronize on the same object (i.e. the same primitive lock), and 2) all read and write operations on the state guarded by the lock need to be done while holding the lock. This applies to use of Lock objects as well.)
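
To illustrate those two rules, here is a small sketch with hypothetical names (SharedLockSketch, lock, counter, demo): both threads synchronize on the same lock object, and every read and write of the guarded field happens while holding it.

```java
// Hypothetical example: one lock object shared by all threads that touch the counter.
class SharedLockSketch {
    private final Object lock = new Object();
    private int counter = 0; // guarded by lock

    void increment() {
        synchronized (lock) {   // rule 1: same lock object for every thread
            counter++;
        }
    }

    int get() {
        synchronized (lock) {   // rule 2: reads hold the lock as well as writes
            return counter;
        }
    }

    // Two threads each increment perThread times; returns the final count.
    static int demo(int perThread) {
        SharedLockSketch shared = new SharedLockSketch();
        Runnable work = () -> {
            for (int i = 0; i < perThread; i++) {
                shared.increment();
            }
        };
        Thread a = new Thread(work);
        Thread b = new Thread(work);
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return shared.get();
    }

    public static void main(String[] args) {
        System.out.println(demo(10_000)); // prints 20000
    }
}
```

If each thread instead synchronized on its own object, as the synchronized run method does, the increments could interleave and updates could be lost.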

So ...

  • There is no synchronization between a MessageLooper thread and any other threads that are adding to or removing from the queues.
  • There are no guarantees that the MessageLooper thread will notice changes to the running flag.
  • As I previously noted, you could have two or more MessageLooper threads polling the same pair of queues.

In short, there are lots of possible explanations for strange behavior in the code in the Question. This includes the specific problem you noticed with the queue size.
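
On the queue-size symptom specifically: poll() returns null when the queue is empty, so any loop that polls without a null check can hit a NullPointerException whenever the size check and the poll disagree. A hedged sketch of one alternative is to let the worker block in take(), which waits for an element and never returns null, so no size check or barrier is needed. The class TakeLoopSketch, the STOP sentinel and the demo method are hypothetical; plain Runnable stands in for PublicTask.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class TakeLoopSketch {
    // Hypothetical sentinel ("poison pill") that tells the worker to stop.
    private static final Runnable STOP = () -> { };

    static List<String> demo() {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        List<String> processed = Collections.synchronizedList(new ArrayList<>());

        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    Runnable task = queue.take(); // blocks while the queue is empty
                    if (task == STOP) {
                        break;
                    }
                    task.run();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // treat interruption as a stop request
            }
        });
        worker.start();

        try {
            queue.put(() -> processed.add("first"));
            queue.put(() -> processed.add("second"));
            queue.put(STOP);
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [first, second]
    }
}
```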

Writing correct multi-threaded code is difficult. This is why you should be using an ExecutorService rather than attempting to roll your own code.
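
As a rough sketch of what that could look like, assuming one handler per connection as described in the Question: a single-thread executor owns both the queue and the worker thread, so handleMessage only has to submit a task. The class ExecutorHandlerSketch, the handled list, and the shutdown and demo methods are hypothetical; the lambda stands in for creating and running a PublicTask or AuthenticatedTask.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ExecutorHandlerSketch {
    // One worker thread; tasks run sequentially in submission order.
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    final List<String> handled = Collections.synchronizedList(new ArrayList<>());

    // Safe to call from any thread; the executor does the queueing and hand-off.
    void handleMessage(String msg) {
        worker.execute(() -> handled.add(msg)); // stand-in for new PublicTask(msg).run()
    }

    // Stops accepting work and waits for already queued tasks to finish.
    boolean shutdown() {
        worker.shutdown();
        try {
            return worker.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    static List<String> demo() {
        ExecutorHandlerSketch handler = new ExecutorHandlerSketch();
        handler.handleMessage("a");
        handler.handleMessage("b");
        handler.handleMessage("c");
        handler.shutdown();
        return handler.handled;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [a, b, c]
    }
}
```

With this shape there is no running flag, no CyclicBarrier and no explicit queue to keep consistent; the idle worker simply waits inside the executor until the next task arrives.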

But if you do need to roll your own concurrency code, I recommend buying and reading "Java Concurrency in Practice" by Brian Goetz et al. It is still the only good textbook on this topic ...

Stephen C