I have two HashMaps that I would like to synchronize in a class.
The reason I'm maintaining two maps is that I dispatch a task to different servers: I keep the original task object in one HashMap<String, Task>, and at the same time I track the response status in another HashMap<String, HashMap<InetAddress, TaskResponse>>.
The keys of both HashMaps are Strings that are unique to each Task object.
So my code goes like this:
HashMap<String, Map<InetAddress, TaskResponse>> taskResponseMap = new HashMap<>();
HashMap<String, Task> taskMap = new HashMap<>();
public void endpointHasRespondedCallback(TaskResponse taskResponse, InetAddress from) {
    // Callback threads get blocked here!
    synchronized (taskResponseMap) {
        synchronized (taskMap) {
            Map<InetAddress, TaskResponse> taskResponses = taskResponseMap.get(taskResponse.taskUuid);
            if (taskResponses == null || !taskResponses.containsKey(from)) {
                // The response entry does not exist, probably due to a timeout
                return;
            }
            taskResponses.put(from, taskResponse);
        }
    }
}
public void sendTaskToAllEndpoints(Task task) throws InterruptedException {
    long taskStartedAt = System.currentTimeMillis();
    HashMap<InetAddress, TaskResponse> taskResponses = new HashMap<>();
    taskResponseMap.put(task.taskUuid, taskResponses);
    taskMap.put(task.taskUuid, task);
    for (InetAddress dest : getDestinationNodes()) {
        sendTaskTo(dest, task);
        taskResponses.put(dest, TaskResponse.emptyTaskResponse());
    }
    // Wait for the responses to come back until the timeout expires
    while (System.currentTimeMillis() < taskStartedAt + timeoutInMillis) {
        Thread.sleep(1000);
        synchronized (taskResponseMap) {
            synchronized (taskMap) {
                if (isTaskOver(task.taskUuid)) {
                    Map<InetAddress, TaskResponse> responses = taskResponseMap.remove(task.taskUuid);
                    taskMap.remove(task.taskUuid);
                    task.taskIsDone(responses);
                    return;
                }
            }
        }
    }
    // If the task is still sitting there, then it must have timed out!
    synchronized (taskResponseMap) {
        synchronized (taskMap) {
            taskResponseMap.remove(task.taskUuid);
            taskMap.remove(task.taskUuid);
        }
    }
}
// Deliberately not synchronized, since it is only called from inside synchronized blocks
public boolean isTaskOver(String taskUuid) {
    Task task = taskMap.get(taskUuid);
    if (task == null || !taskResponseMap.containsKey(task.taskUuid)) {
        return true;
    } else {
        for (TaskResponse value : taskResponseMap.get(task.taskUuid).values()) {
            if (value.status != TaskResponseStatus.SUCCESSFUL) {
                return false;
            }
        }
    }
    return true;
}
So to conceptually explain what my code does, the sendTaskToAllEndpoints() method sends Task objects to remote endpoints and waits inside the while loop until the timeout expires.
Whenever a remote endpoint responds, the endpointHasRespondedCallback() method is executed, which marks that endpoint as done in taskResponseMap.
Back in the sendTaskToAllEndpoints() method, I check whether the task is done using the isTaskOver() helper method, and if not, I just continue and invoke Thread.sleep(1000) to wait for one second before checking again.
The problem I'm facing is that even though I see the endpointHasRespondedCallback() method being executed for all of the remote nodes that I dispatched to (I verified this with log statements), it waits outside the synchronized blocks and only gets in once the timeout occurs in the sendTaskToAllEndpoints() method. As a result, my task always times out even if all of the nodes have responded properly.
This is something I did not expect: even though I acquire the locks on both objects in the while loop, I release them before going to sleep. I assumed that when the sendTaskToAllEndpoints() thread goes to sleep, the other threads waiting for the lock in the endpointHasRespondedCallback() method would acquire it and mark the Tasks as done as intended.
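To make that expectation concrete, here is a minimal sketch of the lock hand-off I have in mind, separate from my actual code. The class name LockHandoffSketch, the "task-1" key, and the timings are made up for illustration, and a single map stands in for both of my maps. One thread polls and sleeps outside the synchronized block, and a second thread should be able to take the lock while the first one sleeps.

```java
import java.util.HashMap;
import java.util.Map;

public class LockHandoffSketch {

    /**
     * Returns true if the polling thread observes the entry written by the
     * callback thread before a (hypothetical) 2-second deadline passes.
     */
    public static boolean callbackGetsLockWhilePollerSleeps() {
        // Stand-in for taskResponseMap; every access is guarded by its monitor.
        final Map<String, String> responses = new HashMap<>();

        // Plays the role of endpointHasRespondedCallback().
        Thread callback = new Thread(() -> {
            synchronized (responses) {
                responses.put("task-1", "SUCCESSFUL");
            }
        });

        long deadline = System.currentTimeMillis() + 2000;
        boolean sawResponse = false;
        try {
            callback.start();
            // Plays the role of the while loop in sendTaskToAllEndpoints().
            while (System.currentTimeMillis() < deadline && !sawResponse) {
                Thread.sleep(100); // no lock is held while sleeping
                synchronized (responses) {
                    sawResponse = responses.containsKey("task-1");
                }
            }
            callback.join();
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report whatever was seen so far.
            Thread.currentThread().interrupt();
        }
        return sawResponse;
    }

    public static void main(String[] args) {
        System.out.println("poller saw response: " + callbackGetsLockWhilePollerSleeps());
    }
}
```

In this sketch the lock is only held for the short check after each sleep, so I would expect the callback thread to get in almost immediately. That is the behaviour I assumed my real code would show as well.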
I could probably implement my program in better ways to get around this problem, but I was wondering what would be the logical explanation for this behavior.