1

Hi developers,

I want to write 2 .java files using JMS library names are MessageProducer and MessageConsumer.

I added activemq-all-5.8.0.jar and commons-io-2.4.jar files in my lib folder.and I changed port number of Activemq from 61616 to 61617.

using MessageProducer.java file,I will send messages to Activemq.For this I wrote code it's working fine.If you want to see click on this Link.

enter image description here

I want to send messages from Activemq to MessageConsumer.java.This is Application is in Apache Tomcat(http://localhost:8080/ExecutableFileProcess/MessageConsumer)

Once MessageConsumer receives the message, it separates the message-body from message and it just print on console(just for my testing).For this I wrote the following 2 java files.But it's not working.

MessageConsumer.java :

 package PackageName;
 import java.io.IOException;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.MessageListener;
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import org.apache.activemq.ActiveMQConnectionFactory;
 public class MessageConsumer extends HttpServlet {
@Override
protected void service(HttpServletRequest arg0, HttpServletResponse arg1)
        throws ServletException, IOException {
try {
    //creating connectionfactory object for way
    ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("admin","admin","tcp://localhost:61617");
    //establishing the connection b/w this Application and Activemq
    Connection connection=connectionFactory.createConnection();
    Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Queue queue=session.createQueue("MessageTesing");
    javax.jms.MessageConsumer consumer=session.createConsumer(queue);
    //fetching queues from Activemq
    MessageListener listener = new MyListener();
    consumer.setMessageListener(listener);
    connection.start();
} catch (Exception e) {
    // TODO: handle exception
}

}
}

MyListener.java :

package PackageName;
import javax.jms.Message;
import javax.jms.MessageListener;
public class MyListener implements MessageListener {
public void onMessage(Message msg) {
    System.out.println(msg);
}
};

I didn't configure destination for Queue in Activemq console and also I didn't mention "destination" while sending message from MessageProducer.java.

I am using Eclipse.How can I print messagebody in console,Actually based on messagebody I will do some operations in my MessageConsumer.java.but for my testing I need to see messagebody.

I hope,you understand what I am trying.

I am new to JMSand Java,so can you explain clearly.So far I wrote the code using Google search.But I didn't find anywhere this issue.

can anyone suggest me.

Thanks.

Community
  • 1
  • 1
Hanumath
  • 1,117
  • 9
  • 23
  • 41
  • Hi Dear, Please look on this question. I am in great need for this. http://stackoverflow.com/questions/19706788/jersey-rest-web-service-with-activemq-middleware-integration – Kumar Nov 04 '13 at 11:05

2 Answers2

1

Possibly your program just starts and terminates, because you have written the client to receive messages asynchronously (by using a MessageListener, in a different thread), and your main thread just terminates.

Try something like this:

                connection.start();

                System.out.println("Press a key to terminate");
                try {
                    System.in.read();
                } catch (IOException e) {
                    e.printStackTrace();
                }

                System.out.println("End");

this should keep your program running, until you hit a key.


To read the body of a TextMessage, use something like this (this is quoted from ActiveMQ hello world examples:

            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                String text = textMessage.getText();
                System.out.println("Received: " + text);
            } else {
                System.out.println("Received: " + message);
            }
Beryllium
  • 12,808
  • 10
  • 56
  • 86
  • If I run any program,I am getting 3 warning messages.that everything was exaplained in this link `http://stackoverflow.com/questions/18129668/how-can-i-remove-warning-messages-in-activemq`.I appended your code in my program,first time it's working fine.I tried second time from here it's not working.I think because of that warnings. How can I fix this. – Hanumath Aug 08 '13 at 15:51
  • When the program has received the message during the 1st run, these messages were consumed (the queue is empty after that). You have to send some new messages. The warning messages are related to log4j, this should not be a problem. – Beryllium Aug 08 '13 at 16:00
  • Just now I understand,but how can I fix warning messages in `log4j` file – Hanumath Aug 08 '13 at 16:02
  • @user2642355 I'll have a look at the other question. But do you need any more help with this question? – Beryllium Aug 08 '13 at 16:24
  • yes I want to get message body from message. I didn't find any method for this.I tried with `System.out.println(msg.getJMSMessageID());`.But I am getting messageId. If you know is there any function for getting messagebody from message. – Hanumath Aug 08 '13 at 16:32
  • I run the `System.out.println(msg.getClass());` .I am getting `org.apache.activemq.command.ActivemqTextMessage`.I open that Api documentation But I didn't find any function for getting message body. – Hanumath Aug 08 '13 at 16:35
  • let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/35090/discussion-between-user2642355-and-beryllium) – Hanumath Aug 08 '13 at 16:52
1
import java.io.Serializable;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.Logger;

public class ActiveMQConnection {

    private static final long timeout = 3000L;

    private Logger logger = Logger.getLogger(ActiveMQConnection.class);

    private String activeMQUser; 
    private String activeMQPassword;
    private String activeMQURI;
    private String activeMQQueueName;

    private ActiveMQConnectionFactory connectionFactory;
    private Connection connection;
    private Session session;
    private boolean isConnected;

    private boolean transacted = false;
    private Destination destinationQueue;


    private boolean isQueueAvailable;
    private boolean isProducerAvailable;
    private boolean isConsumerAvailable;

    private MessageProducer producerForQueue;
    private MessageConsumer consumerForQueue;

    /**
     * Create Default service
     * @throws Exception 
     */
    public ActiveMQConnection() throws Exception
    {
        try {
                throw new JMSException("No Parameters defined for creating connection. Try constructor with parameters.");
        } catch (JMSException e) {
            logger.error("JMS Exception in Active MQ Connection",e);
            throw e;
        } catch (Exception e) {
            logger.error("JMS Exception in Active MQ Connection",e);
            throw e;
        }
    }


    /**
     * Create a service with desired parameters.
     * @param activeMQUser
     * @param activeMQPassword
     * @param activeMQURI
     * @param activeMQQueueName
     * @throws Exception 
     */
    public ActiveMQConnection(String activeMQUser, String activeMQPassword, String activeMQURI) throws Exception
    {
        try {
            this.activeMQUser = activeMQUser;
            this.activeMQPassword = activeMQPassword;
            this.activeMQURI =  activeMQURI;
            setUpActiveMQConnection();

        } catch (JMSException e) {
            logger.error("JMS Exception in Active MQ Connection",e);
            throw e;
        } catch (Exception e) {
            logger.error("Exception in Active MQ Connection",e);
            throw e;
        }
    }

    /**
     * @throws JMSException, Exception 
     */
    private void setUpActiveMQConnection() throws JMSException, Exception
    {
        connectionFactory = new ActiveMQConnectionFactory(
                this.activeMQUser,
                this.activeMQPassword,
                this.activeMQURI );
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
            isConnected = true;
        }catch (JMSException e) {
            isConnected = false;
            throw e;
        }catch(Exception e){
            isConnected = false;
            throw e;
        }
    }

    /**
     * @throws Exception
     */
    public void setUpQueue(String queueName ) throws Exception
    {
        this.activeMQQueueName = queueName;
        createQueue();
        createProducer();
        createConsumer();
    }

    /**
     * @throws Exception
     */
    private void createQueue() throws Exception 
    {
        try {
            if(destinationQueue == null)
                {   
                destinationQueue = session.createQueue(this.activeMQQueueName);
                isQueueAvailable = true;
                }
        } catch (JMSException e) {
            isQueueAvailable = false;
            throw e;
        }catch(Exception e){
            isQueueAvailable = false;
            throw e;
        }
    }

    /**
     * @throws JMSException 
     * 
     */
    private void createProducer() throws JMSException
    {   
        if(producerForQueue == null)
        {   
            try {
                producerForQueue = session.createProducer(destinationQueue);
                producerForQueue.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                isProducerAvailable = true;
            } catch (JMSException e) {
                isProducerAvailable = false;
                throw e;
            }
        }
    }

    /**
     * @throws JMSException
     */
    private void createConsumer() throws JMSException
    {   
        if(consumerForQueue == null)
        {   
            try {
                consumerForQueue = session.createConsumer(destinationQueue);
                isConsumerAvailable = true;
            } catch (JMSException e) {
                isConsumerAvailable = false;
                throw e;
            }
        }
    }

    /**
     * @param objectToQueue
     * @throws JMSException
     */
    public void sendMessage(Serializable objectToQueue) throws JMSException 
    {
        ObjectMessage message = session.createObjectMessage();
        message.setObject(objectToQueue);
        producerForQueue.send(message);
    }

    /**
     * @param objectToQueue
     * @throws JMSException
     */
    public Serializable receiveMessage() throws JMSException 
    {
        Message message = consumerForQueue.receive(timeout);
        if (message instanceof ObjectMessage) 
              { 
                  ObjectMessage objMsg = (ObjectMessage) message;
                  Serializable sobject = objMsg.getObject();
                  return sobject;
              }
        return null;
    }

    /**
     * close-MQ-Connection
     */
    public void closeMQConnection()
    {
        try 
        {
            if(consumerForQueue != null)
            {
            consumerForQueue.close();
            }
            if(producerForQueue != null)
            {
            producerForQueue.close();
            }
            if(session != null)
            {
            session.close();
            }
            if(connection != null )
            {
            connection.close();
            }   
        } 
        catch (JMSException e) 
            {
            logger.info("Error while closing connection.",e);
            }
        finally
            {
            consumerForQueue = null;
            producerForQueue = null;
            destinationQueue = null;
            session = null;
            connection = null;
            activeMQUser = null;
            activeMQPassword = null;
            activeMQQueueName = null;
            activeMQURI = null;
            }
    }

    public boolean isConnected() {
        return isConnected;
    }

    public void setConnected(boolean isConnected) {
        this.isConnected = isConnected;
    }

    public boolean isQueueAvailable() {
        return isQueueAvailable;
    }

    public void setQueueAvailable(boolean isQueueAvailable) {
        this.isQueueAvailable = isQueueAvailable;
    }

    public boolean isProducerAvailable() {
        return isProducerAvailable;
    }

    public void setProducerAvailable(boolean isProducerAvailable) {
        this.isProducerAvailable = isProducerAvailable;
    }

    public MessageConsumer getConsumerForQueue() {
        return consumerForQueue;
    }

    public void setConsumerForQueue(MessageConsumer consumerForQueue) {
        this.consumerForQueue = consumerForQueue;
    }

    public boolean isConsumerAvailable() {
        return isConsumerAvailable;
    }

    public void setConsumerAvailable(boolean isConsumerAvailable) {
        this.isConsumerAvailable = isConsumerAvailable;
    }
}

Above is a Utility class for

  • creating connection creating/connecting to queue sending message receiving message
  • You can use methods in it to send or receive any Serializable POJO.
Learn More
  • 1,535
  • 4
  • 29
  • 51