4

Problem statement

I have a JMS listener running as a thread listening to a topic. As soon a message comes in, I spawn a new Thread to process the in-bounded message. So for each incoming message I spawn a new Thread.
I have a scenario where duplicate message is also being processed when it is injected immediately in a sequential order. I need to prevent this from being processed. I tried using a ConcurrentHashMap to hold the process times where I add in the entry as soon as Thread is spawn and remove it from the map as soon Thread completes its execution. But it did not help when I tried with the scenario where I passed in same one after the another in concurrent fashion.

General Outline of my issue before you plunge into the actual code base

onMessage(){
    processIncomingMessage(){
        ExecutorService executorService = Executors.newFixedThreadPool(1000);
            //Map is used to make an entry before i spawn a new thread to process incoming message
            //Map contains "Key as the incoming message" and "value as boolean"
            //check map for duplicate check
            //The below check is failing and allowing duplicate messages to be processed in parallel
        if(entryisPresentInMap){ 
                //return doing nothing
        }else{
                //spawn a new thread for each incoming message
                //also ensure a duplicate message being processed when it in process by an active thread
        executorService.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        //actuall business logic
                    }finally{
                        //remove entry from the map so after processing is done with the message
                    }

                }
        }
    }

Standalone example to mimic the scenario

public class DuplicateCheck {

private static Map<String,Boolean> duplicateCheckMap =
        new ConcurrentHashMap<String,Boolean>(1000);

private static String name=null;
private static String[] nameArray = new String[20];
public static void processMessage(String message){
    System.out.println("Processed message =" +message);

}

public static void main(String args[]){
    nameArray[0] = "Peter";
    nameArray[1] = "Peter";
    nameArray[2] = "Adam";
    for(int i=0;i<=nameArray.length;i++){
    name=nameArray[i];
    if(duplicateCheckMap.get(name)!=null  && duplicateCheckMap.get(name)){
        System.out.println("Thread detected for processing your name ="+name);
        return;
    }
    addNameIntoMap(name);
    new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                processMessage(name);
            } catch (Exception e) {
                System.out.println(e.getMessage());
            } finally {
                freeNameFromMap(name);
            }
        }
    }).start();
    }
}

private static synchronized void addNameIntoMap(String name) {
    if (name != null) {
        duplicateCheckMap.put(name, true);
        System.out.println("Thread processing the "+name+" is added to the status map");
    }
}

private static synchronized void freeNameFromMap(String name) {
    if (name != null) {
        duplicateCheckMap.remove(name);
        System.out.println("Thread processing the "+name+" is released from the status map");
    }
}

Snippet of the code is below

public void processControlMessage(final Message message) {
    RDPWorkflowControlMessage rdpWorkflowControlMessage=    unmarshallControlMessage(message);
    final String workflowName = rdpWorkflowControlMessage.getWorkflowName();
    final String controlMessageEvent=rdpWorkflowControlMessage.getControlMessage().value();
    if(controlMessageStateMap.get(workflowName)!=null && controlMessageStateMap.get(workflowName)){
        log.info("Cache cleanup for the workflow :"+workflowName+" is already in progress");
        return;
    }else {
        log.info("doing nothing");
    }
    Semaphore controlMessageLock = new Semaphore(1); 
    try{
    controlMessageLock.acquire();
    synchronized(this){
        new Thread(new Runnable(){
            @Override
            public void run() {
                try {
                    lock.lock();
                    log.info("Processing Workflow Control Message for the workflow :"+workflowName);
                    if (message instanceof TextMessage) {
                    if ("REFRESH".equalsIgnoreCase(controlMessageEvent)) {
                        clearControlMessageBuffer();
                        enableControlMessageStatus(workflowName);
                        List<String> matchingValues=new ArrayList<String>();
                        matchingValues.add(workflowName);
                        ConcreteSetDAO tasksSetDAO=taskEventListener.getConcreteSetDAO();
                        ConcreteSetDAO workflowSetDAO=workflowEventListener.getConcreteSetDAO();
                        tasksSetDAO.deleteMatchingRecords(matchingValues);
                        workflowSetDAO.deleteMatchingRecords(matchingValues);
                        fetchNewWorkflowItems();
                        addShutdownHook(workflowName);
                        }
                    }
                } catch (Exception e) {
                    log.error("Error extracting item of type RDPWorkflowControlMessage from message "
                            + message);
                } finally {
                    disableControlMessageStatus(workflowName);
                    lock.unlock();
                }
            }
        }).start();
    }
    } catch (InterruptedException ie) {
        log.info("Interrupted Exception during control message lock acquisition"+ie);
    }finally{
        controlMessageLock.release();
    }
}

private void addShutdownHook(final String workflowName) {
    Runtime.getRuntime().addShutdownHook(new Thread() {
        public void run() {
            disableControlMessageStatus(workflowName);
        }
    });
    log.info("Shut Down Hook Attached for the thread processing the workflow :"+workflowName);
}

private RDPWorkflowControlMessage unmarshallControlMessage(Message message) {
    RDPWorkflowControlMessage rdpWorkflowControlMessage = null;
    try {
        TextMessage textMessage = (TextMessage) message;
        rdpWorkflowControlMessage = marshaller.unmarshalItem(textMessage.getText(), RDPWorkflowControlMessage.class);
    } catch (Exception e) {
        log.error("Error extracting item of type RDPWorkflowTask from message "
                + message);
    }
    return rdpWorkflowControlMessage;
}

private void fetchNewWorkflowItems() {
    initSSL();
    List<RDPWorkflowTask> allTasks=initAllTasks();
    taskEventListener.addRDPWorkflowTasks(allTasks);
    workflowEventListener.updateWorkflowStatus(allTasks);
}

private void clearControlMessageBuffer() {
    taskEventListener.getRecordsForUpdate().clear();
    workflowEventListener.getRecordsForUpdate().clear();
}

private synchronized void enableControlMessageStatus(String workflowName) {
    if (workflowName != null) {
        controlMessageStateMap.put(workflowName, true);
        log.info("Thread processing the "+workflowName+" is added to the status map");
    }
}

private synchronized void disableControlMessageStatus(String workflowName) {
    if (workflowName != null) {
        controlMessageStateMap.remove(workflowName);
        log.info("Thread processing the "+workflowName+" is released from the status map");
    }
}

I have modified my code to incorporate suggestions provided below but still it is not working

public void processControlMessage(final Message message) {
    ExecutorService executorService = Executors.newFixedThreadPool(1000);
    try{
        lock.lock();
        RDPWorkflowControlMessage rdpWorkflowControlMessage=    unmarshallControlMessage(message);
        final String workflowName = rdpWorkflowControlMessage.getWorkflowName();
        final String controlMessageEvent=rdpWorkflowControlMessage.getControlMessage().value();
        if(controlMessageStateMap.get(workflowName)!=null && controlMessageStateMap.get(workflowName)){
            log.info("Cache cleanup for the workflow :"+workflowName+" is already in progress");
            return;
        }else {
            log.info("doing nothing");
        }
        enableControlMessageStatus(workflowName);
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    //actual code
                        fetchNewWorkflowItems();
                        addShutdownHook(workflowName);
                        }
                    }
                } catch (Exception e) {
                    log.error("Error extracting item of type RDPWorkflowControlMessage from message "
                            + message);
                } finally {
                    disableControlMessageStatus(workflowName);
                }
            }
        });
    } finally {
        executorService.shutdown();
        lock.unlock();
    }
}

private void addShutdownHook(final String workflowName) {
    Runtime.getRuntime().addShutdownHook(new Thread() {
        public void run() {
            disableControlMessageStatus(workflowName);
        }
    });
    log.info("Shut Down Hook Attached for the thread processing the workflow :"+workflowName);
}

private synchronized void enableControlMessageStatus(String workflowName) {
    if (workflowName != null) {
        controlMessageStateMap.put(workflowName, true);
        log.info("Thread processing the "+workflowName+" is added to the status map");
    }
}

private synchronized void disableControlMessageStatus(String workflowName) {
    if (workflowName != null) {
        controlMessageStateMap.remove(workflowName);
        log.info("Thread processing the "+workflowName+" is released from the status map");
    }
}
Balaji
  • 191
  • 3
  • 14
  • do you want that only one Thread executes at a time.. – awsome Dec 18 '15 at 13:07
  • No two of those threads will ever run concurrently as far as I can see. Throw that code into the garbage bin and do it again from scratch. Think about what you want it to do before you start hitting a key. – Fildor Dec 18 '15 at 13:36
  • @awsome I need multiple threads running processing the incoming messages parallel. When duplicate messages are coming say for instance "Test" comes twice one after the another then I should block the duplicate message from being processed again – Balaji Dec 18 '15 at 15:36
  • @Fildor I am able to spawn multiple threads for the incoming messages. This is the expectation from the team as well. When I have 3 msgs - "Test A","Test B","Test C" they should be processed in a parallel way. Issue I have here is when I have 2 same messages coming at the same time I have 2 threads spawned one of which is doing the duplicate routine. I need to prevent. Please suggest better ways of doing this – Balaji Dec 18 '15 at 15:47
  • Hi I just saw, that I was mistaken. The synchronization that is going on here is really complicating things. Here is what I would do: Listener checks if Key exists in Queue (actually a map). If does, dismiss message. If not add the message with its key to the map and submit a Task to an Executor. That task will be given the key, retrieve the message and do your magic on it. On completion, it will remove the key from the map and voila: the listener is free again to add a message for that key. Nearly no explicit syncing necessary if you use a Thread-safe map implementation. Advantage of ... – Fildor Dec 18 '15 at 16:17
  • 1
    Advantage of Executor is that you can pretty well scale the number of Threads that work on the task queue. There are many standard implementations ready to go. And you will safe a lot of overhead reusing Threads. – Fildor Dec 18 '15 at 16:21
  • And I just got aware why all your synchronization is futile: You add the workflowname to that map **inside** the thread ... that's too late. If you want to make a simple experiment, then 1. remove all synchronization code from your example. Move adding the workflowname to the map right up before where you at the moment define the semaphore. If processControlMessage is called on more than one thread then add the synchronized modifier to its signature and that's it. – Fildor Dec 18 '15 at 16:38
  • How are `message`, `workflowname` and `controlMessageEvent` related by the way? You refer to the latter two, but we don't see in the snippet where they come from. – Fildor Dec 18 '15 at 16:45
  • @Fildor I have updated the code to make more sense here. Apologies if you had confusions earlier. From the incoming messagae which is an XML i unmarshall it to a specific type and then retrieve workflow name and controlmessage event from the unmarshalled object. The point what you said is correct , by the time I insert the workflow name in the thread it is way too late which is why I am unable to handle the check – Balaji Dec 18 '15 at 17:47
  • @Fildor I did the modifications and it did not help me as well :( Still when I passed duplicated messages at the same time the problem is of the timing of the entry in the map – Balaji Dec 22 '15 at 14:49
  • Your requirements are not properly defined. You say that if a duplicate message is enqueued "immediately" after the previous one, then you don't want to process it, but you haven't defined "immediately". In particular, if the thread that processes the first message completely finishes before the next message is picked up, you haven't specified when you want to process the 2nd one and when you don't. – Matt Timmermans Dec 22 '15 at 15:45
  • @MattTimmermans I have updated my question with standalone program now .Please let me know if it will suffice you to answer – Balaji Dec 22 '15 at 15:47
  • No. I mean that what you say you want is not really what you want. I can tell, because it makes no sense to want that. I can tell you how to do what you say you want, but I won't because it's not what you want. Look at the loop in your standalone program -- after you say .execute(), the new thread might run, process the message, remove the map entry and completely finish, ALL BEFORE THE NEXT ITERATION OF THE LOOP. Then the loop will then add a duplicate message. Do you want it processed or not? – Matt Timmermans Dec 22 '15 at 15:57
  • @MattTimmermans thanks for the brilliant reply. In the standalone program I am feeding messages in loop one after the another and I don want the duplicate message to be processed. In real time I have a JMS listener listening against a queue for incoming message and I want to prevent duplicate incomings from processed. Now can you give me what I want please ? – Balaji Dec 22 '15 at 16:08
  • Almost. First tell me how you expect to know that you don't want to process the 2nd message. I'm guessing that if it came in an hour later from a completely different thread, then you _would_ want to process it... So what's the difference between the ones you want to process and the ones you don't want to process? – Matt Timmermans Dec 22 '15 at 16:17
  • @MattTimmermans if it came an hour later then ideally it should be processed. When I say I don want to process duplicated I mean when they are injected parallel. Say for instance I have button in UI which will post something to JMS from the form textbox. When I click the button twice in this case it same message will be posted twice back to back. Are you with me now ? – Balaji Dec 22 '15 at 16:30
  • Nope. "injected in parallel" is not well defined. I'm afraid I'm out of patience. I'm gonna have to leave this for the next guy... – Matt Timmermans Dec 22 '15 at 16:32

2 Answers2

1

This is how you should add a value to a map. This double checking makes sure that only one thread adds a value to a map at any particular moment of time and you can control the access afterwards. Remove all the locking logic afterwards. It is as simple as that

public void processControlMessage(final  String workflowName) {
    if(!tryAddingMessageInProcessingMap(workflowName)){
           Thread.sleep(1000); // sleep 1 sec and try again
            processControlMessage(workflowName);
        return ;
    }
     System.out.println(workflowName);
     try{
         // your code goes here
     } finally{
         controlMessageStateMap.remove(workflowName);
     }
}

private boolean tryAddingMessageInProcessingMap(final String workflowName) {
    if(controlMessageStateMap .get(workflowName)==null){
        synchronized (this) {
             if(controlMessageStateMap .get(workflowName)==null){
                 controlMessageStateMap.put(workflowName, true);
                 return true;
             }
        }
    }
    return false;
}

Read here more for https://en.wikipedia.org/wiki/Double-checked_locking

awsome
  • 2,143
  • 2
  • 23
  • 41
  • your suggestion asks me to add an entry into the map using the parent thread ( here it will be my listener thread) . Inside the processControlMessage routine I spwan a new thread for every incoming message. Consider a scenario where I add an entry into the map for the incoming message and then if there is any issue in starting the thread then will it be a fail proof scenario? . For example I have message "Test A" and gets added into the map. If something goes wrong in starting of the thread to process this message , then the duplicate incoming will be mistakenly bypassed – Balaji Dec 19 '15 at 13:18
  • I have edited my answer.. More important thing in my answer is double checked locking.. You can make the duplicate thread wait and try again after 1 sec or the time you expect the processing to take and then the duplicate thread also goes through but not in parallel.. Also add a break condition to avoid waiting for ever – awsome Dec 19 '15 at 22:03
  • I would like to additionally clarify here that there will only be one parent thread running( the listener thread in my case which will execute processControlMessage() routine). This inturn will spawn multiple child threads for each incoming message. Probably I am still able to interpret what you are trying to convey. You have asked me to check the double locking mechanism outside the point where only one thread will be running. – Balaji Dec 22 '15 at 11:11
  • I need a solution to fix this problem which is critical in my case – Balaji Dec 22 '15 at 11:12
  • make a very simple usecase like my answer which just shows what you want to do rather than seeing your whole project code.. that is usually easy to understand for others also. You will get more answers from the community.. Someone should be able to take your code and execute that code without having to worry some libraries that you are using.. – awsome Dec 22 '15 at 13:25
  • Let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/98714/discussion-between-awsome-and-balaji). – awsome Dec 22 '15 at 14:00
  • issue is resolved now I have included the double checking logic and modified the code below which is working fine – Balaji Dec 24 '15 at 09:27
0

The issue is fixed now. Many thanks to @awsome for the approach. It is avoiding the duplicates when a thread is already processing the incoming duplicate message. If no thread is processing then it gets picked up

public void processControlMessage(final Message message) {
    try {
        lock.lock();
        RDPWorkflowControlMessage rdpWorkflowControlMessage = unmarshallControlMessage(message);
        final String workflowName = rdpWorkflowControlMessage.getWorkflowName();
        final String controlMessageEvent = rdpWorkflowControlMessage.getControlMessage().value();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    if (message instanceof TextMessage) {
                        if ("REFRESH".equalsIgnoreCase(controlMessageEvent)) {
                            if (tryAddingWorkflowNameInStatusMap(workflowName)) {
                                log.info("Processing Workflow Control Message for the workflow :"+ workflowName);
                                addShutdownHook(workflowName);
                                clearControlMessageBuffer();
                                List<String> matchingValues = new ArrayList<String>();
                                matchingValues.add(workflowName);
                                ConcreteSetDAO tasksSetDAO = taskEventListener.getConcreteSetDAO();
                                ConcreteSetDAO workflowSetDAO = workflowEventListener.getConcreteSetDAO();
                                tasksSetDAO.deleteMatchingRecords(matchingValues);
                                workflowSetDAO.deleteMatchingRecords(matchingValues);
                                List<RDPWorkflowTask> allTasks=fetchNewWorkflowItems(workflowName);
                                updateTasksAndWorkflowSet(allTasks);
                                removeWorkflowNameFromProcessingMap(workflowName);

                            } else {
                                log.info("Cache clean up is already in progress for the workflow ="+ workflowName);
                                return;
                            }
                        }
                    }
                } catch (Exception e) {
                    log.error("Error extracting item of type RDPWorkflowControlMessage from message "
                            + message);
                }
            }
        }).start();
    } finally {
        lock.unlock();
    }
}

private boolean tryAddingWorkflowNameInStatusMap(final String workflowName) {
    if(controlMessageStateMap.get(workflowName)==null){
        synchronized (this) {
             if(controlMessageStateMap.get(workflowName)==null){
                 log.info("Adding an entry in to the map for the workflow ="+workflowName);
                 controlMessageStateMap.put(workflowName, true);
                 return true;
             }
        }
    }
    return false;
}

private synchronized void removeWorkflowNameFromProcessingMap(String workflowName) {
    if (workflowName != null
            && (controlMessageStateMap.get(workflowName) != null && controlMessageStateMap
                    .get(workflowName))) {
        controlMessageStateMap.remove(workflowName);
        log.info("Thread processing the " + workflowName+ " is released from the status map");
    }
}
Balaji
  • 191
  • 3
  • 14