2

I'm developing a messaging system using ActiveMQ and Play Framework v2.4.2 (Java version) to send emails to end users. I'm a newbie with JMS/ActiveMQ. I used the Hello World example on the ActiveMQ site as a starting point.

I created a test class as below to test running ActiveMQ with Play Framework and everything was ok:

/**
 * Stand-alone smoke test for the mail queue: starts a fake Play application,
 * starts the {@link MailConsumer} service thread and hands a batch of test
 * e-mails to {@link MailProducer}.
 */
public class ActiveMQMailApp {

    /** Number of test e-mails queued by {@link #main(String[])}. */
    private static final int MAIL_COUNT = 11;

    /** Pause before each test e-mail is built, in milliseconds. */
    private static final long DELAY_MILLIS = 1000L;

    /**
     * Timestamp pattern for the e-mail body. Uses {@code yyyy} (calendar year)
     * rather than {@code YYYY} (week-based year, which is wrong around New
     * Year) and {@code HH} (24-hour clock) because the pattern carries no
     * am/pm marker.
     */
    private static final String TIMESTAMP_PATTERN = "yyyy-MM-dd  HH:mm:ss";

    /**
     * Entry point: prepares the fake application, starts the consumer, then
     * queues {@value #MAIL_COUNT} test e-mails.
     *
     * @param args unused
     * @throws Exception if set-up fails or the thread is interrupted while pausing
     */
    public static void main(String[] args) throws Exception {
        setup();
        MailConsumer.initService();
        for (int i = 0; i < MAIL_COUNT; i++) {
            MailProducer.sendMail(fakeMail());
        }
    }

    /** Creates and starts a fake Play application via the test helpers. */
    public static void setup() {
        FakeApplication fakeApplication = Helpers.fakeApplication();
        Helpers.start(fakeApplication);
    }

    /**
     * Builds a test e-mail whose HTML body contains the current timestamp.
     * Sleeps first so that consecutive mails carry different timestamps.
     *
     * @return a new test {@code Mail}
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private static Mail fakeMail() throws InterruptedException {
        Thread.sleep(DELAY_MILLIS);
        // SimpleDateFormat is not thread-safe, so a fresh instance is created per call.
        SimpleDateFormat sdf = new SimpleDateFormat(TIMESTAMP_PATTERN);
        String body = "<html><body><p>Date: <b> " + sdf.format(new Date()) + " </b></p></body></html>";
        return new Mail("noreply@abc.com", "receiver@gmail.com", "A Test Email", body);
    }

}

But when I use this exact code in the main app, this exception is thrown:

javax.jms.JMSException: Could not create Transport. Reason: java.lang.RuntimeException: Fatally failed to create SystemUsageorg/apache/activemq/protobuf/BufferInputStream
        at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:36)
        at org.apache.activemq.ActiveMQConnectionFactory.createTransport(ActiveMQConnectionFactory.java:332)
        at org.apache.activemq.ActiveMQConnectionFactory.createActiveMQConnection(ActiveMQConnectionFactory.java:345)
        at org.apache.activemq.ActiveMQConnectionFactory.createActiveMQConnection(ActiveMQConnectionFactory.java:303)
        at org.apache.activemq.ActiveMQConnectionFactory.createConnection(ActiveMQConnectionFactory.java:243)
        at ir.iais.salary.services.MailProducer.run(MailProducer.java:35)
Caused by: java.lang.RuntimeException: Fatally failed to create SystemUsageorg/apache/activemq/protobuf/BufferInputStream
        at org.apache.activemq.broker.BrokerService.getSystemUsage(BrokerService.java:1159)
        ... 5 more
Caused by: java.io.IOException: org/apache/activemq/protobuf/BufferInputStream
        at org.apache.activemq.util.IOExceptionSupport.create(IOExceptionSupport.java:39)
        ... 11 more
Caused by: java.lang.NoClassDefFoundError: org/apache/activemq/protobuf/BufferInputStream
        at org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter.<init>(KahaDBPersistenceAdapter.java:65)
        ... 13 more
Caused by: java.lang.ClassNotFoundException: org.apache.activemq.protobuf.BufferInputStream
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)

My MailProducer and MailConsumer classes are like these:

/**
 * Sends a single {@code Mail} to the {@link #AMQ_MAIL_QUEUE} queue as a JSON
 * text message. Each instance is a one-shot {@link Runnable}; use
 * {@link #sendMail(Mail)} to run it on its own thread.
 */
public class MailProducer implements Runnable {
    /** Name of the JMS queue that carries the mail messages. */
    public static final String AMQ_MAIL_QUEUE = "MAIL";

    /**
     * URI of the in-VM broker. On a {@code vm://} URI, options meant for the
     * embedded broker must carry the {@code broker.} prefix; a bare
     * {@code persistent=false} does not reach the broker, which then still
     * tries to create its default KahaDB persistence adapter.
     */
    public static final String BROKER_URL = "vm://localhost?broker.useJmx=false&broker.persistent=false";

    private final Mail mail;

    /**
     * @param mail the mail to serialise and enqueue when {@link #run()} is called
     */
    public MailProducer(Mail mail) {
        this.mail = mail;
    }

    /**
     * Enqueues {@code mail} asynchronously on a new non-daemon thread.
     *
     * @param mail the mail to enqueue
     */
    public static void sendMail(Mail mail) {
        Thread brokerThread = new Thread(new MailProducer(mail));
        brokerThread.setDaemon(false);
        brokerThread.start();
    }

    /**
     * Opens a connection to the broker, sends the mail as a non-persistent
     * JSON text message and closes the connection again. Failures are
     * reported on standard output and not rethrown.
     */
    @Override
    public void run() {
        Connection connection = null;
        try {
            // Create a ConnectionFactory
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);

            // Create a Connection
            connection = connectionFactory.createConnection();
            connection.start();

            // Create a Session
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            // Create the destination (Topic or Queue)
            Destination destination = session.createQueue(AMQ_MAIL_QUEUE);

            // Create a MessageProducer from the Session to the Topic or Queue
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            // Serialise once and reuse the JSON for both the message and the trace line
            String json = new Gson().toJson(mail);
            TextMessage textMessage = session.createTextMessage(json);

            // Tell the producer to send the message; report it only after send() returned
            producer.send(textMessage);
            System.out.println("Sent message: " + json + " : " + Thread.currentThread().getName());
        }
        catch (Exception e) {
            System.out.println("Caught: " + e);
            e.printStackTrace();
        }
        finally {
            // Runs on the failure path too, so the connection is never leaked.
            closeQuietly(connection);
        }
    }

    /**
     * Closes the connection if it was opened, reporting (not propagating) any
     * failure. Per the JMS API, closing a connection also closes its sessions
     * and producers, so they are not closed separately.
     */
    private static void closeQuietly(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        }
        catch (Exception e) {
            System.out.println("Caught: " + e);
            e.printStackTrace();
        }
    }
}




public class MailConsumer implements Runnable, ExceptionListener {
    private static final Logger logger = getLogger(MailConsumer.class);
    private static Thread mailConsumerService;

    public static synchronized void initService() {
        MailConsumer mailConsumer = Play.application().injector().instanceOf(MailConsumer.class);
        if (mailConsumerService != null) {
            logger.info("STOPPING MailConsumer thread.");
            mailConsumerService.interrupt();
        }
        logger.info("Starting MailConsumer thread.");
        mailConsumerService = new Thread(mailConsumer);
        mailConsumerService.setDaemon(true);
        mailConsumerService.setName("MailConsumer Service");
        mailConsumerService.start();
        logger.info("MailConsumer thread started.");
    }

    @Inject
    private MailerClient mailerClient;

    @Override
    public void run() {
        try {
            // Create a ConnectionFactory
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(MailProducer.BROKER_URL);

            // Create a Connection
            Connection connection = connectionFactory.createConnection();
            connection.start();

            connection.setExceptionListener(this);

            // Create a Session
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            // Create the destination (Topic or Queue)
            Destination destination = session.createQueue(MailProducer.AMQ_MAIL_QUEUE);

            // Create a MessageConsumer from the Session to the Topic or Queue
            MessageConsumer consumer = session.createConsumer(destination);

            while (!Thread.currentThread().isInterrupted()) {
                // Wait for a message
                Message message = consumer.receive();

                if (message instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) message;
                    String text = textMessage.getText();
                    System.out.println("Received: " + text);
                    Mail mail = new Gson().fromJson(text, Mail.class);
                    Email email = new Email();
                    email.setFrom(mail.getFrom());
                    email.setTo(mail.getTo());
                    email.setSubject(mail.getSubject());
                    email.setBodyHtml(mail.getBodyHtml());
                    System.out.println("sending email...");
                    mailerClient.send(email);
                    System.out.println("email sent!");
                } else {
                    System.out.println("Received: " + message);
                    logger.info("message type: "+message.getClass().getSimpleName());
                }

            }
            logger.info("MailConsumer interrupted.");
            consumer.close();
            session.close();
            connection.close();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                logger.info("MailConsumer thread interrupted.");
            } else {
                logger.error(e.getLocalizedMessage(), e);
            }
        }
    }

    public synchronized void onException(JMSException ex) {
        System.out.println("JMS Exception occured.  Shutting down client.");
        logger.error("ErrorCode=" + ex.getErrorCode() + " , " + ex.getMessage(), ex);
    }
}

I call MailProducer in main app like this:

public Result sendTestMail(){
    if(!DevStatus.gI().isInDebugMode()) return badRequest("You'r not in Development Env.");
    SimpleDateFormat sdf = new SimpleDateFormat("YYYY-MM-dd  hh:mm:ss");
    Mail mail = new Mail("noreply@abc.com", "receiver@gmail.com", "A Test Email", "<html><body><p>Date: <b> " + sdf.format(new Date()) + " </b></p></body></html>");
    MailProducer.sendMail(mail);
    return ok("email sent! "+ sdf.format(new Date()));

It seems the problem is that org.apache.activemq.protobuf.BufferInputStream is not on the classpath. I added "org.apache.activemq.protobuf" % "activemq-protobuf" % "1.1" to build.sbt, but nothing changed. I also tried to disable ActiveMQ persistence by adding persistent=false to the broker URI, but it didn't work.

What can I do? Does it even make sense to use ActiveMQ with Play Framework as a JMS? Or some better JMS is available out there to work with Play Framework? What about Akka?!!

Edit: My ActiveMQ related deps are:

  "org.apache.activemq" % "activemq-broker" % "5.13.4",
  "org.apache.activemq" % "activemq-client" % "5.13.4",
  "org.apache.activemq" % "activemq-kahadb-store" % "5.13.4",
  "org.apache.activemq.protobuf" % "activemq-protobuf" % "1.1",

Edit 2: I replaced the above dependencies with "org.apache.activemq" % "activemq-all" % "5.14.0" and the main app started to work! At first I thought the problem was resolved and was related to the ActiveMQ packages, but then I realized the ActiveMQMailApp test class is now throwing the same exception as above! I ran this test class in a new, simple Maven project (not Play Framework) and everything was OK! I'm afraid this error will come back later. What is actually happening?!

youhans
  • 6,101
  • 4
  • 27
  • 39
  • Please provide dependencies list – arseniyandru Aug 04 '16 at 17:25
  • i know it does not answer your question. What's the rationale for not using akka or just a future and using that to send the email out? also, you seem to be creating the activemq connection factory every time your thread runs - better if you get a instance of that and reuse that whenever needed. Also, what happens when you use the activemq-all dependency - the exception you have shown points to missing protobuf and kahadb dependency (getPersistenceAdapter() from SystemUsage). Turn off persistence for now till you get the app working and then resort to persistence. – ali haider Aug 04 '16 at 17:50
  • 1
    @youhans Just to make sure: have you run `sbt update` or `activator update`? Regarding the Akka/ActiveMQ question, see if [this](http://stackoverflow.com/questions/5693346/when-to-use-actors-instead-of-messaging-solutions-such-as-websphere-mq-or-tibco) helps. Unless there is a requirement to use ActiveMQ you should give Akka a try – Salem Aug 04 '16 at 19:13
  • @alihaider, I need queue persistence for this use case so I need some persistable queue like a JMS broker. For now creating activemq connection factory in every consume request isn't a issue because the number of messages are few. At last I changed the dependencies to activemq-all and some strange things happened. I will explain them in revised question. – youhans Aug 06 '16 at 11:49

3 Answers

5

Add this dependency to your pom.

    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-kahadb-store</artifactId>
        <scope>runtime</scope>
    </dependency>

See this issue report.

jumpnett
  • 6,967
  • 1
  • 19
  • 24
  • I changed `activemq-kahadb-store` dependency to `"org.apache.activemq" % "activemq-kahadb-store" % "5.13.4" % "runtime"`, but it didn't work. – youhans Aug 06 '16 at 11:27
  • 1
    I also needed to specify version in above dependency ${activemq.version} – Kaushik Lele Jun 04 '17 at 05:31
3

In case you have the following error:

java.io.IOException: org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter
    at org.apache.activemq.util.IOExceptionSupport.create(IOExceptionSupport.java:40) ~[activemq-client-5.15.10.jar:5.15.10]
    at org.apache.activemq.broker.BrokerService.createPersistenceAdapter(BrokerService.java:2507) [activemq-broker-5.15.10.jar:5.15.10]
    at org.apache.activemq.broker.BrokerService.getPersistenceAdapter(BrokerService.java:1267) [activemq-broker-5.15.10.jar:5.15.10]
    at org.apache.activemq.broker.BrokerService.getSystemUsage(BrokerService.java:1179) [activemq-broker-5.15.10.jar:5.15.10]

.... 

java.lang.RuntimeException: Fatally failed to create SystemUsageorg.apache.activemq.store.kahadb.KahaDBPersistenceAdapter
    at org.apache.activemq.broker.BrokerService.getSystemUsage(BrokerService.java:1190) ~[activemq-broker-5.15.10.jar:5.15.10]
    at org.apache.activemq.broker.BrokerService.checkMemorySystemUsageLimits(BrokerService.java:2178) ~[activemq-broker-5.15.10.jar:5.15.10]

You should add the dependency posted by @jumpnett to your pom

<dependency>
   <groupId>org.apache.activemq</groupId>
   <artifactId>activemq-kahadb-store</artifactId>
   <scope>runtime</scope>
</dependency>
2

This problem arises because sbt uses Ivy as its resolution system and, for some reason, activemq-protobuf uses the packaging type maven-plugins.

While Maven resolves such an artifact as a jar for compilation, Ivy (or perhaps sbt itself; I am not really sure) resolves it as type maven-plugins instead of type jar, so sbt ignores this dependency because it sees no jar in it.

The way to fix it is to explicitly declare a jar-type artifact:

libraryDependencies += "org.apache.activemq.protobuf" % "activemq-protobuf" % "1.1" jar()
Chikei
  • 2,104
  • 1
  • 17
  • 21