I wrote a workaround for this situation. At first I create a listener for websockets events. In case of subscription request I keep the userId from the request and keep in a ConcurrentHashMap. On the other hand when client disconnects or send unsubscribe request I remove his userId from that Map.
My Listener class:
@Service
public class MyEventListener {
@Autowired
private NotificationPublisher notificationPublisher;
@EventListener({SessionSubscribeEvent.class})
public void onWebSocketSubscribeEvent(SessionSubscribeEvent event) {
notificationPublisher.subscribedUsers.put(event.getUser().getName(), Calendar.getInstance().getTimeInMillis());
}
@EventListener({SessionUnsubscribeEvent.class})
public void onWebSocketUnsubscribeEvent(SessionUnsubscribeEvent event) {
notificationPublisher.subscribedUsers.remove(event.getUser().getName());
}
@EventListener({SessionDisconnectEvent.class})
public void onWebSocketDisconnectEvent(SessionDisconnectEvent event) {
notificationPublisher.subscribedUsers.remove(event.getUser().getName());
}
}
Notification publisher class where actual job is running:
public class NotificationPublisher {
public final Map<String, Long> subscribedUsers = new ConcurrentHashMap<>();
@Autowired
private SimpMessagingTemplate messagingTemplate;
@Autowired
private MyService myService;
@Value("${task.notifications.publisher.websocket_timeout_seconds}")
private int websocketSessionTimeout;
public void sendDataUpdates() {
SocketResponseCount count = null;
for(String key: subscribedUsers.keySet()) {
long subscribeTime = subscribedUsers.get(key);
if(Calendar.getInstance().getTimeInMillis() - subscribeTime > websocketSessionTimeout*1000) {
subscribedUsers.remove(key);
continue;
}
count = myService.getNotificationsCount(key);
this.messagingTemplate.convertAndSendToUser(key, "/queue/publish",count);
}
}
}
Maybe it will help someone