I have a Spring Boot application that uses Spring Integration to dynamically create WebSockets on request from an Angular web application, and to send messages taken from a BlockingQueue over those WebSockets. I want a message to be pushed to the WebSocket only when one becomes available on the BlockingQueue, i.e. without polling. At the moment my code looks like the following, but it is very inefficient because it polls the queue every 10 ms:
    @Service
    public class WebSocketPublisherService {

        @Autowired
        private IntegrationFlowContext integrationFlowContext;

        @Bean
        public HandshakeHandler handshakeHandler() {
            return new DefaultHandshakeHandler(new TomcatRequestUpgradeStrategy());
        }

        public void createWebSocket() {
            ServerWebSocketContainer serverWebSocketContainer = new ServerWebSocketContainer("/test")
                    .setHandshakeHandler(handshakeHandler())
                    .setAllowedOrigins("http://localhost:4200");
            serverWebSocketContainer.setMessageListener(...); // I want messages coming from the web application to be processed

            WebSocketOutboundMessageHandler webSocketOutboundMessageHandler = new WebSocketOutboundMessageHandler(serverWebSocketContainer);

            MethodInvokingMessageSource methodInvokingMessageSource = new MethodInvokingMessageSource();
            methodInvokingMessageSource.setObject(...);
            methodInvokingMessageSource.setMethodName(...); // this method returns the element from the BlockingQueue, if it exists

            // The source is polled every 10 ms, whether or not the queue has an element
            StandardIntegrationFlow standardIntegrationFlow = IntegrationFlow
                    .from(methodInvokingMessageSource, polling -> polling.poller(pollerFactory -> pollerFactory.fixedRate(10)))
                    .split(new DecorateMessageWithSessionId(serverWebSocketContainer))
                    .handle(webSocketOutboundMessageHandler)
                    .get();

            IntegrationFlowContext.IntegrationFlowRegistration flowRegistration = integrationFlowContext
                    .registration(standardIntegrationFlow)
                    .addBean(serverWebSocketContainer)
                    .register();
            flowRegistration.start();
        }
    }
Is there a smarter way of creating an IntegrationFlow using the content of a BlockingQueue (based on subscription and not polling)?
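To make concrete what I mean by "subscription and not polling", this is roughly the behaviour I am after, sketched in plain Java. The class name `QueueDrainer` and the `subscriber` callback are hypothetical names I made up for illustration; nothing here is Spring Integration API. A dedicated thread blocks on `take()` and only wakes up when an element is actually available:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical helper for illustration only; not part of Spring Integration.
class QueueDrainer<T> implements AutoCloseable {

    private final Thread worker;

    QueueDrainer(BlockingQueue<T> queue, Consumer<T> subscriber) {
        worker = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    // take() parks the thread until an element is available,
                    // so no CPU is spent while the queue is empty
                    subscriber.accept(queue.take());
                }
            } catch (InterruptedException e) {
                // close() was called: restore the flag and let the thread end
                Thread.currentThread().interrupt();
            }
        }, "queue-drainer");
        worker.setDaemon(true);
    }

    void start() {
        worker.start();
    }

    @Override
    public void close() {
        worker.interrupt();
    }
}
```

In my case the `subscriber` callback is the point where I would like each element to enter the IntegrationFlow instead of being fetched by a poller, for example `new QueueDrainer<>(queue, element -> { /* hand over to the flow */ }).start();`.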