I'm working on a simple chat module for my application, using Spring WebFlux with ReactiveMongoRepository on the backend and Angular 4 on the front end. I can receive data through the WebSocketSession, but after streaming all messages from the database I want to keep the connection open so I can update the message list. Can anyone give me a clue how to achieve that, or am I working from the wrong assumptions?

The Java backend code responsible for the WebSocket is below. My subscriber only logs the current state, so there is nothing relevant there:

WebFluxConfiguration:

@Configuration
@EnableWebFlux
public class WebSocketConfig {

private final WebSocketHandler webSocketHandler;

@Autowired
public WebSocketConfig(WebSocketHandler webSocketHandler) {
    this.webSocketHandler = webSocketHandler;
}

@Bean
@Primary
public HandlerMapping webSocketMapping() {
    Map<String, Object> map = new HashMap<>();
    map.put("/websocket-messages", webSocketHandler);

    SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
    mapping.setOrder(10);
    mapping.setUrlMap(map);
    return mapping;
}

@Bean
public WebSocketHandlerAdapter handlerAdapter() {
    return new WebSocketHandlerAdapter();
}


}

WebSocketHandler implementation:

@Component
public class MessageWebSocketHandler implements WebSocketHandler {

private final MessageRepository messageRepository;
private ObjectMapper mapper = new ObjectMapper();
private MessageSubscriber subscriber = new MessageSubscriber();

@Autowired
public MessageWebSocketHandler(MessageRepository messageRepository) {
    this.messageRepository = messageRepository;
}

@Override
public Mono<Void> handle(WebSocketSession session) {
    session.receive()
            .map(WebSocketMessage::getPayloadAsText)
            .map(this::toMessage)
            .subscribe(subscriber::onNext, subscriber::onError, subscriber::onComplete);
    return session.send(
            messageRepository.findAll()
                    .map(this::toJSON)
                    .map(session::textMessage));
}

private String toJSON(Message message) {
    try {
        return mapper.writeValueAsString(message);
    } catch (JsonProcessingException e) {
        throw new RuntimeException(e);
    }
}

private Message toMessage(String json) {
    try {
        return mapper.readValue(json, Message.class);
    } catch (IOException e) {
        throw new RuntimeException("Invalid JSON:" + json, e);
    }
}
}

and the Mongo repository:

@Repository
public interface MessageRepository extends ReactiveMongoRepository<Message, String> {
}

Front-end handling:

@Injectable()
export class WebSocketService {
  private subject: Rx.Subject<MessageEvent>;

  constructor() {
  }

  public connect(url): Rx.Subject<MessageEvent> {
    if (!this.subject) {
      this.subject = this.create(url);
      console.log('Successfully connected: ' + url);
    }
    return this.subject;
  }

  private create(url): Rx.Subject<MessageEvent> {
    const ws = new WebSocket(url);
    const observable = Rx.Observable.create(
      (obs: Rx.Observer<MessageEvent>) => {
        ws.onmessage = obs.next.bind(obs);
        ws.onerror = obs.error.bind(obs);
        ws.onclose = obs.complete.bind(obs);
        return ws.close.bind(ws);
      });
    const observer = {
      next: (data: Object) => {
        if (ws.readyState === WebSocket.OPEN) {
          ws.send(JSON.stringify(data));
        }
      }
    };
    return Rx.Subject.create(observer, observable);
  }
}

In another service I map the observable from the response to my type:

  constructor(private wsService: WebSocketService) {
    this.messages = <Subject<MessageEntity>>this.wsService
      .connect('ws://localhost:8081/websocket-messages')
      .map((response: MessageEvent): MessageEntity => {
        const data = JSON.parse(response.data);
        return new MessageEntity(data.id, data.user_id, data.username, data.message, data.links);
      });
  }

And finally the subscription, with a send function that I can't use because the connection is closed:

  ngOnInit() {
    this.messages = [];
    this._ws_subscription = this.chatService.messages.subscribe(
      (message: MessageEntity) => {
        this.messages.push(message);
      },
      error2 => {
        console.log(error2.json());
      },
      () => {
        console.log('Closed');
      }
    );
  }

  sendTestMessage() {
    this.chatService.messages.next(new MessageEntity(null, '59ca30ac87e77d0f38237739', 'mickl', 'test message angular', null));
  }
MiCkl

1 Answer

Assuming your chat messages are being persisted to the datastore as they're being received, you could use the tailable cursors feature in Spring Data MongoDB Reactive (see reference documentation).

So you could create a new method on your repository like:

public interface MessageRepository extends ReactiveSortingRepository<Message, String> {

    @Tailable
    Flux<Message> findWithTailableCursor();
}

Note that tailable cursors have some limitations: your MongoDB collection needs to be capped, and entries are streamed in their order of insertion.
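To see why the choice of source matters, here is a minimal sketch that uses only the JDK's `java.util.concurrent.Flow` API (the JDK counterpart of the Reactive Streams interfaces that `Flux` implements) instead of Spring or MongoDB. Everything in it is hypothetical and for illustration only: `FakeSession` stands in for the WebSocket session, `replayThenComplete` mimics a `findAll()`-style source that completes after replaying stored messages, and `replayThenStayOpen` mimics a tailable-style source that never completes.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class StreamCompletionSketch {

    /** Hypothetical stand-in for the WebSocket session: records messages and whether the stream ended. */
    static final class FakeSession implements Flow.Subscriber<String> {
        final List<String> received = new CopyOnWriteArrayList<>();
        private final CountDownLatch closed = new CountDownLatch(1);

        @Override public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(Long.MAX_VALUE); // unbounded demand
        }
        @Override public void onNext(String message) { received.add(message); }
        @Override public void onError(Throwable error) { closed.countDown(); }
        @Override public void onComplete() { closed.countDown(); } // the "connection" closes here

        boolean isClosed() { return closed.getCount() == 0; }

        /** Waits up to the given time for a terminal signal. */
        boolean awaitClosed(long millis) {
            try {
                return closed.await(millis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }

        /** Polls until at least 'expected' messages arrived or the timeout elapsed. */
        boolean awaitCount(int expected, long millis) {
            long deadline = System.currentTimeMillis() + millis;
            try {
                while (received.size() < expected && System.currentTimeMillis() < deadline) {
                    Thread.sleep(10);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return received.size() >= expected;
        }
    }

    /** Mimics a findAll()-style source: replay stored messages, then signal completion. */
    static FakeSession replayThenComplete(List<String> stored) {
        FakeSession session = new FakeSession();
        SubmissionPublisher<String> source = new SubmissionPublisher<>();
        source.subscribe(session);
        stored.forEach(source::submit);
        source.close(); // sends onComplete after the pending items
        session.awaitClosed(2000);
        return session;
    }

    /** Mimics a tailable-style source: replay stored messages, never complete, deliver a later one. */
    static FakeSession replayThenStayOpen(List<String> stored, String lateMessage) {
        FakeSession session = new FakeSession();
        SubmissionPublisher<String> source = new SubmissionPublisher<>();
        source.subscribe(session);
        stored.forEach(source::submit);
        session.awaitCount(stored.size(), 2000);
        source.submit(lateMessage); // arrives because no onComplete was ever sent
        session.awaitCount(stored.size() + 1, 2000);
        return session; // the source is deliberately left open
    }

    public static void main(String[] args) {
        List<String> stored = List.of("hello", "world");
        FakeSession finite = replayThenComplete(stored);
        System.out.println("finite source: " + finite.received + ", closed=" + finite.isClosed());
        FakeSession open = replayThenStayOpen(stored, "new message");
        System.out.println("open source: " + open.received + ", closed=" + open.isClosed());
    }
}
```

In the first case the subscriber receives a completion signal as soon as the stored messages have been replayed, which corresponds to the session ending after `findAll()` finishes. In the second case no completion signal is sent, so a message submitted later still reaches the subscriber.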

Spring WebFlux's WebSocket support does not yet support STOMP or message brokers, although those might be a better fit for this kind of use case.

Brian Clozel
  • OK, so basically, using just WebFlux it's impossible to keep the connection open for notification purposes, right? It's a research project for me, so that's a bit disappointing. Thanks for the alternative, I'll definitely check it out. – MiCkl Nov 23 '17 at 15:07
  • I didn't say that. You can leave the connection open as long as the source `Flux` stays open. In your case, the repository sends an `onComplete` signal when it's done streaming entries. Otherwise, how would you know whether there are more results from the database or not? What you're looking for here is an infinite stream, and you need a source that behaves like one. – Brian Clozel Nov 23 '17 at 15:09
  • Oh OK, I get it now. I had a look at the documentation and the @Tailable annotation seems to be the answer I was looking for. Thanks again, now I can move on ;D – MiCkl Nov 23 '17 at 15:16