I have configured a JMS topic in JBoss EAP 7.0 and written some simple Java code to create a message producer. I have the following stateless bean:
    import javax.annotation.Resource;
    import javax.ejb.Stateless;
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.jms.Topic;
    // Project-specific imports (OMSLogHandlerI, Log4j2Handler, OMSCoreRuntimeException) omitted.

    @Stateless
    public class ExchangeSenderFacadeWrapperBean {
        private static final OMSLogHandlerI logger = new Log4j2Handler("ClientSenderFacadeBean");

        @Resource(lookup = "java:/JmsXA") // inject ConnectionFactory (more)
        protected ConnectionFactory factory;

        @Resource(lookup = "java:/jms/topic/ORD_CLINT_PUSH")
        protected Topic target;

        private Connection connection = null;
        private Session session = null;

        public void sendMessage(String message) {
            MessageProducer producer = null;
            try {
                if (connection == null) { //todo verify
                    connection = factory.createConnection();
                }
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                producer = session.createProducer(target);
                producer.setDisableMessageID(true);
                // Time-to-live only applies to messages sent after it is set, so set it before send().
                producer.setTimeToLive(900000); // 15 min //todo
                TextMessage outmsg = session.createTextMessage(message);
                producer.send(outmsg);
                logger.info("Message was sent to Topic");
            } catch (Exception e) {
                logger.error(" Error when sending order to jboss:", e);
                throw new OMSCoreRuntimeException(e.getMessage(), e);
            } finally {
                try {
                    if (producer != null)
                        producer.close();
                } catch (JMSException e) {
                    logger.warn("\n jms producer close error:", e);
                }
                try {
                    if (session != null)
                        session.close();
                } catch (JMSException e) {
                    logger.warn("\n jms session close error:", e);
                }
            }
        }
    }
This worked fine until I made a simple change: I moved the sendMessage(String message) method into a POJO class, as follows.
    import javax.annotation.PostConstruct;
    import javax.annotation.Resource;
    import javax.ejb.EJB;
    import javax.ejb.Local;
    import javax.ejb.Stateless;
    import javax.jms.ConnectionFactory;
    import javax.jms.Queue;
    // Project-specific imports (BeanRegistryLoader, BeanRegistryCore, ExchangeMessage, ...) omitted.

    @Stateless(name = "ExchangeSenderFacadeBean")
    @Local({ExchangeSenderFacadeLocalI.class})
    public class ExchangeSenderFacadeWrapperBean implements ExchangeSenderFacadeLocalI {
        @Resource(lookup = "java:/JmsXA") // inject ConnectionFactory (more)
        protected ConnectionFactory factory;

        @EJB(beanName = "BeanRegistryLoader")
        protected BeanRegistryLoader omsRegistryBean;

        protected BeanRegistryCore beanRegistryCore;

        @Resource(lookup = "java:/jms/queue/ToExchange")
        protected Queue target;

        private ExchangeSenderFacadeCoreI exchangeSenderFacadeCore;

        @Override
        public void sendToExchange(ExchangeMessage exchangeMessage) {
            // Delegate the actual JMS work to the POJO.
            exchangeSenderFacadeCore.sendToExchange(exchangeMessage);
        }

        @PostConstruct
        public void init() {
            beanRegistryCore = omsRegistryBean.registry();
            if (exchangeSenderFacadeCore == null) {
                // Fetch the POJO from the registry and hand it the injected JMS resources.
                exchangeSenderFacadeCore = ((BeanRegistryCore) omsRegistryBean.registry()).getExchangeSenderFacadeCoreI();
                exchangeSenderFacadeCore.setBeanRegistryCore(omsRegistryBean.registry());
                exchangeSenderFacadeCore.setFactory(factory);
                exchangeSenderFacadeCore.setTargetQueue(target);
            }
        }
    }
The ConnectionFactory and target Queue variables are set inside the EJB's @PostConstruct method. The POJO class looks as follows; it now contains the logic to create the message and send it to the queue:
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageProducer;
    import javax.jms.Queue;
    import javax.jms.Session;
    // Project-specific imports (OMSLogHandlerI, OMSConst, BeanRegistryCore, ...) omitted.

    public class ExchangeSenderFacadeCore implements ExchangeSenderFacadeCoreI {
        private static final OMSLogHandlerI logger = new Log4j2HndlAdaptor("ExchangeSenderFacadeCore");

        private BeanRegistryCore beanRegistryCore;
        private ConnectionFactory factory;
        // Instance fields: shared by every caller that uses this object.
        private Connection connection = null;
        private Session session = null;
        private long ttl = 900000;
        protected Queue targetQueue;

        public ExchangeSenderFacadeCore() {
            // Allow the default TTL to be overridden by a system property.
            String ttlProperty = System.getProperty(OMSConst.SYS_PROPERTY_JMS_TTL);
            if (ttlProperty != null && ttlProperty.length() > 0) {
                ttl = Long.parseLong(ttlProperty);
            }
            logger.info("LN:103", "==JMS Topic TTL:" + ttl);
        }

        @Override
        public void processSendToExchange(ExchangeMessage exchangeMessage) {
            sendToExchange(exchangeMessage);
        }

        public boolean isParallelRunEnabled() {
            Object isParallelRun = beanRegistryCore.getCacheAdaptorI().cacheGet(OMSConst.DEFAULT_TENANCY_CODE, OMSConst.APP_PARAM_IS_PARALLEL_RUN, CACHE_NAMES.SYS_PARAMS_CACHE_CORE);
            return isParallelRun != null && String.valueOf(isParallelRun).equals(OMSConst.STRING_1);
        }

        @Override
        public void sendToExchange(ExchangeMessage exchangeMessage) {
            MessageProducer producer = null;
            try {
                if (isParallelRunEnabled()) {
                    logger.info("LN:66", "== Message send to exchange skipped,due to parallel run enabled");
                    return;
                }
                if (connection == null) {
                    connection = factory.createConnection();
                }
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                producer = session.createProducer(targetQueue);
                producer.setDisableMessageID(true);
                // Time-to-live only applies to messages sent after it is set, so set it before send().
                producer.setTimeToLive(ttl); // default 15 min
                Message message = beanRegistryCore.getJmsExchangeMsgTransformerI().transformToJMSMessage(session, exchangeMessage);
                producer.send(message);
                logger.elkLog("78", "-1", LogEventsEnum.SENT_TO_EXCHANGE, exchangeMessage.toString());
            } catch (Exception e) {
                logger.error("LN:80", " Error when sending order to exchange:", e);
                throw new OMSCoreRuntimeException(e.getMessage(), e);
            } finally {
                try {
                    if (producer != null)
                        producer.close();
                } catch (JMSException e) {
                    logger.error("LN:87", "JMS producer close error:", e);
                }
                try {
                    if (session != null)
                        session.close();
                } catch (JMSException e) {
                    logger.error("LN:93", "JMS session close error:", e);
                }
            }
        }

        @Override
        public void processSendToExchangeSync(ExchangeMessage exchangeMessage) {
        }

        @Override
        public BeanRegistryCore getBeanRegistryCore() {
            return beanRegistryCore;
        }

        @Override
        public void setBeanRegistryCore(BeanRegistryCore beanRegistryCore) {
            this.beanRegistryCore = beanRegistryCore;
        }

        @Override
        public ConnectionFactory getFactory() {
            return factory;
        }

        @Override
        public void setFactory(ConnectionFactory factory) {
            this.factory = factory;
        }

        @Override
        public Queue getTargetQueue() {
            return targetQueue;
        }

        @Override
        public void setTargetQueue(Queue targetQueue) {
            this.targetQueue = targetQueue;
        }
    }
But when I execute the modified code, it gives me the following error:
javax.ejb.EJBTransactionRolledbackException: Producer is closed
Any possible fixes?
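
One plausible cause, offered as a guess rather than a confirmed diagnosis: in the original bean, `connection` and `session` were fields of a stateless bean instance, which the container does not hand to two callers at the same time. If the registry returns the same `ExchangeSenderFacadeCore` object to every bean instance (an assumption, since the registry code is not shown), those fields become shared. One caller's `finally` block can then close the session that another caller has just stored in the field, and closing a session also invalidates the producers created from it. The sketch below replays that interleaving by hand using plain-Java stand-ins; `FakeSession`, `FakeProducer`, and `SharedSessionDemo` are hypothetical names, not JMS or JBoss classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for javax.jms.MessageProducer.
class FakeProducer {
    private boolean closed;

    void send(String msg) {
        if (closed) {
            throw new IllegalStateException("Producer is closed");
        }
    }

    void close() {
        closed = true;
    }
}

// Hypothetical stand-in for javax.jms.Session.
class FakeSession {
    private final List<FakeProducer> producers = new ArrayList<>();

    FakeProducer createProducer() {
        FakeProducer producer = new FakeProducer();
        producers.add(producer);
        return producer;
    }

    // Assumption: closing a session makes its producers unusable.
    void close() {
        for (FakeProducer producer : producers) {
            producer.close();
        }
    }
}

class SharedSessionDemo {
    // Shared field, playing the role of `session` in the POJO.
    private FakeSession session;

    // Two calls interleaved by hand, both going through the shared field.
    String sharedField() {
        session = new FakeSession();                       // caller A opens a session
        FakeProducer producerA = session.createProducer();
        session = new FakeSession();                       // caller B overwrites the field
        FakeProducer producerB = session.createProducer();
        producerA.send("A");
        producerA.close();                                 // caller A's finally block...
        session.close();                                   // ...closes B's session by mistake
        return trySend(producerB);
    }

    // Same interleaving, but each caller keeps its session in a local variable.
    String localVariables() {
        FakeSession sessionA = new FakeSession();
        FakeProducer producerA = sessionA.createProducer();
        FakeSession sessionB = new FakeSession();
        FakeProducer producerB = sessionB.createProducer();
        producerA.send("A");
        producerA.close();
        sessionA.close();                                  // only A's own session is closed
        return trySend(producerB);
    }

    private static String trySend(FakeProducer producer) {
        try {
            producer.send("B");
            return "sent";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        SharedSessionDemo demo = new SharedSessionDemo();
        System.out.println("shared field:    " + demo.sharedField());
        System.out.println("local variables: " + demo.localVariables());
    }
}
```

Under those assumptions, `sharedField()` returns `Producer is closed` and `localVariables()` returns `sent`. In the real POJO, the analogous change would be to declare the `Session` (and probably the `Connection`) as local variables inside `sendToExchange` and close them in `finally`, instead of keeping them in fields.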