I have two HashMaps that I would like to synchronize access to within a class. I maintain two maps because I dispatch a task to several servers: I keep the original task object in a HashMap<String, Task>, and at the same time I track the response status from each server in a HashMap<String, HashMap<InetAddress, TaskResponse>>. Both maps are keyed by a String that is unique to each Task object.

So my code goes like this:

```java
HashMap<String, Map<InetAddress, TaskResponse>> taskResponseMap = new HashMap<>();
HashMap<String, Task> taskMap = new HashMap<>();

public void endpointHasRespondedCallback(TaskResponse taskResponse, InetAddress from) {
    // Callback threads get blocked here!
    synchronized (taskResponseMap) {
        synchronized (taskMap) {
            Map<InetAddress, TaskResponse> taskResponses = taskResponseMap.get(taskResponse.taskUuid);
            if (taskResponses == null || !taskResponses.containsKey(from)) {
                // No entry for this task/endpoint, probably because the task timed out
                return;
            }
            taskResponses.put(from, taskResponse);
        }
    }
}

public void sendTaskToAllEndpoints(Task task) throws InterruptedException {
    long taskStartedAt = System.currentTimeMillis();

    // Create a placeholder for every endpoint before anything is sent, so an
    // early response is not discarded by the containsKey() check above
    HashMap<InetAddress, TaskResponse> taskResponses = new HashMap<>();
    for (InetAddress dest : getDestinationNodes()) {
        taskResponses.put(dest, TaskResponse.emptyTaskResponse());
    }

    // Register the task under the same locks the callback uses
    synchronized (taskResponseMap) {
        synchronized (taskMap) {
            taskResponseMap.put(task.taskUuid, taskResponses);
            taskMap.put(task.taskUuid, task);
        }
    }

    // Callbacks only replace values for existing keys (no structural change),
    // so iterating the key set here is safe
    for (InetAddress dest : taskResponses.keySet()) {
        sendTaskTo(dest, task);
    }

    // Poll once per second until all endpoints respond or the timeout expires
    while (System.currentTimeMillis() < taskStartedAt + timeoutInMillis) {
        Thread.sleep(1000); // no lock is held while sleeping

        synchronized (taskResponseMap) {
            synchronized (taskMap) {
                if (isTaskOver(task.taskUuid)) {
                    Map<InetAddress, TaskResponse> responses = taskResponseMap.remove(task.taskUuid);
                    taskMap.remove(task.taskUuid);

                    // Note: taskIsDone() runs while both locks are still held
                    task.taskIsDone(responses);
                    return;
                }
            }
        }
    }

    // If the task is still registered here, it must have timed out
    synchronized (taskResponseMap) {
        synchronized (taskMap) {
            taskResponseMap.remove(task.taskUuid);
            taskMap.remove(task.taskUuid);
        }
    }
}

// Deliberately not synchronized: it is only called from inside the synchronized blocks above
public boolean isTaskOver(String taskUuid) {
    Task task = taskMap.get(taskUuid);
    if (task == null || !taskResponseMap.containsKey(task.taskUuid)) {
        return true;
    } else {
        for (TaskResponse value : taskResponseMap.get(task.taskUuid).values()) {
            if (value.status != TaskResponseStatus.SUCCESSFUL) {
                return false;
            }
        }
    }
    return true;
}
```

Conceptually, the sendTaskToAllEndpoints() method sends a Task object to the remote endpoints and then waits inside the while loop until the timeout. Whenever a remote endpoint responds, endpointHasRespondedCallback() is executed, which records that endpoint's response in taskResponseMap. Back in sendTaskToAllEndpoints(), I check whether the task is done with the isTaskOver() helper method; if it is not, the loop calls Thread.sleep(1000) and checks again one second later.

The problem is that even though I can see endpointHasRespondedCallback() being executed for every remote node I dispatched to (I verified this with log statements), the callback threads wait outside the synchronized blocks and only enter them once the timeout occurs in sendTaskToAllEndpoints(). As a result, my task always times out, even when all of the nodes have responded properly.

I did not expect this: although I acquire the locks on both objects in the while loop, I release them before going to sleep. I assumed that while the sendTaskToAllEndpoints() thread sleeps, the threads waiting in endpointHasRespondedCallback() would acquire the locks and mark the task as done, as intended.

I could probably structure my program better to get around this problem, but I would like to understand the logical explanation for this behavior.

YShin
  • Your synchronization code looks fine, though you don't need synchronization on `taskMap` since only one thread would ever change it. (Besides, having synchronization logic that relies on acquiring multiple locks is an easy way to end up in a deadlock situation if you screw something up; your goal should be to have fewer, not more, lock objects while still remaining thread-safe.) Are you positive that no other code (that you didn't post) is acquiring a lock on either of your objects? – Tim Jul 24 '14 at 18:16
  • isTaskOver is public, but not thread-safe! You should make it private. Also, avoid active waits using loops + Thread.sleep; a conditional wait (wait + notifyAll) is better. – isnot2bad Jul 24 '14 at 18:38
  • The only change I made to get it working was removing the synchronized block inside the while loop. That implies the only thing blocking my other threads is the synchronized block inside the while loop. I know that in C the compiler may [change the order of things being executed](http://stackoverflow.com/questions/4437527/why-do-we-use-volatile-keyword-in-c) to optimize the program, which can affect the behavior of multi-threaded programs. Is there something analogous in Java as well? – YShin Jul 24 '14 at 19:45
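
isnot2bad's suggestion of a conditional wait instead of a sleep-and-poll loop can be sketched roughly as follows. This is a simplified, hypothetical version, not the asker's code: `ResponseTracker`, `register`, `onResponse` and `awaitAll` are illustrative names, endpoints are plain strings instead of InetAddress, and a single lock object guards all state (in line with Tim's point about using fewer locks). The sender calls `lock.wait(remaining)`, which releases the lock while waiting, and each callback calls `notifyAll()` so the sender re-checks its condition right away instead of up to a second later.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: one lock guards all bookkeeping, and the sender
// blocks in wait() until a callback notifies it or the deadline passes.
class ResponseTracker {
    private final Object lock = new Object();
    // taskUuid -> (endpoint -> has this endpoint responded?)
    private final Map<String, Map<String, Boolean>> responses = new HashMap<>();

    /** Registers a task with every endpoint marked as "not yet responded". */
    void register(String taskUuid, Iterable<String> endpoints) {
        Map<String, Boolean> pending = new HashMap<>();
        for (String endpoint : endpoints) {
            pending.put(endpoint, Boolean.FALSE);
        }
        synchronized (lock) {
            responses.put(taskUuid, pending);
        }
    }

    /** Called from a callback thread when an endpoint answers. */
    void onResponse(String taskUuid, String endpoint) {
        synchronized (lock) {
            Map<String, Boolean> pending = responses.get(taskUuid);
            if (pending == null || !pending.containsKey(endpoint)) {
                return; // task already finished or timed out
            }
            pending.put(endpoint, Boolean.TRUE);
            lock.notifyAll(); // wake the waiting sender so it re-checks
        }
    }

    /**
     * Blocks until every endpoint has responded or the timeout expires.
     * Returns true if all endpoints responded; the entry is removed either way.
     */
    boolean awaitAll(String taskUuid, long timeoutMillis) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        synchronized (lock) {
            try {
                while (!allDone(taskUuid)) {
                    long remaining = deadline - System.currentTimeMillis();
                    if (remaining <= 0) {
                        return false; // timed out
                    }
                    lock.wait(remaining); // releases the lock while waiting
                }
                return true;
            } finally {
                responses.remove(taskUuid);
            }
        }
    }

    // Caller must hold 'lock'.
    private boolean allDone(String taskUuid) {
        Map<String, Boolean> pending = responses.get(taskUuid);
        return pending == null || !pending.containsValue(Boolean.FALSE);
    }
}
```

With this shape, any completion callback (such as task.taskIsDone()) would be invoked by the caller after awaitAll() returns, that is, outside the lock.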

1 Answer

OK, never mind. What happened was that I was calling the same sendTaskToAllEndpoints() method inside task.taskIsDone(responses), which kept the locks from the while loop held until the newly invoked task ended as well.

I caught this bug by creating an AtomicInteger that I incremented every time I entered a synchronized block and decremented when I left it (with the decrement in a finally block). Once the first task was over, I observed that the AtomicInteger never went back down to 0; instead, it kept moving up and down between 1 and 2.
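
A minimal sketch of that counting technique, assuming a single lock object is being instrumented (`LockDepthProbe` and `guarded` are hypothetical names, not from the original code). The counter is incremented right after entering the synchronized block and decremented in a finally clause; a nested call made while the lock is already held shows up as depth 2, which matches the 1-to-2 oscillation described above.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical instrumentation: tracks how deeply the current lock is
// "entered", so a count that never returns to 0 reveals a lock held too long.
class LockDepthProbe {
    final AtomicInteger depth = new AtomicInteger();
    final AtomicInteger maxDepth = new AtomicInteger();
    private final Object lock = new Object();

    void guarded(Runnable body) {
        synchronized (lock) {
            int current = depth.incrementAndGet();
            maxDepth.accumulateAndGet(current, Math::max); // remember the deepest nesting seen
            System.out.println("entered synchronized block, depth = " + current);
            try {
                body.run();
            } finally {
                depth.decrementAndGet(); // always undo, even if body throws
            }
        }
    }
}
```

For example, `probe.guarded(() -> probe.guarded(() -> {}))` prints depth 1 and then depth 2, because the same thread re-entered the lock while still holding it.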

It appeared that the first task ended successfully because nothing was blocking its callbacks. But once the first task was done and task.taskIsDone() was called, that method created another Task instance and called sendTaskToAllEndpoints() while the first call still held both locks. Because Java's intrinsic locks are reentrant, the nested call could still enter its own synchronized blocks, but the callback threads could not, so all callback invocations for the second task (and so on) were blocked until those tasks ended with timeouts.
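
A common remedy is to change shared state while holding the lock, but invoke the completion callback only after the synchronized block has exited. The sketch below is hypothetical and simplified: `TaskRegistry`, `put`, `complete` and `isHeldByCurrentThread` are illustrative names, results are plain strings, and `Thread.holdsLock()` is used only to confirm that the callback does not run under the lock.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch: bookkeeping happens under the lock, while foreign
// code (a completion callback that may start a new task) runs after it.
class TaskRegistry {
    private final Object lock = new Object();
    private final Map<String, String> results = new HashMap<>();

    void put(String taskUuid, String result) {
        synchronized (lock) {
            results.put(taskUuid, result);
        }
    }

    void complete(String taskUuid, Consumer<String> onDone) {
        String result;
        synchronized (lock) {
            result = results.remove(taskUuid); // state change only
        }
        // The lock is released here, so onDone may block for a long time or
        // start another task without stalling other threads that need the lock.
        onDone.accept(result);
    }

    boolean isHeldByCurrentThread() {
        return Thread.holdsLock(lock);
    }
}
```

Applied to the question's code, this would mean moving task.taskIsDone(responses) after the closing braces of both synchronized blocks, keeping the removal of the map entries inside them.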

Thank you all for your suggestions.

YShin