3

I have a SpringBoot application that also uses SpringIntegration to dynamically create web sockets upon requests that come from an Angular web application and send messages on that web sockets that are taken from a BlockingQueue. I want those messages to be pushed on the web socket only when a message is available on the BlockingQueue, so without polling. At the moment, my code looks like below, but it is very inefficient:

@Service
public class WebSocketPublisherService {

    @Autowired
    private IntegrationFlowContext integrationFlowContext;

    @Bean
    public HandshakeHandler handshakeHandler() {
        return new DefaultHandshakeHandler(new TomcatRequestUpgradeStrategy());
    }

    public void createWebSocket() {
        ServerWebSocketContainer serverWebSocketContainer = new ServerWebSocketContainer("/test")
            .setHandshakeHandler(handshakeHandler())
            .setAllowedOrigins("http://localhost:4200");
        serverWebSocketContainer.setMessageListener(...); // I want messages comming from the web application to be processed
        WebSocketOutboundMessageHandler webSocketOutboundMessageHandler = new WebSocketOutboundMessageHandler(serverWebSocketContainer);
        MethodInvokingMessageSource methodInvokingMessageSource = new MethodInvokingMessageSource();
        methodInvokingMessageSource.setObject(...);
        methodInvokingMessageSource.setMethodName(...);   // this method returns the element from the BlockingQueue, if it exists
        StandardIntegrationFlow standardIntegrationFlow = IntegrationFlow
            .from(methodInvokingMessageSource, polling -> polling.poller(pollerFactory -> pollerFactory.fixedRate(10)))
            .split(new DecorateMessageWithSessionId(serverWebSocketContainer))
            .handle(webSocketOutboundMessageHandler)
            .get();
        IntegrationFlowContext.IntegrationFlowRegistration flowRegistration = integrationFlowContext
            .registration(standardIntegrationFlow)
            .addBean(serverWebSocketContainer)
            .register();
        flowRegistration.start();
    }
}

Is there a smarter way of creating an IntegrationFlow using the content of a BlockingQueue (based on subscription and not polling)?

  • Sorry, not clear what you are asking. "subscription" on what? There is no subscription on queues, so it is doesn't compile in my mind how one would take messages from the queue without pulling it. See Spring Integration WebSocket sample for some ideas how to publish the same message to all subscribed sessions: https://github.com/spring-projects/spring-integration-samples/tree/main/basic/web-sockets. And yes: it is unclear why do you create WebSocket server in a dynamic flow? – Artem Bilan Jan 03 '23 at 17:29
  • The flow is as below: 1. a client connects to the server via HTTP and requests a new websocket; 2. the server dynamically creates a web socket and responds with the address of the websocket; 3. the client starts communicating via the websocket. When it receives the request from the client, the server also connects to a third-party device that publishes information into a blocking queue (but it might be any other data structure). I want to publish this information further on the web socket only when there is information available from this third-party device, based on a subscription. – Andrei Roșu-Cojocaru Jan 03 '23 at 18:02
  • There will be multiple clients with their own websocket each, receiving information from different devices. So the server needs to publish different information on each websocket, when it becomes available. The client will also communicate with the server via the websocket. – Andrei Roșu-Cojocaru Jan 03 '23 at 18:07
  • No. You said it yourself: the client connects via HTTP. So, that's not only a websocket communication. My concern why don't have a single websocket server and let those clients to subscribe to their own STOMP destinations or so? Probably that's what you mean with that "subscription" - kinda some client-specific information. This sounds more like JMS topic and its `selector` feature. There is no such a functionality for in-memory Java structure. You may look into an embedded JMS broker to gain a topic selector power for your application. Or you just go via STOMP routing feature via `sendToUser` – Artem Bilan Jan 03 '23 at 18:18

0 Answers0