-1

I'm currently trying to send a batch of simple messages to a queue using plain Java.

public AtomicReference<Message> doSend(String message, String queue){
    try (JMSContext context = connectionFactory.createContext()) {
        TextMessage textMessage = context.createTextMessage(message);            
        final AtomicReference<Message> msg = new AtomicReference<>();
        msg.set(textMessage);
        log.info("Sending message to queue {}", queue);
        context.createProducer().send(createDestination(context, queue), textMessage);
        log.info("Message sent to queue {}, messageId provided {}", queue, msg.get().getJMSMessageID());
        return msg;
    }
    catch (Exception e) {
        log.error("Failed to send message to queue",e);
        throw new SipJmsException("Failed to send message to queue", e);
    }
}

private Destination createDestination(JMSContext context, String queue){
    log.debug("Creating destination queue {} connection",queue);
    return context.createQueue(queue);
}

I send N messages in a row, and the logs show that the same JMSMessageID is generated every time.

[main] Sending message to queue TEST_QUEUE
[main] Message sent to queue TEST_QUEUE, messageId provided ID:414d5120444556494d53514d20202020551c3f5d81619824
[main] Sending message to queue TEST_QUEUE
[main] Message sent to queue TEST_QUEUE, messageId provided ID:414d5120444556494d53514d20202020551c3f5d83619824

etc.

As far as I know, JMSMessageID is supposed to be unique, and its collision causes problems.

The O'Reilly book states:

The JMSMessageID is a String value that uniquely identifies a message. How unique the identifier is depends on the vendor. The JMSMessageID can be useful for historical repositories in JMS consumer applications where messages need to be uniquely indexed. Used in conjunction with the JMSCorrelationID, the JMSMessageID is also useful for correlating messages: String messageid = message.getJMSMessageID();

So why is the message ID not unique? (It is even the same between application runs.)

Ermintar

3 Answers

4

The message IDs are unique. I marked the differing digit with a *:

414d5120444556494d53514d20202020551c3f5d81619824
                                         *
414d5120444556494d53514d20202020551c3f5d83619824
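
The comparison can also be done programmatically. The following is a minimal sketch, not part of the original answer: the class and method names (MessageIdDiff, firstDifference, hexToAscii) are made up for illustration. It locates the first differing hex digit in the two IDs from the question and decodes the leading 16 bytes as ASCII. Reading that decoded text as a product tag followed by the queue manager name is an interpretation rather than something the logs confirm, but it would explain why most of the ID stays constant, even between application runs.

```java
import java.nio.charset.StandardCharsets;

public class MessageIdDiff {

    /** Returns the index of the first differing character, or -1 if the strings are equal. */
    static int firstDifference(String a, String b) {
        int shared = Math.min(a.length(), b.length());
        for (int i = 0; i < shared; i++) {
            if (a.charAt(i) != b.charAt(i)) {
                return i;
            }
        }
        // Same common prefix: equal only if the lengths match too.
        return a.length() == b.length() ? -1 : shared;
    }

    /** Decodes a hex string (two hex digits per byte) into ASCII text. */
    static String hexToAscii(String hex) {
        byte[] bytes = new byte[hex.length() / 2];
        for (int i = 0; i < bytes.length; i++) {
            bytes[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
        }
        return new String(bytes, StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) {
        // The two IDs from the question's log, without the "ID:" prefix.
        String first  = "414d5120444556494d53514d20202020551c3f5d81619824";
        String second = "414d5120444556494d53514d20202020551c3f5d83619824";

        System.out.println("First difference at hex digit " + firstDifference(first, second));
        System.out.println("Leading 16 bytes as text: '" + hexToAscii(first.substring(0, 32)) + "'");
    }
}
```

The two IDs differ at a single hex digit, which is easy to miss when reading the log by eye.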
Daniel Steinmann
1

I have created a simple JMS program that puts 5 messages to a queue and outputs the JMSMessageID after each put.

Sample output:

2019/08/13 19:15:18.824 MQTestJMS11x5: testConn: successfully connected.
2019/08/13 19:15:18.845 MQTestJMS11x5: testConn: successfully opened TEST.Q1
2019/08/13 19:15:18.845 MQTestJMS11x5: sendMsg: Sending request to queue:///TEST.Q1
2019/08/13 19:15:18.845 MQTestJMS11x5: sendMsg: 
2019/08/13 19:15:18.887 MQTestJMS11x5: sendMsg: Sent message: MessageId=ID:414d51204d515754312020202020202028cd525d24201102
2019/08/13 19:15:18.887 MQTestJMS11x5: sendMsg: Sent message: MessageId=ID:414d51204d515754312020202020202028cd525d24201103
2019/08/13 19:15:18.888 MQTestJMS11x5: sendMsg: Sent message: MessageId=ID:414d51204d515754312020202020202028cd525d24201104
2019/08/13 19:15:18.889 MQTestJMS11x5: sendMsg: Sent message: MessageId=ID:414d51204d515754312020202020202028cd525d24201105
2019/08/13 19:15:18.889 MQTestJMS11x5: sendMsg: Sent message: MessageId=ID:414d51204d515754312020202020202028cd525d24201106
2019/08/13 19:15:18.892 MQTestJMS11x5: testConn: Closed session
2019/08/13 19:15:18.892 MQTestJMS11x5: testConn: Stopped connection
2019/08/13 19:15:18.893 MQTestJMS11x5: testConn: Closed connection

Notice that each message ID is unique.
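
Rather than comparing long hex strings by eye, uniqueness can be checked mechanically. This is a small sketch under the assumption that the IDs have already been collected into a list; the class name MessageIdUniqueness and the method allUnique are hypothetical helpers, not part of JMS or IBM MQ. It runs the check against the five IDs from the sample output above.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class MessageIdUniqueness {

    /** Returns true if no ID occurs more than once in the list. */
    static boolean allUnique(List<String> ids) {
        Set<String> seen = new HashSet<>();
        for (String id : ids) {
            // Set.add returns false when the element is already present.
            if (!seen.add(id)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // The five message IDs copied from the sample output.
        List<String> ids = Arrays.asList(
            "ID:414d51204d515754312020202020202028cd525d24201102",
            "ID:414d51204d515754312020202020202028cd525d24201103",
            "ID:414d51204d515754312020202020202028cd525d24201104",
            "ID:414d51204d515754312020202020202028cd525d24201105",
            "ID:414d51204d515754312020202020202028cd525d24201106");

        System.out.println("All message IDs unique: " + allUnique(ids));
    }
}
```

The same check could be applied in a consumer by collecting each received message's getJMSMessageID() value before calling allUnique.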

Here's the JMS program that generated the output:

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Hashtable;
import javax.jms.*;

import com.ibm.mq.jms.*;
import com.ibm.msg.client.wmq.WMQConstants;

/**
 * Program Name
 *  MQTestJMS11x5
 *
 * Description
 *  This Java JMS class will connect to a remote queue manager and put 5 messages to a queue.
 *
 * Sample Command Line Parameters
 *  -m MQA1 -h 127.0.0.1 -p 1414 -c TEST.CHL -q TEST.Q1 -u UserID -x Password
 *
 * @author Roger Lacroix
 */
public class MQTestJMS11x5
{
   private static final SimpleDateFormat  LOGGER_TIMESTAMP = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");

   private Hashtable<String,String> params;
   private MQQueueConnectionFactory mqQCF = null;


   /**
    * The constructor
    */
   public MQTestJMS11x5()
   {
      super();
      params = new Hashtable<String,String>();
   }

   /**
    * Make sure the required parameters are present.
    * @return true/false
    */
   private boolean allParamsPresent()
   {
      boolean b = params.containsKey("-h") && params.containsKey("-p") &&
                  params.containsKey("-c") && params.containsKey("-m") &&
                  params.containsKey("-q") &&
                  params.containsKey("-u") && params.containsKey("-x");
      if (b)
      {
         try
         {
            Integer.parseInt((String) params.get("-p"));
         }
         catch (NumberFormatException e)
         {
            b = false;
         }
      }

      return b;
   }

   /**
    * Extract the command-line parameters and initialize the MQ variables.
    * @param args
    * @throws IllegalArgumentException
    */
   private void init(String[] args) throws IllegalArgumentException
   {
      if (args.length > 0 && (args.length % 2) == 0)
      {
         for (int i = 0; i < args.length; i += 2)
         {
            params.put(args[i], args[i + 1]);
         }
      }
      else
      {
         throw new IllegalArgumentException();
      }

      if (allParamsPresent())
      {
         try
         {
            mqQCF = new MQQueueConnectionFactory();
            mqQCF.setQueueManager((String) params.get("-m"));
            mqQCF.setHostName((String) params.get("-h"));
            mqQCF.setChannel((String) params.get("-c"));
            mqQCF.setTransportType(WMQConstants.WMQ_CM_CLIENT);
            try
            {
               mqQCF.setPort(Integer.parseInt((String) params.get("-p")));
            }
            catch (NumberFormatException e)
            {
               mqQCF.setPort(1414);
            }
         }
         catch (JMSException e)
         {
            MQTestJMS11x5.logger("getLinkedException()=" + e.getLinkedException());
            MQTestJMS11x5.logger(e.getLocalizedMessage());
            e.printStackTrace();
            throw new IllegalArgumentException();
         }
         catch (Exception e)
         {
            MQTestJMS11x5.logger(e.getLocalizedMessage());
            e.printStackTrace();
            throw new IllegalArgumentException();
         }
      }
      else
      {
         throw new IllegalArgumentException();
      }
   }

   /**
    * Test the connection to the queue manager and send the test messages.
    * All exceptions are caught and logged inside this method.
    */
   private void testConn()
   {
      QueueConnection conn = null;
      QueueSession session = null;
      Queue myQ = null;

      try
      {
         conn = mqQCF.createQueueConnection((String) params.get("-u"), (String) params.get("-x"));
         conn.start();

         session = conn.createQueueSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
         MQTestJMS11x5.logger("successfully connected.");

         myQ = session.createQueue((String) params.get("-q"));
         MQTestJMS11x5.logger("successfully opened "+ (String) params.get("-q"));

         MQDestination mqd = (MQDestination) myQ;
         mqd.setTargetClient(WMQConstants.WMQ_CLIENT_JMS_COMPLIANT);

         sendMsg( session, myQ);
      }
      catch (JMSException e)
      {
         MQTestJMS11x5.logger("getLinkedException()=" + e.getLinkedException());
         MQTestJMS11x5.logger(e.getLocalizedMessage());
         e.printStackTrace();
      }
      catch (Exception e)
      {
         MQTestJMS11x5.logger(e.getLocalizedMessage());
         e.printStackTrace();
      }
      finally
      {
         try
         {
            if (session != null)
            {
               session.close();
               MQTestJMS11x5.logger("Closed session");
            }
         }
         catch (Exception ex)
         {
            MQTestJMS11x5.logger("session.close() : " + ex.getLocalizedMessage());
         }

         try
         {
            if (conn != null)
            {
               conn.stop();
               MQTestJMS11x5.logger("Stopped connection");
            }
         }
         catch (Exception ex)
         {
            MQTestJMS11x5.logger("connection.stop() : " + ex.getLocalizedMessage());
         }

         try
         {
            if (conn != null)
            {
               conn.close();
               MQTestJMS11x5.logger("Closed connection");
            }
         }
         catch (Exception ex)
         {
            MQTestJMS11x5.logger("connection.close() : " + ex.getLocalizedMessage());
         }
      }
   }

   /**
    * Send 5 test messages to a queue and log each JMSMessageID.
    * @throws JMSException if a message cannot be created or sent
    */
   private void sendMsg(QueueSession session, Queue myQ) throws JMSException
   {
      QueueSender sender = null;
      TextMessage msg = null;

      try
      {
         MQTestJMS11x5.logger("Sending request to " + myQ.getQueueName());
         MQTestJMS11x5.logger("");

         sender = session.createSender(myQ);

         for (int i=0; i < 5; i++)
         {
            msg = session.createTextMessage();
            msg.setText("This is test message # " + (i+1));

            sender.send(msg);

            MQTestJMS11x5.logger("Sent message: MessageId="+msg.getJMSMessageID());
         }
      }
      finally
      {
         try
         {
            if (sender != null)
               sender.close();
         }
         catch (Exception ex)
         {
            MQTestJMS11x5.logger("sender.close() : " + ex.getLocalizedMessage());
         }
      }
   }

   /**
    * A simple logger method
    * @param data
    */
   public static void logger(String data)
   {
      String className = Thread.currentThread().getStackTrace()[2].getClassName();

      // Remove the package info.
      if ( (className != null) && (className.lastIndexOf('.') != -1) )
         className = className.substring(className.lastIndexOf('.')+1);

      System.out.println(LOGGER_TIMESTAMP.format(new Date())+" "+className+": "+Thread.currentThread().getStackTrace()[2].getMethodName()+": "+data);
   }

   /**
    * mainline
    * @param args
    */
   public static void main(String[] args)
   {
      MQTestJMS11x5 write = new MQTestJMS11x5();

      try
      {
         write.init(args);
         write.testConn();
      }
      catch (IllegalArgumentException e)
      {
         MQTestJMS11x5.logger("Usage: java MQTestJMS11x5 -m QueueManagerName -h host -p port -c channel -q JMS_Queue_Name -u UserID -x Password");
         System.exit(1);
      }
      catch (Exception e)
      {
         MQTestJMS11x5.logger(e.getLocalizedMessage());
         System.exit(1);
      }

      System.exit(0);
   }
}
Roger
0
final AtomicReference<Message> msg = new AtomicReference<>();

Why are you using "final"? Remove it and try again.

Roger
  • final tells the compiler that the object reference can't be changed, not its data. So final is OK. I've also logged the MessageId between app runs in the consumer, and it's not unique. – Ermintar Aug 13 '19 at 20:59
  • Well, I'm not a fan of your use of final or AtomicReference or msg.get().getJMSMessageID(). See my other answer. – Roger Aug 13 '19 at 23:25
  • 2
    @Ermintar - carefully compare the two message IDs printed by your application. The difference between the IDs is marked in Daniel's answer. – Shashi Aug 14 '19 at 04:22