I am experimenting with Spring WebFlux and Spring Integration to create a reactive stream (Flux) from a JMS queue.
I am attempting to create a reactive stream (Spring WebFlux) from a JMS queue (IBM MQ, via Spring Integration) so that clients can receive the JMS messages asynchronously. I believe everything is hooked up correctly, since the messages are being consumed by the reactive listener. However, my reactive Flux stream does not display those messages. Any help would be appreciated.
Here is the code I am using to make my JMS listener reactive:
UM Gateway
    @Named
    @Slf4j
    public class UmGateway {

        @Autowired
        private ConnectionFactory connectionFactory;

        @Autowired
        private JmsTemplate jmsTemplate;

        @Value("${um.mq.queueName}")
        private String queueName;

        @Bean
        public Publisher<Message<MilestoneNotification>> jmsReactiveSource() {
            return IntegrationFlows
                    .from(Jms.messageDrivenChannelAdapter(this.connectionFactory)
                            .destination(queueName))
                    .channel(MessageChannels.queue())
                    .log(Level.DEBUG)
                    .log()
                    .toReactivePublisher();
        }

        public Flux<MilestoneNotification> findAll() {
            return Flux.from(jmsReactiveSource())
                    .map(Message::getPayload);
        }

        /**
         * Method which sends Milestone Notifications to the UM Queue.
         */
        public void send(final MilestoneNotification message) {
            jmsTemplate.convertAndSend(queueName, message);
        }
    }
Controller
    @RestController
    @Slf4j
    @RequiredArgsConstructor
    @RequestMapping(ApiConstants.MILESTONE_UM)
    public class MilestoneUmController {

        @Autowired
        private UmGateway umGateway;

        @RequestMapping(value = "/message", method = RequestMethod.POST)
        public ResponseEntity<Boolean> sendMessage(
                final @RequestBody MilestoneNotification notification) {
            umGateway.send(notification);
            return new ResponseEntity<>(HttpStatus.OK);
        }

        @GetMapping(path = "/milestone-notification/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
        public Flux<MilestoneNotification> feed() {
            return this.umGateway.findAll();
        }
    }
Here are the logs:
- 2020.02.06 13:53:04.900 [jmsReactiveSource.org.springframework.jms.listener.DefaultMessageListenerContainer#0-1] INFO o.s.i.h.LoggingHandler - GenericMessage [payload={"messageId":"MAHESH_007","NotificationTag":"MAHESH_007","messageTimeStamp":"2020-01-21T10:56:33Z","processMilestoneId":"MAHESH_007","processMilestoneName":"MAHESH_007","accountNumber":"12345","previousStatus":"In Progress","currentStatus":"complete","isNew":true}, headers={JMS_IBM_Character_Set=UTF-8, JMS_IBM_MsgType=8, jms_destination=queue:///NOTIFICATIONQUEUE, _type=com.jpmc.wss.portal.domain.um.MilestoneNotification, JMSXUserID=cibcfdid , JMS_IBM_Encoding=273, priority=4, jms_timestamp=1580997184878, JMSXAppID=jpmc.wss.portal.Application , JMS_IBM_PutApplType=28, JMS_IBM_Format=MQSTR , jms_redelivered=false, JMS_IBM_PutDate=20200206, JMSXDeliveryCount=1, JMS_IBM_PutTime=13530511, id=5d277be2-49f5-3e5d-8916-5793db3b76e7, jms_messageId=ID:414d51204e41544d31383820202020201d9f3b5e03738521, timestamp=1580997184900}]
- 2020.02.06 13:53:04.968 [qtp2132762784-23] DEBUG c.j.w.p.u.RequestLoggingInterceptor - Returning status code 200 for POST request to /common/dataservice/di/milestone/um/message with query=[null] and http-user=[null]
- 2020.02.06 13:53:53.521 [qtp2132762784-18] INFO c.j.w.p.u.RequestLoggingInterceptor - Received GET request to /common/dataservice/di/milestone/um/milestone-notification/stream with query=[null] and http-user=[null]
- 2020.02.06 13:54:09.070 [qtp2132762784-16] INFO c.j.w.p.u.RequestLoggingInterceptor - Received POST request to /common/dataservice/di/milestone/um/message with query=[null] and http-user=[null]
- 2020.02.06 13:54:09.541 [jmsReactiveSource.org.springframework.jms.listener.DefaultMessageListenerContainer#0-1] INFO o.s.i.h.LoggingHandler - GenericMessage [payload={"messageId":"MAHESH_007","diNotificationTag":"MAHESH_007","messageTimeStamp":"2020-01-21T10:56:33Z","processMilestoneId":"MAHESH_007","processMilestoneName":"MAHESH_007","accountNumber":"12345","previousStatus":"In Progress","currentStatus":"complete","isNew":true}, headers={JMS_IBM_Character_Set=UTF-8, JMS_IBM_MsgType=8, jms_destination=queue:///NOTIFICATIONQUEUE, _type=com.jpmc.wss.portal.domain.um.MilestoneNotification, JMSXUserID=cibcfdid , JMS_IBM_Encoding=273, priority=4, jms_timestamp=1580997249519, JMSXAppID=jpmc.wss.portal.Application , JMS_IBM_PutApplType=28, JMS_IBM_Format=MQSTR , jms_redelivered=false, JMS_IBM_PutDate=20200206, JMSXDeliveryCount=1, JMS_IBM_PutTime=13540975, id=5421898e-5ef6-1f9b-aaa6-81ebc7668f50, jms_messageId=ID:414d51204e41544d31383820202020201d9f3b5e08738521, timestamp=1580997249541}]
- 2020.02.06 13:54:09.593 [qtp2132762784-16] DEBUG c.j.w.p.u.RequestLoggingInterceptor - Returning status code 200 for POST request to /common/dataservice/di/milestone/um/message with query=[null] and http-user=[null]
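One thing I have not ruled out: `findAll()` calls `jmsReactiveSource()` directly, and `UmGateway` is annotated with `@Named` rather than `@Configuration`. As far as I understand, a `@Bean` method in such a class runs in "lite" mode, so a direct call is a plain Java call that may build a new publisher instead of returning the one the container started. The sketch below reproduces only that symptom with the JDK's `SubmissionPublisher` as a stand-in for the integration flow. The names `PublisherInstanceSketch`, `newSource` and `deliver` are hypothetical, and this is an assumption about my setup, not a confirmed diagnosis.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class PublisherInstanceSketch {

    // Hypothetical stand-in for a factory method that is called directly:
    // every call returns a brand-new, unconnected publisher.
    static SubmissionPublisher<String> newSource() {
        return new SubmissionPublisher<>();
    }

    // Submits two messages to the "container-managed" publisher and returns
    // what a subscriber received. With shareInstance == false the subscriber
    // is attached to a second instance, as a direct method call would do.
    static List<String> deliver(boolean shareInstance) throws InterruptedException {
        SubmissionPublisher<String> containerManaged = newSource();
        SubmissionPublisher<String> seenBySubscriber =
                shareInstance ? containerManaged : newSource();

        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        seenBySubscriber.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(String item) { received.add(item); }
            @Override public void onError(Throwable t) { done.countDown(); }
            @Override public void onComplete() { done.countDown(); }
        });

        // The listener side only ever publishes to the container-managed instance.
        containerManaged.submit("m1");
        containerManaged.submit("m2");

        // Closing signals onComplete; a second close on the same instance is a no-op.
        containerManaged.close();
        seenBySubscriber.close();

        done.await(2, TimeUnit.SECONDS);
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("separate instances: " + deliver(false));
        System.out.println("shared instance:    " + deliver(true));
    }
}
```

If this is what is happening in my application, injecting the `Publisher` bean into whatever calls `Flux.from(...)`, rather than invoking the `@Bean` method, would presumably make the subscriber and the listener share one instance, but I have not verified that.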