This question is in conjunction with the question below that I have already posted
Preventing thread from duplicate processing in java
Problem Statement : How to control certain block of code shared among many child thread to be accessed only once.
Scenario :
Now, I have a scenario where multiple threads are sharing a buffer resource (a plain concurrent list) where it updates the list with some values and clears it at the end. But I want this operation to be performed with atomicity in place. Only after the first thread updates and clears the list then the second thread should start updating the list. If the operation is out of sync without the first thread job is finished then it may cause data issues.
Snippet of the code is below and I am trying to place a synchronized block inside a thread. Does this make sense? Will it work? The lines inside the synchronized block should be accessed by only one thread at a time and the block is itself is inside a newly spawned thread.
public void processControlMessage(final Message message) {
try {
lock.lock();
RDPWorkflowControlMessage rdpWorkflowControlMessage = unmarshallControlMessage(message);
if (rdpWorkflowControlMessage != null) {
final String workflowName = rdpWorkflowControlMessage.getWorkflowName();
final RdpWorkflowControlMessageType controlMessageType = rdpWorkflowControlMessage.getControlMessage();
new Thread(new Runnable() {
@Override
public void run() {
try {
if (message instanceof TextMessage) {
if (controlMessageType != null && controlMessageType == RdpWorkflowControlMessageType.REFRESH) {
if (!isWorkflowBeingProcessed(workflowName)) {
log.info("Processing workflow control message for the workflow " + workflowName);
addShutdownHook(workflowName);
List<String> matchingValues = new ArrayList<String>();
matchingValues.add(workflowName);
ConcreteSetDAO tasksSetDAO = taskEventListener.getConcreteSetDAO();
ConcreteSetDAO workflowSetDAO = workflowEventListener.getConcreteSetDAO();
tasksSetDAO.deleteMatchingRecords(matchingValues);
workflowSetDAO.deleteMatchingRecords(matchingValues);
List<RDPWorkflowTask> allTasks = fetchNewWorkflowItems(workflowName);
//Will the below line be accessed and executed only by one thread??
updateAndClearTasksandWorkflowSet(allTasks);
removeWorkflowNameFromProcessingMap(workflowName);
} else {
log.info("RDA cache clean up is already in progress for the workflow " + workflowName);
}
}
}
} catch (Exception e) {
log.error("Error extracting item of type RDPWorkflowControlMessage from message " + message);
}
}
}).start();
}
} finally {
lock.unlock();
}
}
private boolean isWorkflowBeingProcessed(final String workflowName) {
if (controlMessageStateMap.get(workflowName) == null) {
synchronized (this) {
if (controlMessageStateMap.get(workflowName) == null) {
log.info("Adding an entry in to the processing map for the workflow " + workflowName);
controlMessageStateMap.put(workflowName, true);
return false;
}
}
}
return true;
}
private void addShutdownHook(final String workflowName) {
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
removeWorkflowNameFromProcessingMap(workflowName);
}
});
log.info("Shut Down Hook attached for the thread processing the workflow " + workflowName);
}
private RDPWorkflowControlMessage unmarshallControlMessage(Message message) {
RDPWorkflowControlMessage rdpWorkflowControlMessage = null;
try {
TextMessage textMessage = (TextMessage) message;
rdpWorkflowControlMessage = marshaller.unmarshalItem(textMessage.getText(), RDPWorkflowControlMessage.class);
} catch (Exception e) {
log.error("Error extracting item of type RDPWorkflowTask from message " + message);
}
return rdpWorkflowControlMessage;
}
private List<RDPWorkflowTask> fetchNewWorkflowItems(String workflowName) {
workflowRestServiceClient.initSSL();
List<RDPWorkflowTask> allTasks = workflowRestServiceClient.fetchWorkflowSpecificTasks(workflowName);
return allTasks;
}
private void updateAndClearTasksandWorkflowSet(List<RDPWorkflowTask> allTasks) {
synchronized (this) {
if (allTasks != null && allTasks.size() > 0) {
taskEventListener.addRDPWorkflowTasks(allTasks);
workflowEventListener.updateWorkflowStatus(allTasks);
taskEventListener.getRecordsForUpdate().clear();
workflowEventListener.getRecordsForUpdate().clear();
}
}
}
private synchronized void removeWorkflowNameFromProcessingMap(String workflowName) {
if (workflowName != null
&& (controlMessageStateMap.get(workflowName) != null && controlMessageStateMap.get(workflowName))) {
controlMessageStateMap.remove(workflowName);
log.info("Thread processing the " + workflowName + " is released from the status map");
}
}