0

This question is in conjunction with the question below that I have already posted

Preventing thread from duplicate processing in java

Problem Statement : How to control certain block of code shared among many child thread to be accessed only once.

Scenario :

Now, I have a scenario where multiple threads are sharing a buffer resource (a plain concurrent list) where it updates the list with some values and clears it at the end. But I want this operation to be performed with atomicity in place. Only after the first thread updates and clears the list then the second thread should start updating the list. If the operation is out of sync without the first thread job is finished then it may cause data issues.

Snippet of the code is below and I am trying to place a synchronized block inside a thread. Does this make sense? Will it work? The lines inside the synchronized block should be accessed by only one thread at a time and the block is itself is inside a newly spawned thread.

public void processControlMessage(final Message message) {
    try {
        lock.lock();
        RDPWorkflowControlMessage rdpWorkflowControlMessage = unmarshallControlMessage(message);
        if (rdpWorkflowControlMessage != null) {
            final String workflowName = rdpWorkflowControlMessage.getWorkflowName();
            final RdpWorkflowControlMessageType controlMessageType = rdpWorkflowControlMessage.getControlMessage();
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        if (message instanceof TextMessage) {
                            if (controlMessageType != null && controlMessageType == RdpWorkflowControlMessageType.REFRESH) {
                                if (!isWorkflowBeingProcessed(workflowName)) {
                                    log.info("Processing workflow control message for the workflow " + workflowName);
                                    addShutdownHook(workflowName);
                                    List<String> matchingValues = new ArrayList<String>();
                                    matchingValues.add(workflowName);
                                    ConcreteSetDAO tasksSetDAO = taskEventListener.getConcreteSetDAO();
                                    ConcreteSetDAO workflowSetDAO = workflowEventListener.getConcreteSetDAO();
                                    tasksSetDAO.deleteMatchingRecords(matchingValues);
                                    workflowSetDAO.deleteMatchingRecords(matchingValues);
                                    List<RDPWorkflowTask> allTasks = fetchNewWorkflowItems(workflowName);
                                    //Will the below line be accessed and executed only by one thread??
                                    updateAndClearTasksandWorkflowSet(allTasks);
                                    removeWorkflowNameFromProcessingMap(workflowName);
                                } else {
                                    log.info("RDA cache clean up is already in progress for the workflow " + workflowName);
                                }
                            }
                        }
                    } catch (Exception e) {
                        log.error("Error extracting item of type RDPWorkflowControlMessage from message " + message);
                    }
                }
            }).start();
        }
    } finally {
        lock.unlock();
    }
}

private boolean isWorkflowBeingProcessed(final String workflowName) {
    if (controlMessageStateMap.get(workflowName) == null) {
        synchronized (this) {
            if (controlMessageStateMap.get(workflowName) == null) {
                log.info("Adding an entry in to the processing map for the workflow " + workflowName);
                controlMessageStateMap.put(workflowName, true);
                return false;
            }
        }
    }
    return true;
}

private void addShutdownHook(final String workflowName) {
    Runtime.getRuntime().addShutdownHook(new Thread() {
        public void run() {
            removeWorkflowNameFromProcessingMap(workflowName);
        }
    });
    log.info("Shut Down Hook attached for the thread processing the workflow " + workflowName);
}

private RDPWorkflowControlMessage unmarshallControlMessage(Message message) {
    RDPWorkflowControlMessage rdpWorkflowControlMessage = null;
    try {
        TextMessage textMessage = (TextMessage) message;
        rdpWorkflowControlMessage = marshaller.unmarshalItem(textMessage.getText(), RDPWorkflowControlMessage.class);
    } catch (Exception e) {
        log.error("Error extracting item of type RDPWorkflowTask from message " + message);
    }
    return rdpWorkflowControlMessage;
}

private List<RDPWorkflowTask> fetchNewWorkflowItems(String workflowName) {
    workflowRestServiceClient.initSSL();
    List<RDPWorkflowTask> allTasks = workflowRestServiceClient.fetchWorkflowSpecificTasks(workflowName);
    return allTasks;
}

private void updateAndClearTasksandWorkflowSet(List<RDPWorkflowTask> allTasks) {
    synchronized (this) {
           if (allTasks != null && allTasks.size() > 0) {
                taskEventListener.addRDPWorkflowTasks(allTasks);
                workflowEventListener.updateWorkflowStatus(allTasks);
                taskEventListener.getRecordsForUpdate().clear();
                workflowEventListener.getRecordsForUpdate().clear();
            }
    }

}

private synchronized void removeWorkflowNameFromProcessingMap(String workflowName) {
    if (workflowName != null
            && (controlMessageStateMap.get(workflowName) != null && controlMessageStateMap.get(workflowName))) {
        controlMessageStateMap.remove(workflowName);
        log.info("Thread processing the " + workflowName + " is released from the status map");
    }
}
Community
  • 1
  • 1
Balaji
  • 191
  • 3
  • 14
  • Don't use `synchronized` to order things. The only thing it's good for is mutual exclusion. If you want two (or more) things to happen in a particular order (e.g., _Only after the first thread... then the second thread should start ..._), then those things should both (all) be done in the same thread. Remember: Every thread can see all of memory. If there's work to be done, it should not matter which thread does the work. – Solomon Slow Jan 12 '16 at 15:11
  • @jameslarge i am afraid i don understand your point. I have presented the modified version of above and tell me if that would work – Balaji Jan 12 '16 at 15:14
  • @Baliji James is correct that the simplest way to make things execute in sequence is to do them on a single thread. There may be a reasons this won't work for you such as if you want to have multiple threads working on a bunch of tasks and then do something when they are all done. For that look at CountDownLatch and the related classes in java.util.concurrent. – JimmyJames Jan 12 '16 at 15:20
  • @JimmyJames you mean to say classes like CountDownLatch will suffice my expectations? – Balaji Jan 12 '16 at 15:26
  • I'm not sure I completely understand what you are trying to do. Countdown latch simply allows you have one (ore more) threads wait until a number of tasks are completed. – JimmyJames Jan 12 '16 at 15:27
  • Sorry, I don't know what the whole thing is supposed to do. I'm only saying that, while the `synchronized` keyword will prevent two threads from entering blocks that are synchronized on the same object at the same time, it won't do anything to guarantee the order in which the threads get to run the synchronized blocks. – Solomon Slow Jan 12 '16 at 15:28
  • @jameslarge will the code inside updateAndClearTasksandWorkflowSet() method be accessed by only one thread at a time ?? can that be assured? – Balaji Jan 12 '16 at 15:37
  • @Balaji If the class, that offers updateAndClearTasksandWorkflowSet() is instantiated only once, then yes, otherwise no. Anyway, I think you are reinventing the wheel here (or I didn't get the full picture yet). Maybe you want to take a look at how to use ThreadPools and ExecutorServices. Since you already have an abstract RDPTask, it should be fairly easy to implement. – SME_Dev Jan 12 '16 at 15:53
  • Your `updateAndClearTasksandWorkflowSet()` is an _instance_ method of some class. Two threads can enter the synchronized block at the same time *IFF* each thread has called the method for a different instance of the class. Two threads that call the method for the same instance will be forced to go through the synchronized block one at a time. Whichever one gets there first will go through first, and whichever one gets there second will wait its turn. – Solomon Slow Jan 12 '16 at 15:55

1 Answers1

0

I think you are going about this the wrong way. In your processControlMessage() method, you are locking and releasing things multiple times. You are locking the instance in some cases and in others you are synchronizing on a method. There's no way to know what is happening between those locks and releases.

I believe from what you have submitted, that you are trying to make sure that only one thread is accessing controlMessageStateMap at a time. If that is correct, then why not lock the Map at the beginning of the block and release it when you are done?

new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    synchronize(controlMessageStateMap) {
                    if (message instanceof TextMessage) {
                        if (controlMessageType != null && controlMessageType == RdpWorkflowControlMessageType.REFRESH) {
                            if (!isWorkflowBeingProcessed(workflowName)) {
                        ...
                    } // end of synchronize block
                } catch (Exception e) {
                    log.error("Error extracting item of type RDPWorkflowControlMessage from message " + message);
                }

I don't think this is a great strategy overall. But without understanding the context of what you are trying to do, I can't advise an alternative.

What is the atomicity protecting against? What is the use case you are trying to solve?

Michael
  • 401
  • 5
  • 16
  • i am using a shared map for a different purpose which is answered in the other question i raised (link pasted above).. From the code that i have pasted above , can you confirm whether block inside updateAndClearTasksandWorkflowSet(allTasks) will be access only by one thread at a time ? – Balaji Jan 12 '16 at 16:07
  • Ok let me not confuse with so many terminologies and make it straight. I want to ensure that block inside updateAndClearTasksandWorkflowSet() be accessed by only one thread at a time.Is my approach correct or not ? – Balaji Jan 12 '16 at 16:12
  • Assuming there is only one instance of the class for which the method is defined, synchronizing on (this) will indeed make sure no one else is executing updateAndClearTasksandWorkflowSet() at the same time. – Michael Jan 12 '16 at 16:16
  • Yes mate i have only one instance of the class. – Balaji Jan 13 '16 at 11:29