1

I got a load-test setup of spring websocket server (based on Jetty and spring version 4.3.2.RELEASE) and client, that generates many connections (based on spring's sample java websocket client). The code below sends data to given websocket session: the snippet exploits the case where sessionId can be used instead of User ID (Spring WebSocket @SendToSession: send message to specific session). I may execute this code very often, every 2-3 milliseconds. I use SimpleMessageBroker.

 public void publishToSessionUsingTopic(String sessionId, String subscriptionTopic, Map<String, CacheRowModel> payload) {

        String subscriptionTopicWithoutUser = subscriptionTopic.replace(USER_ENDPOINT, "");
        // necessary message headers for per-session send
        SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
        headerAccessor.setSessionId(sessionId);
        headerAccessor.setLeaveMutable(true);          
        simpMessagingTemplate.convertAndSendToUser(sessionId, subscriptionTopicWithoutUser, Collections.singletonList(payload), headerAccessor.getMessageHeaders());

}

When this code is executed very frequently (every 2-3 milliseconds) for ~100 sessions, while I see in my logs that it was run and called the convertAndSendToUser, some of the sessions won't receive the message. I appreciate any suggestions about how this could be cleared.

Community
  • 1
  • 1
onkami
  • 8,791
  • 17
  • 90
  • 176

1 Answers1

0

Well, I think your problem is with the:

@Bean
public ThreadPoolTaskExecutor clientOutboundChannelExecutor() {
    TaskExecutorRegistration reg = getClientOutboundChannelRegistration().getOrCreateTaskExecRegistration();
    ThreadPoolTaskExecutor executor = reg.getTaskExecutor();
    executor.setThreadNamePrefix("clientOutboundChannel-");
    return executor;
}

where it uses this config for the Executor:

protected ThreadPoolTaskExecutor getTaskExecutor() {
    ThreadPoolTaskExecutor executor = (this.taskExecutor != null ? this.taskExecutor : new ThreadPoolTaskExecutor());
    executor.setCorePoolSize(this.corePoolSize);
    executor.setMaxPoolSize(this.maxPoolSize);
    executor.setKeepAliveSeconds(this.keepAliveSeconds);
    executor.setQueueCapacity(this.queueCapacity);
    executor.setAllowCoreThreadTimeOut(true);
    return executor;
}

See, there is no RejectedExecutionHandler configured. And by default it is like:

private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();

So, when you have enough many messages and tasks for them exceed the ThreadPool, any extra are just aborted.

To fix the issue you should implement WebSocketMessageBrokerConfigurer and override its configureClientOutboundChannel() to provide some custom taskExecutor(ThreadPoolTaskExecutor taskExecutor) for example with the new ThreadPoolExecutor.CallerRunsPolicy().

Artem Bilan
  • 113,505
  • 11
  • 91
  • 118
  • I provided public void configureClientOutboundChannel(ChannelRegistration registration) { registration.taskExecutor(getCustomTaskExecutor()); } where customExecutor passes CallerRunsPolicy to RejectionHandler (a custom logging version). However I still have missing messages problem and the rejectedExecution in the executor did not hit the breakpoint, like everything was "executed". – onkami Sep 13 '16 at 07:52
  • I added OutgoingChannelInterceptor and for un-sent messages I can not see it triggered. If it is triggered, message is dispatched. So it is something between convertAndSendToUser and the interceptor. – onkami Sep 13 '16 at 12:23
  • Good (or bad ?). Which broker do you use? – Artem Bilan Sep 13 '16 at 12:26
  • SimpleMessageBroker inside the Spring. – onkami Sep 13 '16 at 12:27
  • Not sure what's going on, but would you mind to switch on `DEBUG` for the `org.springframework` category and investigate logs when you think your messages are lost. E.g. I have this in my logs for example: `DEBUG [main][org.springframework.messaging.simp.broker.SimpleBrokerMessageHandler] Broadcasting to 1 sessions.` That might help somehow you to start seeking. – Artem Bilan Sep 13 '16 at 17:45
  • Thank you, I am already on it. Will let know if I find the cause. Is there ANY reason why convertAndSendToUser (or just convertAndSend) would not trigger sending? I am at the moment not very familiar what is the architectural path from convertAndSend to the MessageInterceptors (and futher). Is there some place whenre I can read about it? Due to interfaces and IoC code is largely detached and it is not easy to follow. I would like to go through the code and see if there are any places when message becomes lost? – onkami Sep 13 '16 at 17:56
  • Well, `simpMessagingTemplate` is like `SimpMessagingTemplate template = new SimpMessagingTemplate(brokerChannel());`, where `brokerChannel()` has `SimpleMessageBroker` as subscriber and so on. – Artem Bilan Sep 13 '16 at 17:58
  • Currently I found a possible problem, since SimpleBroker is configured in some other thread, it looks like that it may not be ready yet when I am trying to send message, if timing is unlikely (it needs some milliseconds). I therefore want to check in SubscriptionRegistry if my session is subscribed in Broker already. I made a hack that kinda works - my problem went away!- but I wonder if you could suggest a nicer way to check if particular SessionId was processed in Broker and it is ready to distribute. Please see http://pastebin.com/wLa9ze7c. Maybe I can listen to some event or something? – onkami Sep 14 '16 at 13:45
  • Eh... I have similar hack in my tests: https://github.com/spring-projects/spring-integration/blob/master/spring-integration-websocket/src/test/java/org/springframework/integration/websocket/client/StompIntegrationTests.java#L181, because I wasn't able to find better way to be sure that my session is ready for outcome. Glad to hear that you have figured out that! Would be better if you comment somehow in related JIRA: https://jira.spring.io/browse/SPR-14695 – Artem Bilan Sep 14 '16 at 13:55