23

I have an ngrx/store (v2.2.2) and rxjs (v5.1.0) based application that listens to a web socket for incoming data using an observable. When I start the application I receive incoming data flawlessly.

However, after a while (updates come in quite infrequently) the connection seems to get lost and I no longer receive any incoming data. My code:

The service

import { Injectable, OnInit } from '@angular/core';
import { Observable } from 'rxjs';

@Injectable()
export class MemberService implements OnInit {

  private websocket: any;
  private destination: string = "wss://notessensei.mybluemix.net/ws/time";

  constructor() { }

  ngOnInit() { }

  listenToTheSocket(): Observable<any> {

    this.websocket = new WebSocket(this.destination);

    this.websocket.onopen = () => {
      console.log("WebService Connected to " + this.destination);
    }

    return Observable.create(observer => {
      this.websocket.onmessage = (evt) => {
        observer.next(evt);
      };
    })
      .map(res => res.data)
      .share();
  }
}

The subscriber

  export class AppComponent implements OnInit {

  constructor(/*private store: Store<fromRoot.State>,*/ private memberService: MemberService) {}

  ngOnInit() {
    this.memberService.listenToTheSocket().subscribe((result: any) => {
      try {
        console.log(result);
        // const member: any = JSON.parse(result);
        // this.store.dispatch(new MemberActions.AddMember(member));
      } catch (ex) {
        console.log(JSON.stringify(ex));
      }
    })
  }
}

What do I need to do to reconnect the web socket when it times out, so the observable continues to emit incoming values?

I had a look at some Q&A here, here and here, but they didn't seem to address this question (in a way I could comprehend).

Note: the websocket at wss://notessensei.mybluemix.net/ws/time is live and emits a time stamp once a minute (in case one wants to test that).

Advice is greatly appreciated!

stwissel
  • if there is an error in the stream then it "should" be picked up in the "error" argument to `.subscribe`. All your `try ... catch` is doing is picking up any error produced in the block where the stream actually does receive some data. So adding the error handler at least is a good point to start, since it will show if there actually is a termination error. – Neil Lunn May 19 '17 at 02:10
  • Note also, since you are hand-wrapping the WebSocket in an observable, there "should" also be a `this.websocket.onerror` mapping to `observer.error` in order to pick that up in the subscription. – Neil Lunn May 19 '17 at 02:15
  • Thx for the hints. I'll rework my example and see if I can get on top of it. Disconnect isn't an error, but an event for a socket, so it probably won't bubble up to the observable – stwissel May 19 '17 at 02:16
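
To make the two points from these comments concrete (map `onerror` to `observer.error`, and surface a disconnect explicitly because it is an event rather than an error), here is a minimal sketch of the function one might pass to `Observable.create`. The names `SocketLike`, `ObserverLike` and `subscribeToSocket` are hypothetical, the shapes are deliberately minimal so the sketch compiles without RxJS or a browser, and treating a close as an error is an assumption made so that a retry could react to it.

```typescript
// Minimal shapes (assumptions, not the DOM or RxJS types).
interface SocketLike {
  onmessage: ((evt: { data: unknown }) => void) | null;
  onerror: ((evt: unknown) => void) | null;
  onclose: ((evt: { code: number; reason: string }) => void) | null;
  close(): void;
}

interface ObserverLike<T> {
  next(value: T): void;
  error(err: unknown): void;
  complete(): void;
}

// Hypothetical helper: opens a fresh socket per subscription, forwards
// its events to the observer, and returns a teardown function.
function subscribeToSocket(
  createSocket: () => SocketLike,
  observer: ObserverLike<unknown>
): () => void {
  const socket = createSocket();
  socket.onmessage = (evt) => observer.next(evt.data);
  socket.onerror = (evt) => observer.error(evt);
  // A close is an event, not an error, so it is surfaced explicitly here.
  socket.onclose = (evt) =>
    observer.error(new Error(`socket closed (code ${evt.code})`));
  return () => {
    socket.onclose = null; // do not report our own close as a failure
    socket.close();
  };
}
```

In the service, `createSocket` would be a factory that opens the real socket for `this.destination`; the minimal types above may need loosening to accept the DOM `WebSocket` type.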

3 Answers

37

Actually, there is now a WebSocketSubject in rxjs!

import { webSocket } from 'rxjs/webSocket'; // for RxJS 6, for v5 use Observable.webSocket

const subject = webSocket('ws://localhost:8081');
subject.subscribe(
  (msg) => console.log('message received: ' + msg),
  (err) => console.log(err),
  () => console.log('complete')
);
// In RxJS 6 the default serializer already applies JSON.stringify,
// so pass the object itself; with RxJS 5, stringify it yourself.
subject.next({ op: 'hello' });

It reconnects when you resubscribe after the connection has broken, so retrying the subscription is enough to reconnect:

subject.retry().subscribe(...) // RxJS 5; in RxJS 6 use subject.pipe(retry()).subscribe(...)

See the docs for more info. Unfortunately the search box doesn't show the method, but you can find it here:

http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#static-method-webSocket

The #-anchor navigation does not work in my browser, so search for "webSocket" on that page.

Source: http://reactivex.io/rxjs/file/es6/observable/dom/WebSocketSubject.js.html#lineNumber15
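
Why resubscribing reconnects: the socket is only opened when something subscribes, so every retry opens a fresh connection. The following is a framework-free sketch of that idea, not the RxJS implementation; `ColdSource` and `withRetry` are hypothetical names, and the sketch assumes the source reports errors asynchronously (after its subscribe call has returned), as a socket does.

```typescript
// A "cold" source: calling it starts the work (e.g. opening a socket)
// and returns a teardown function.
type ColdSource<T> = (
  onNext: (value: T) => void,
  onError: (err: unknown) => void
) => () => void;

// Hypothetical helper mimicking the idea behind retry(): on error,
// tear down the failed connection and subscribe to the source again,
// at most maxRetries times. Assumes errors arrive asynchronously.
function withRetry<T>(source: ColdSource<T>, maxRetries: number): ColdSource<T> {
  return (onNext, onError) => {
    let attempts = 0;
    let stopped = false;
    let teardown: () => void = () => {};

    const connect = (): void => {
      teardown = source(onNext, (err) => {
        teardown(); // release the broken connection
        if (stopped) return;
        if (attempts++ < maxRetries) {
          connect(); // resubscribing opens a new connection
        } else {
          onError(err); // retries exhausted: pass the error on
        }
      });
    };

    connect();
    return () => {
      stopped = true;
      teardown();
    };
  };
}
```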

Herman
  • How do you import this? – carkod Jun 29 '18 at 22:42
  • To import this in rxjs 6 use `import {webSocket} from 'rxjs/webSocket'` – carkod Jun 29 '18 at 22:52
  • This method didn't work for me. I had to use the `new WebSocketSubject('ws://localhost:8081')` constructor, and then in the next I had to send `subject.next({event: 'myEvent', data: 'hello'})`. The docs explicitly say, in multiple places, to "make sure you serialize the object using JSON.stringify", but I could not get this to work. This answer saved my bacon: https://stackoverflow.com/a/50400143/4233452 – mrClean Feb 13 '19 at 15:25
  • Property 'retry' does not exist on type 'WebSocketSubject'? – QuanDar Jul 05 '21 at 10:25
  • Retry should look like this: subject.pipe(retry()).subscribe(... – moritz.vieli Dec 04 '21 at 08:21
5

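For an implementation with RxJS 6 or later (note that, to my knowledge, the `delay` option of `RetryConfig` is only available from RxJS 7 onward):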

import { webSocket } from 'rxjs/webSocket'
import { retry, RetryConfig } from "rxjs/operators";

const retryConfig: RetryConfig = {
  delay: 3000,
};

let subject = webSocket('ws://localhost:8081');
subject.pipe(
   retry(retryConfig) //support auto reconnect
).subscribe(...)
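
A fixed 3000 ms delay can hammer a server that stays down for a long time. One common alternative is an exponential backoff; the sketch below only computes the delay, and `backoffDelayMs` with its default values is a hypothetical helper, not part of RxJS.

```typescript
// Hypothetical helper: delay before the n-th retry (1-based).
// The delay doubles with every retry and is capped at maxMs.
function backoffDelayMs(
  retryCount: number,
  baseMs: number = 1000,
  maxMs: number = 30000
): number {
  const exponent = Math.max(0, retryCount - 1);
  return Math.min(maxMs, baseMs * Math.pow(2, exponent));
}

// Assumption: with RxJS 7 the `delay` option may also be a function that
// returns a notifier, so this could be plugged in roughly like
//   retry({ delay: (_error, retryCount) => timer(backoffDelayMs(retryCount)) })
```

With the defaults, the first retries wait 1 s, 2 s and 4 s, and no retry waits longer than 30 s.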
xinthose
John Paulo Rodriguez
1

This might not be the right answer, but it's too long for a comment.

The problem might come from your service:

listenToTheSocket(): Observable<any> {
  this.websocket = new WebSocket(this.destination);

  this.websocket.onopen = () => {
    console.log("WebService Connected to " + this.destination);
  }

  return Observable.create(observer => {
    this.websocket.onmessage = (evt) => {
      observer.next(evt);
    };
  })
  .map(res => res.data)
  .share();
}

Could your component be running its ngOnInit method more than once?
Put a console.log into ngOnInit to be sure.

If it does, every call into your service overwrites this.websocket with a new socket.

You could try something like this instead:

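import { Injectable, OnInit } from '@angular/core';
import { BehaviorSubject, Observable } from 'rxjs';

@Injectable()
export class MemberService implements OnInit {

  private websocket: any;
  // A BehaviorSubject requires an initial value; null means "nothing received yet"
  private websocketSubject$ = new BehaviorSubject<any>(null);
  private websocket$ = this.websocketSubject$.asObservable();

  private destination = 'wss://notessensei.mybluemix.net/ws/time';

  constructor() { }

  ngOnInit() { }

  listenToTheSocket(): Observable<any> {
    // Reuse the existing socket instead of opening a new one on every call
    if (this.websocket) {
      return this.websocket$;
    }

    this.websocket = new WebSocket(this.destination);

    this.websocket.onopen = () => console.log(`WebService Connected to ${this.destination}`);

    // Push the message payload (not a function) into the subject
    this.websocket.onmessage = (res) => this.websocketSubject$.next(res.data);

    return this.websocket$;
  }
}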

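A BehaviorSubject replays its most recent value to every new subscriber, so a subscriber that arrives after a message was received still gets that last message. And because it is a subject, it is already multicast, so there is no need for the share operator.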

maxime1992
  • Services are singletons, so the ngOnInit should run once only. But I'll check it out – stwissel May 19 '17 at 07:27
  • I know that services are singleton. Sorry If I haven't been clear enough, but I was talking about the `ngOnInit` **in your component** which is calling your service method `listenToTheSocket` – maxime1992 May 19 '17 at 07:39
  • Ah. Got it. Really appreciate your advice here. App component is at the root of component tree. "should" run once only, but worth pointing out and checking. – stwissel May 19 '17 at 07:42