I built a chat application using the portfolio WebSocket sample as a guide. I am using Spring Boot 1.3.3, ActiveMQ and STOMP; the UI is built with KnockoutJS, and the application runs on a Windows Server 2012 machine. My issue is that after approximately 1000 connections to the chat server (Spring Boot), the server stops accepting any more connections. I have experimented with different heartbeat settings, message size settings, etc., to no avail. Has anyone built such a chat/WebSocket application and been able to achieve more than 1000 concurrent connections? I have spent over a week researching and tweaking the code, and I also changed the Windows Server 2012 connection limit (it seems that Windows Server 2012 removed the TCP connection limit). Any help or pointers will be greatly appreciated.
/**
*
*/
package com.test.chat;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.messaging.simp.config.StompBrokerRelayRegistration;
import org.springframework.web.socket.config.annotation.AbstractWebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.SockJsServiceRegistration;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketTransportRegistration;
import org.springframework.web.socket.server.standard.ServletServerContainerFactoryBean;
import com.test.chat.application.event.StompConnectEvent;
import com.test.chat.application.event.StompConnectedEvent;
import com.test.chat.application.event.StompDisconnectEvent;
/**
 * Spring configuration for the chat application's STOMP-over-WebSocket messaging.
 * <p>
 * Registers the SockJS-enabled {@code /chat} endpoint, relays {@code /queue/} and
 * {@code /topic/} destinations to an external STOMP broker, routes {@code /app}
 * destinations to application handlers, and sizes the WebSocket transport and the
 * client inbound/outbound channel executors from externalized properties.
 * <p>
 * NOTE(review): the STOMP broker relay opens a separate TCP connection to the broker
 * for every connected client session. If the relay target is ActiveMQ, its stock
 * {@code activemq.xml} declares the stomp transport connector with
 * {@code maximumConnections=1000}; the broker configuration is not visible from this
 * class, so confirm that limit when connections stall at roughly 1000 clients.
 *
 * @author pgobin
 *
 * https://www.youtube.com/watch?v=mmIza3L64Ic
 *
 */
@Configuration
@EnableWebSocketMessageBroker
@ComponentScan("com.test.chat")
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
    private static final Logger log = Logger.getLogger(WebSocketConfig.class);

    /** Property holding the comma-separated list of origins allowed to connect. */
    private static final String ALLOWED_ORIGINS_PROPERTY = "websocket.security.allow.origins";
    /** Origin list used when {@link #ALLOWED_ORIGINS_PROPERTY} is not set. */
    private static final String DEFAULT_ALLOWED_ORIGINS = "http://localhost:8080";

    // STOMP broker relay target
    @Value("${StompBrokerRelay.host}")
    String StompBrokerRelayHost;
    @Value("${StompBrokerRelay.port}")
    int StompBrokerRelayPort;

    // Credentials for the relay's "system" connection
    @Value("${MessageBroker.User}")
    String brokerUser;
    @Value("${MessageBroker.Password}")
    String brokerPassword;

    // Credentials used for the connections opened on behalf of clients
    @Value("${MessageBrokerStompClient.User}")
    String stompClientUser;
    @Value("${MessageBrokerStompClient.Password}")
    String stompClientPassword;

    // SockJS settings
    @Value("${sockjs.setHttpMessageCacheSize}")
    int sockjs_setHttpMessageCacheSize;
    @Value("${sockjs.setStreamBytesLimit}")
    int sockjs_setStreamBytesLimit;
    @Value("${sockjs.setDisconnectDelay:20000}")
    int sockjs_setDisconnectDelay;
    @Value("${sockjs.setHeartbeatTime:30000}")
    int sockjs_setHeartbeatTime;

    // WebSocketTransport settings
    @Value("${WebSocketTransportRegistration.MessageSizeLimit:131072}")
    int MessageSizeLimit;
    @Value("${WebSocketTransportRegistration.SendTimeLimit:15000}")
    int SendTimeLimit;
    @Value("${WebSocketTransportRegistration.SendBufferSizeLimit:524288}")
    int SendBufferSizeLimit;

    // ClientOutboundChannel configs
    @Value("${ClientOutboundChannel.corePoolSize:25}")
    int ClientOutboundChannelcorePoolSize;
    @Value("${ClientOutboundChannel.maxPoolSize:50}")
    int ClientOutboundChannelmaxPoolSize;

    // ClientInboundChannel configs
    @Value("${ClientInboundChannel.corePoolSize:25}")
    int ClientInboundChannelcorePoolSize;
    @Value("${ClientInboundChannel.maxPoolSize:50}")
    int ClientInboundChannelmaxPoolSize;

    /**
     * Relays {@code /queue/} and {@code /topic/} destinations to the external STOMP
     * broker using the configured host, port and credentials, and maps the
     * {@code /app} prefix to application message handlers.
     *
     * @param messageBrokerRegistry registry supplied by Spring
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry messageBrokerRegistry)
    {
        StompBrokerRelayRegistration broker = messageBrokerRegistry.enableStompBrokerRelay("/queue/", "/topic/");
        broker.setRelayHost(StompBrokerRelayHost);
        broker.setRelayPort(StompBrokerRelayPort);
        broker.setSystemLogin(brokerUser);
        broker.setSystemPasscode(brokerPassword);
        broker.setClientLogin(stompClientUser);
        broker.setClientPasscode(stompClientPassword);
        messageBrokerRegistry.setApplicationDestinationPrefixes("/app");
    }

    /**
     * Registers the {@code /chat} STOMP endpoint with SockJS fallback, restricted to
     * the origins listed in the {@code websocket.security.allow.origins} property.
     * <p>
     * See https://github.com/rstoyanchev/spring-websocket-test/issues/4
     *
     * @param stompRegistry registry supplied by Spring
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry stompRegistry)
    {
        // Read the property once so the logged value is exactly the value applied.
        String wsOrigins = AppConfig.getEnv().getProperty(ALLOWED_ORIGINS_PROPERTY, DEFAULT_ALLOWED_ORIGINS);
        log.info("#### ALLOWING MESSAGING ONLY FROM ORIGINS:" + wsOrigins + ". ALL OTHERS WILL BE BLOCKED ####");
        String[] cors = parseAllowedOrigins(wsOrigins);

        stompRegistry.addEndpoint("/chat").setAllowedOrigins(cors).withSockJS()
                .setStreamBytesLimit(sockjs_setStreamBytesLimit)
                .setDisconnectDelay(sockjs_setDisconnectDelay)
                .setHttpMessageCacheSize(sockjs_setHttpMessageCacheSize)
                .setHeartbeatTime(sockjs_setHeartbeatTime)
                .setWebSocketEnabled(true)
                .setSupressCors(false);
    }

    /**
     * Splits a comma-separated origin list and trims each entry, so that a value such
     * as {@code "http://a, http://b"} does not yield an origin with a leading space
     * that can never match a request's Origin header.
     *
     * @param commaSeparated raw property value
     * @return the trimmed origins, or {@code null} when the input is {@code null}
     */
    private static String[] parseAllowedOrigins(String commaSeparated)
    {
        String[] origins = StringUtils.split(commaSeparated, ",");
        if (origins != null)
        {
            for (int i = 0; i < origins.length; i++)
            {
                origins[i] = origins[i].trim();
            }
        }
        return origins;
    }

    /**
     * Configures the servlet container's WebSocket engine: 8192-byte text and binary
     * message buffers, a 5000 ms async send timeout and a 600000 ms idle timeout.
     *
     * @return the container factory bean
     */
    @Bean
    public ServletServerContainerFactoryBean createWebSocketContainer()
    {
        ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
        container.setMaxTextMessageBufferSize(8192);
        container.setMaxBinaryMessageBufferSize(8192);
        container.setAsyncSendTimeout(5000);
        container.setMaxSessionIdleTimeout(600000);
        return container;
    }

    /**
     * Applies the configured message size, send time and send buffer limits to the
     * WebSocket transport.
     *
     * @param registration registration supplied by Spring
     */
    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registration)
    {
        registration.setMessageSizeLimit(MessageSizeLimit);
        registration.setSendTimeLimit(SendTimeLimit);
        registration.setSendBufferSizeLimit(SendBufferSizeLimit);
    }

    /**
     * Configure the {@link org.springframework.messaging.MessageChannel} used
     * for outgoing messages to WebSocket clients. By default the channel is
     * backed by a thread pool of size 1. It is recommended to customize thread
     * pool settings for production use.
     *
     * @param registration registration supplied by Spring
     */
    @Override
    public void configureClientOutboundChannel(ChannelRegistration registration)
    {
        registration.taskExecutor().corePoolSize(ClientOutboundChannelcorePoolSize).maxPoolSize(ClientOutboundChannelmaxPoolSize);
    }

    /**
     * Sizes the executor behind the channel that carries messages arriving from
     * WebSocket clients.
     *
     * @param registration registration supplied by Spring
     */
    @Override
    public void configureClientInboundChannel(ChannelRegistration registration)
    {
        registration.taskExecutor().corePoolSize(ClientInboundChannelcorePoolSize).maxPoolSize(ClientInboundChannelmaxPoolSize);
    }

    /**
     * Exposes the project's connect-event component as a bean.
     *
     * @param messagingTemplate template handed to the event component
     * @return the connect-event component
     */
    @Bean
    public StompConnectEvent presenceChannelInterceptorOnConnect(SimpMessagingTemplate messagingTemplate)
    {
        return new StompConnectEvent(messagingTemplate);
    }

    /**
     * Exposes the project's disconnect-event component as a bean.
     *
     * @param messagingTemplate template handed to the event component
     * @return the disconnect-event component
     */
    @Bean
    public StompDisconnectEvent presenceChannelInterceptorOnDisconnect(SimpMessagingTemplate messagingTemplate)
    {
        return new StompDisconnectEvent(messagingTemplate);
    }
}
And my client test code:
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.messaging.simp.stomp.ConnectionLostException;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompFrameHandler;
import org.springframework.messaging.simp.stomp.StompHeaders;
import org.springframework.messaging.simp.stomp.StompSession;
import org.springframework.messaging.simp.stomp.StompSessionHandlerAdapter;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.web.socket.WebSocketHttpHeaders;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;
import org.springframework.web.socket.messaging.WebSocketStompClient;
import org.springframework.web.socket.sockjs.client.SockJsClient;
import org.springframework.web.socket.sockjs.client.Transport;
import org.springframework.web.socket.sockjs.client.WebSocketTransport;
/**
 * Starts one asynchronous STOMP-over-SockJS connection for a simulated user.
 * <p>
 * The call returns as soon as the connect attempt has been issued; the outcome is
 * reported to the {@link ConsumerStompSessionHandler} created here. All connections
 * share one {@link WebSocketStompClient} and one task scheduler, so simulating
 * thousands of users does not allocate thousands of schedulers and clients that are
 * never shut down.
 *
 * @param userUid   user id sent as the {@code userid} handshake and CONNECT header
 * @param clientNum sequence number of this simulated client, used in log output
 */
public static void runTest(final long userUid, final int clientNum)
{
    ConsumerStompSessionHandler handler = new ConsumerStompSessionHandler(BROADCAST_MESSAGE_COUNT, connectLatch, subscribeLatch, messageLatch,
            disconnectLatch, failure, clientNum);

    // NOTE(review): this map is passed as a URI template variable for stompUrl, not as
    // a request parameter or header; it has no effect unless stompUrl contains a
    // placeholder. Kept to preserve the original call.
    HashMap<String, Object> params = new HashMap<String, Object>();
    params.put("userid", userUid);

    WebSocketHttpHeaders wsHeaders = new WebSocketHttpHeaders();
    wsHeaders.add("userid", String.valueOf(userUid));

    StompHeaders stompHeaders = new StompHeaders();
    stompHeaders.add("userid", String.valueOf(userUid));
    stompHeaders.add("channelID", "java-" + System.currentTimeMillis());
    stompHeaders.add("platform", "Windows");
    stompHeaders.add("clientIP", "10.1.1.1");

    SharedStompClient.INSTANCE.connect(stompUrl, wsHeaders, stompHeaders, handler, params);
}

/**
 * Holder for the single STOMP client reused by every {@link #runTest} call. The
 * client is built on first use, when the JVM initializes this class.
 */
private static final class SharedStompClient {
    static final WebSocketStompClient INSTANCE = create();

    private SharedStompClient()
    {
    }

    /** Builds the SockJS-backed STOMP client with JSON conversion and heartbeats disabled. */
    private static WebSocketStompClient create()
    {
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.afterPropertiesSet();

        List<Transport> transports = new ArrayList<>();
        transports.add(new WebSocketTransport(new StandardWebSocketClient()));

        WebSocketStompClient stompClient = new WebSocketStompClient(new SockJsClient(transports));
        stompClient.setMessageConverter(new org.springframework.messaging.converter.MappingJackson2MessageConverter());
        stompClient.setTaskScheduler(taskScheduler);
        // {0, 0} turns STOMP heartbeats off in both directions.
        stompClient.setDefaultHeartbeat(new long[] { 0, 0 });
        return stompClient;
    }
}
/**
 * Session handler for one simulated chat client.
 * <p>
 * On connect it bumps the shared live-connection counter, counts down the connect
 * latch and subscribes to a per-session queue; received frames are only printed.
 * Transport errors and STOMP errors are logged and recorded in the shared
 * {@code failure} reference. {@code expectedMessageCount}, {@code subscribeLatch} and
 * {@code messageLatch} are stored but not used by the handler at present.
 * <p>
 * Callbacks for different sessions can run on different threads, so the shared
 * counters are only incremented while holding this class's monitor.
 */
private static class ConsumerStompSessionHandler extends StompSessionHandlerAdapter {
    private final int expectedMessageCount;
    private final CountDownLatch connectLatch;
    private final CountDownLatch subscribeLatch;
    private final CountDownLatch messageLatch;
    private final CountDownLatch disconnectLatch;
    private final AtomicReference<Throwable> failure;
    private final AtomicInteger messageCount = new AtomicInteger(0);
    private final int clientNum;

    /**
     * @param expectedMessageCount number of messages the client is expected to receive
     * @param connectLatch         counted down once the session is connected
     * @param subscribeLatch       latch reserved for subscription receipts
     * @param messageLatch         latch reserved for message completion
     * @param disconnectLatch      counted down when the connection is lost
     * @param failure              receives the most recent error seen by any handler
     * @param clientNum            sequence number of the simulated client
     */
    public ConsumerStompSessionHandler(int expectedMessageCount, CountDownLatch connectLatch, CountDownLatch subscribeLatch,
            CountDownLatch messageLatch, CountDownLatch disconnectLatch, AtomicReference<Throwable> failure, int clientNum)
    {
        this.expectedMessageCount = expectedMessageCount;
        this.connectLatch = connectLatch;
        this.subscribeLatch = subscribeLatch;
        this.messageLatch = messageLatch;
        this.disconnectLatch = disconnectLatch;
        this.failure = failure;
        this.clientNum = clientNum;
    }

    /** Increments the shared live-connection counter without losing concurrent updates. */
    private static void recordConnected()
    {
        synchronized (ConsumerStompSessionHandler.class)
        {
            __ActiveConn = __ActiveConn + 1;
        }
    }

    /** Increments the shared error counter without losing concurrent updates. */
    private static void recordTransportError()
    {
        synchronized (ConsumerStompSessionHandler.class)
        {
            __ErrorConn = __ErrorConn + 1;
        }
    }

    /**
     * Records the successful connection and subscribes to this session's queue,
     * requesting a receipt for the subscription.
     */
    @Override
    public void afterConnected(final StompSession session, StompHeaders connectedHeaders)
    {
        recordConnected();
        this.connectLatch.countDown();
        session.setAutoReceipt(true);

        // NOTE(review): the channel id is the current time in milliseconds, so two
        // sessions connecting within the same millisecond get the same destination.
        String channelID = System.currentTimeMillis() + "";
        String subscribeChannel = __SUBSCRIBE_PREDICATE_QUEUE + channelID;

        // The user-list request is prepared but currently not sent.
        final RequestUserList req = new RequestUserList();
        req.setCustomerid(customerID);
        req.setChannelID(channelID);

        session.subscribe(subscribeChannel, new StompFrameHandler() {
            @Override
            public Type getPayloadType(StompHeaders headers)
            {
                System.out.println("Got ResponseH");
                return String.class;
            }

            @Override
            public void handleFrame(StompHeaders headers, Object payload)
            {
                System.out.println("Got ResponseA");
            }
        }).addReceiptTask(new Runnable() {
            @Override
            public void run()
            {
                System.out.println("Got Response for client " + clientNum);
            }
        });
    }

    /**
     * Counts and logs a transport-level failure; a lost connection also counts down
     * the disconnect latch.
     */
    @Override
    public void handleTransportError(StompSession session, Throwable exception)
    {
        recordTransportError();
        logger.error("Transport error", exception);
        this.failure.set(exception);
        if (exception instanceof ConnectionLostException)
        {
            this.disconnectLatch.countDown();
        }
    }

    /** Logs and records an exception raised while handling a STOMP frame. */
    @Override
    public void handleException(StompSession s, StompCommand c, StompHeaders h, byte[] p, Throwable ex)
    {
        logger.error("Handling exception", ex);
        this.failure.set(ex);
    }

    /** Logs a frame delivered to the session handler as an error and records it as a failure. */
    @Override
    public void handleFrame(StompHeaders headers, Object payload)
    {
        System.out.println("Got ResponseF");
        Exception ex = new Exception(headers.toString());
        logger.error("STOMP ERROR frame", ex);
        this.failure.set(ex);
    }

    @Override
    public String toString()
    {
        return "ConsumerStompSessionHandler[messageCount=" + this.messageCount + "]";
    }
}
/**
 * Load-test entry point: issues 3000 connection attempts, then keeps the JVM alive
 * for about 1000 seconds so the connections stay open.
 * <p>
 * Connecting is asynchronous, so the counters printed right after the loop mostly
 * reflect connections that have not completed yet. The counters are therefore
 * printed again every 10 seconds while waiting.
 *
 * @param args unused
 */
public static void main(String[] args)
{
    final int clientCount = 3000;
    final long totalWaitMillis = 1000000L;
    final long reportIntervalMillis = 10000L;

    for (int x = 0; x < clientCount; x++)
    {
        runTest(121807, x + 1);
    }
    System.out.println("DONE...");
    System.out.println("Live Connections = " + __ActiveConn);
    System.out.println("Error Connections = " + __ErrorConn);
    System.out.println("DONE..Waiting..");

    try
    {
        for (long waited = 0; waited < totalWaitMillis; waited += reportIntervalMillis)
        {
            Thread.sleep(reportIntervalMillis);
            System.out.println("Live Connections = " + __ActiveConn);
            System.out.println("Error Connections = " + __ErrorConn);
        }
    }
    catch (InterruptedException e)
    {
        // Restore the interrupt flag so the caller/JVM can observe the interruption.
        Thread.currentThread().interrupt();
        System.err.println("Interrupted while waiting: " + e);
    }
}