
I am experimenting with Spring WebFlux and Spring Integration to create a reactive stream (Flux) from a JMS queue.

I am attempting to create a reactive stream (Spring WebFlux) from a JMS queue (IBM MQ, via Spring Integration) so that clients can receive the JMS messages asynchronously. I believe everything is hooked up correctly, since the messages are being consumed by the reactive listener. However, my reactive Flux stream does not display those messages. Any help would be appreciated.

Here is the code I am using to make my JMS listener reactive:

UM Gateway

@Named
@Slf4j
public class UmGateway {

  @Autowired
  private ConnectionFactory connectionFactory;


  @Autowired
  private JmsTemplate jmsTemplate;

  @Value("${um.mq.queueName}")
  private String queueName;

  @Bean
  public Publisher<Message<MilestoneNotification>> jmsReactiveSource() {
    return IntegrationFlows
        .from(Jms.messageDrivenChannelAdapter(this.connectionFactory)
            .destination(queueName))
        .channel(MessageChannels.queue())
        .log(Level.DEBUG)
        .log()
        .toReactivePublisher();
  }

  public Flux<MilestoneNotification> findAll() {
    return Flux.from(jmsReactiveSource())
        .map(Message::getPayload);
  }

  /**
   * Method which sends Milestone Notifications to the UM Queue.
   */
  public void send(final MilestoneNotification message) {
      jmsTemplate.convertAndSend(queueName, message);
  }
}

Controller

@RestController
@Slf4j
@RequiredArgsConstructor
@RequestMapping(ApiConstants.MILESTONE_UM)
public class MilestoneUmController {

  @Autowired
  private UmGateway umGateway;

  @RequestMapping(value = "/message", method = RequestMethod.POST)
  public ResponseEntity<Boolean> sendMessage(
      final @RequestBody MilestoneNotification notification) {
    umGateway.send(notification);
    return new ResponseEntity<>(HttpStatus.OK);
  }

  @GetMapping(path = "/milestone-notification/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
  public Flux<MilestoneNotification> feed() {
    return this.umGateway.findAll();
  }

}

Here are the logs:

 - 2020.02.06 13:53:04.900 [jmsReactiveSource.org.springframework.jms.listener.DefaultMessageListenerContainer#0-1] INFO  o.s.i.h.LoggingHandler - GenericMessage [payload={"messageId":"MAHESH_007","NotificationTag":"MAHESH_007","messageTimeStamp":"2020-01-21T10:56:33Z","processMilestoneId":"MAHESH_007","processMilestoneName":"MAHESH_007","accountNumber":"12345","previousStatus":"In Progress","currentStatus":"complete","isNew":true}, headers={JMS_IBM_Character_Set=UTF-8, JMS_IBM_MsgType=8, jms_destination=queue:///NOTIFICATIONQUEUE, _type=com.jpmc.wss.portal.domain.um.MilestoneNotification, JMSXUserID=cibcfdid    , JMS_IBM_Encoding=273, priority=4, jms_timestamp=1580997184878, JMSXAppID=jpmc.wss.portal.Application , JMS_IBM_PutApplType=28, JMS_IBM_Format=MQSTR   , jms_redelivered=false, JMS_IBM_PutDate=20200206, JMSXDeliveryCount=1, JMS_IBM_PutTime=13530511, id=5d277be2-49f5-3e5d-8916-5793db3b76e7, jms_messageId=ID:414d51204e41544d31383820202020201d9f3b5e03738521, timestamp=1580997184900}]
 - 2020.02.06 13:53:04.968 [qtp2132762784-23] DEBUG c.j.w.p.u.RequestLoggingInterceptor - Returning status code 200 for POST request to /common/dataservice/di/milestone/um/message with query=[null] and http-user=[null]
 - 2020.02.06 13:53:53.521 [qtp2132762784-18] INFO  c.j.w.p.u.RequestLoggingInterceptor - Received GET request to /common/dataservice/di/milestone/um/milestone-notification/stream with query=[null] and http-user=[null]
 - 2020.02.06 13:54:09.070 [qtp2132762784-16] INFO  c.j.w.p.u.RequestLoggingInterceptor - Received POST request to /common/dataservice/di/milestone/um/message with query=[null] and http-user=[null]
 - 2020.02.06 13:54:09.541 [jmsReactiveSource.org.springframework.jms.listener.DefaultMessageListenerContainer#0-1] INFO  o.s.i.h.LoggingHandler - GenericMessage [payload={"messageId":"MAHESH_007","diNotificationTag":"MAHESH_007","messageTimeStamp":"2020-01-21T10:56:33Z","processMilestoneId":"MAHESH_007","processMilestoneName":"MAHESH_007","accountNumber":"12345","previousStatus":"In Progress","currentStatus":"complete","isNew":true}, headers={JMS_IBM_Character_Set=UTF-8, JMS_IBM_MsgType=8, jms_destination=queue:///NOTIFICATIONQUEUE, _type=com.jpmc.wss.portal.domain.um.MilestoneNotification, JMSXUserID=cibcfdid    , JMS_IBM_Encoding=273, priority=4, jms_timestamp=1580997249519, JMSXAppID=jpmc.wss.portal.Application , JMS_IBM_PutApplType=28, JMS_IBM_Format=MQSTR   , jms_redelivered=false, JMS_IBM_PutDate=20200206, JMSXDeliveryCount=1, JMS_IBM_PutTime=13540975, id=5421898e-5ef6-1f9b-aaa6-81ebc7668f50, jms_messageId=ID:414d51204e41544d31383820202020201d9f3b5e08738521, timestamp=1580997249541}]
 - 2020.02.06 13:54:09.593 [qtp2132762784-16] DEBUG c.j.w.p.u.RequestLoggingInterceptor - Returning status code 200 for POST request to /common/dataservice/di/milestone/um/message with query=[null] and http-user=[null]

Flux stream on browser

mpai1391
  • Consider having a `Flux` instead of that `MilestoneNotification` POJO. – Artem Bilan Feb 06 '20 at 18:34
  • Thanks @ArtemBilan for your response. I tried what you suggested, but I am still not getting any response on my Flux stream. Could it be because my Flux stream is on one thread and the JMS listener is on another? Thread for the Flux stream: qtp2132762784-18; thread for the JMS listener: jmsReactiveSource.org.springframework.jms.listener.DefaultMessageListenerContainer#0-1. – mpai1391 Feb 07 '20 at 06:53

1 Answer


So, you can't just make a plain URL request from the browser when you need Server-Sent Events.

You need to use a specific JavaScript API (EventSource) from your web page: https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events

I was able to test it with the curl -N tool; see "cURL - Structuring request to validate server sent events".

Or using a WebTestClient from Spring WebFlux:

Flux<String> stream =
        this.webTestClient.get().uri("/stream")
                .exchange()
                .returnResult(String.class)
                .getResponseBody();

StepVerifier
        .create(stream)
        .expectNext("m1", "m2", "m3")
        .thenCancel()
        .verify();
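
For a quick check outside the browser without any Spring test support, something like the following plain Java client (JDK 11+ `java.net.http`) might also work. It is only a sketch: the class name `SseSketch`, the helper methods, and the `localhost:8080` host and port are assumptions for illustration, and the path is the one that appears in the question's logs. It reads the `text/event-stream` response line by line and treats a blank line as the end of an event, which is how the SSE wire format delimits events. Fields other than `data:` are ignored to keep it short.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Stream;

public class SseSketch {

    /** Collects the data payload of every complete event in an event-stream body. */
    static List<String> parseEvents(String body) {
        List<String> events = new ArrayList<>();
        readEvents(body.lines(), events::add);
        return events;
    }

    /** Passes each complete event's data to the callback as lines arrive. */
    static void readEvents(Stream<String> lines, Consumer<String> onEvent) {
        StringBuilder data = new StringBuilder();
        lines.forEach(line -> {
            if (line.isEmpty()) {
                // A blank line terminates the current event.
                if (data.length() > 0) {
                    onEvent.accept(data.toString());
                    data.setLength(0);
                }
            } else if (line.startsWith("data:")) {
                String value = line.substring(5);
                if (value.startsWith(" ")) {
                    value = value.substring(1); // one leading space is not part of the payload
                }
                if (data.length() > 0) {
                    data.append('\n'); // several data lines form one multi-line payload
                }
                data.append(value);
            }
            // Simplification: "event:", "id:", "retry:" and ":" comment lines are skipped.
        });
    }

    public static void main(String[] args) throws Exception {
        // Assumed host and port; pass the real URL as the first argument.
        String url = args.length > 0 ? args[0]
                : "http://localhost:8080/common/dataservice/di/milestone/um/milestone-notification/stream";
        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
                .header("Accept", "text/event-stream")
                .build();
        // ofLines() exposes the body as a lazy stream, so events print as they arrive.
        HttpResponse<Stream<String>> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofLines());
        readEvents(response.body(), event -> System.out.println("received: " + event));
    }
}
```

Run it, then POST a notification to the `/message` endpoint from another terminal; if the stream is wired correctly, each notification should be printed as it is published. The connection stays open until the server closes it or the process is stopped.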
Artem Bilan