
My Angular application uses a WebSocket to communicate with the backend.

In my test case I have two client components. The Observable timer prints two different client IDs, as expected.

Each ngOnInit() also prints the id of its client.

Now, for some reason, the subscription callback of websocketService.observeClient() is called twice for each message, but this.client.id always prints the ID of the second client.

Here's my client component:

@Component({
...
})
export class ClientComponent implements OnInit {

  @Input() client: Client;

  constructor(public websocketService: WebsocketService) {
    Observable.timer(1000, 1000).subscribe(() => console.log(this.client.id));
  }

  ngOnInit() {

    console.log(this.client.id);
    this.websocketService.observeClient().subscribe(data => {
      console.log('message', this.client.id);
    });

  }

}

And my WebSocket service:

@Injectable()
export class WebsocketService {

  private observable: Observable<MessageEvent>;
  private observer: Subject<Message>;

  constructor() {

    const socket = new WebSocket('ws://localhost:9091');

    this.observable = Observable.create(
      (observer: Observer<MessageEvent>) => {
        socket.onmessage = observer.next.bind(observer);
        socket.onerror = observer.error.bind(observer);
        socket.onclose = observer.complete.bind(observer);
        return socket.close.bind(socket);
      }
    );

    this.observer = Subject.create({
      next: (data: Message) => {
        if (socket.readyState === WebSocket.OPEN) {
          socket.send(JSON.stringify(data));
        }
      }
    });

  }

  observeClient(): Observable<MessageEvent> {
    return this.observable;
  }

}

Edit

OK, as far as I have read, this has to do with the fact that Observables are unicast and that I have to use a Subject instead, but I don't know how to create the Subject.

n00dl3
Pascal
  • Are you sure the problem isn't in your `providers` configuration? From your description it looks like you want each client component to have its own instance of `WebsocketService`. – martin Apr 10 '17 at 16:49
  • No, there should be one injected websocketService which is connected to the backend – Pascal Apr 10 '17 at 17:19

4 Answers


As of RxJS 5 you can use the built-in WebSocket support, which creates the subject for you. It also reconnects when you resubscribe to the stream after an error. Please refer to this answer:

https://stackoverflow.com/a/44067972/552203

TLDR:

let subject = Observable.webSocket('ws://localhost:8081');
subject
  .retry()
  .subscribe(
    (msg) => console.log('message received: ' + msg),
    (err) => console.log(err),
    () => console.log('complete')
  );
subject.next(JSON.stringify({ op: 'hello' }));
Herman
  • This looks like the best answer to me. I assume calling next will send a message to the broker, correct? My subscriber and message endpoints are different; do I have to create two different subjects in order to send and receive? – d-man May 07 '18 at 21:07
  • Also, does Observable.webSocket use a STOMP endpoint? I'm using Spring and I have registered a STOMP endpoint. I tried this example but I keep seeing a handshake error. – d-man May 07 '18 at 21:08

I struggled with websockets in Angular and Python for a long time. I had a websocket for each component and they were never closed, so every time I moved from one tab to another (switching between components), the websocket was created again on the backend and I received the same message twice or more (solution at the end).

There are many tutorials explaining how to set up websockets, but not many explaining how to close them or how to manage several of them.

Here are my two cents. The best tutorial I have found so far is https://medium.com/@lwojciechowski/websockets-with-angular2-and-rxjs-8b6c5be02fac but it is not complete: it does not close the WS properly.

Here is what I did (my class WebSocketTestService):

@Injectable()
export class WebSocketTestService {
    public messages: Subject<any>  = new Subject<any>();

    constructor(private wsService: WebSocketService) {
        console.log('constructor ws synop service')
    }

    public connect() {
        this.messages = <Subject<any>>this.wsService
            .connect('ws://localhost:8000/mytestwsid')
            .map((response: any): any => {
                return JSON.parse(response.data);
            })
    }

    public close() {
        this.wsService.close()
    }
} // end class 

And here, in a separate file, is the WebSocket class:

import {Injectable} from '@angular/core';
import {Observable} from 'rxjs/Observable';
import {Subject} from 'rxjs/Subject';
import {Observer} from "rxjs/Observer";

@Injectable()
export class WebSocketService {
    private subject: Subject<MessageEvent>;
    private subjectData: Subject<number>;
    private ws: any;
    // For chat box
    public connect(url: string): Subject<MessageEvent> {
        if (!this.subject) {
            this.subject = this.create(url);
        }
        return this.subject;
    }

    private create(url: string): Subject<MessageEvent> {
        this.ws = new WebSocket(url);

        let observable = Observable.create(
            (obs: Observer<MessageEvent>) => {
                this.ws.onmessage = obs.next.bind(obs);
                this.ws.onerror   = obs.error.bind(obs);
                this.ws.onclose   = obs.complete.bind(obs);

                return this.ws.close.bind(this.ws);
            });

        let observer = {
            next: (data: Object) => {
                if (this.ws.readyState === WebSocket.OPEN) {
                    this.ws.send(JSON.stringify(data));
                }
            }
        };

        return Subject.create(observer, observable);
    }

    public close() {
        console.log('on closing WS');
        this.ws.close();
        this.subject = null;
    }

} // end class WebSocketService

And finally, my call to WebSocketTestService in the component:

this.webSocketTestService.connect();
this.webSocketTestService.messages.subscribe(message => {
      console.log(message);

})

And, most importantly, the ngOnDestroy():

ngOnDestroy() {
    this.webSocketTestService.messages.unsubscribe();
    this.webSocketTestService.close();
}

Of course, add both services to the providers section of app.module.

This is the only way I have found to properly close the websocket when I switch between views and components are destroyed. I'm not sure it matches your case, but I have had a hard time finding an example that works properly with multiple views and websockets. I had problems similar to the ones you describe, so I hope it works for you. Let me know if it does.

Edit: After some time I found that my problem had more to do with the service lifecycle than with the websockets themselves. A WS service injected in root is shared by all components, so when it was reused to create a new websocket, I was overriding the first one, because it was the same service instance. The best way to keep a websocket tied to a particular view is to provide the WS service in the component that uses it:

providers: [Your websocket service]
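
As a rough sketch of what that could look like in a component, assuming the WebSocketService shown above (the selector, template, class name, and import path are placeholders and are not taken from my project):

```typescript
import { Component, OnDestroy } from '@angular/core';
// Hypothetical path: point this at wherever WebSocketService lives.
import { WebSocketService } from './web-socket.service';

@Component({
    selector: 'app-ws-view',   // placeholder selector
    template: '',              // placeholder template
    // Listing the service here gives each instance of this component
    // its own WebSocketService instead of the app-wide one.
    providers: [WebSocketService]
})
export class WsViewComponent implements OnDestroy {
    constructor(private wsService: WebSocketService) {}

    ngOnDestroy() {
        // Close the socket that belongs to this view only.
        this.wsService.close();
    }
}
```

With providers set on the component, Angular creates a separate service instance for each instance of that component and destroys it together with the component. A service listed only in the module's providers is shared across the app instead.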

Let me know if you have questions, I'll try to answer them.

Javier
  • thank you this helped me a lot with my rxJs Python implementation! – matyas Aug 05 '18 at 16:08
  • Thanks, this helped a lot. But what about the case where the same socket connection needs to be used in all the views? – Abhishek Rathore Feb 25 '19 at 11:47
  • If you don't want your websocket to be shared, the best option is to provide the service only in the component that uses it, via that component's providers. In your case, if the service is injected in root, you should be able to keep the same websocket connection for all components. – Javier Feb 26 '21 at 09:26

You need to use the share operator to share it among the subscribers.

this.observable = Observable.create(
    (observer: Observer<MessageEvent>) => {
       socket.onmessage = observer.next.bind(observer);
       socket.onerror = observer.error.bind(observer);
       socket.onclose = observer.complete.bind(observer);
       return socket.close.bind(socket);
    }
).share();

Also make sure this service is a singleton.

Doc: https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/share.md
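
To illustrate what sharing changes, here is a minimal sketch that does not use RxJS at all. FakeSocket, coldSource, and sharedSource are hypothetical stand-ins written for this example; they only mimic the relevant behavior: a socket has a single onmessage slot, an unshared source runs its setup for every subscriber, and a shared source runs it once and fans messages out. It is not meant to reproduce the exact logs from the question.

```typescript
type Listener = (msg: string) => void;
type Teardown = () => void;

// Hypothetical fake socket: like a real WebSocket, it has ONE onmessage slot.
class FakeSocket {
  onmessage: Listener | null = null;
  closed = false;
  emit(msg: string): void {
    if (this.onmessage) this.onmessage(msg);
  }
  close(): void {
    this.closed = true;
  }
}

// Unshared source: the setup runs again for every subscriber,
// so each subscribe() overwrites socket.onmessage.
function coldSource(socket: FakeSocket) {
  return {
    subscribe(listener: Listener): Teardown {
      socket.onmessage = listener;
      return () => socket.close();
    },
  };
}

// Shared source: the setup runs once, messages fan out to all listeners,
// and the socket is closed only when the last listener leaves.
function sharedSource(socket: FakeSocket) {
  let listeners: Listener[] = [];
  return {
    subscribe(listener: Listener): Teardown {
      if (listeners.length === 0) {
        socket.onmessage = (msg) => listeners.forEach((l) => l(msg));
      }
      listeners.push(listener);
      return () => {
        listeners = listeners.filter((l) => l !== listener);
        if (listeners.length === 0) socket.close();
      };
    },
  };
}

const coldLog: string[] = [];
const coldSocket = new FakeSocket();
const cold = coldSource(coldSocket);
cold.subscribe((m) => coldLog.push('client 1: ' + m));
cold.subscribe((m) => coldLog.push('client 2: ' + m));
coldSocket.emit('hello');

const sharedLog: string[] = [];
const sharedSocket = new FakeSocket();
const shared = sharedSource(sharedSocket);
const off1 = shared.subscribe((m) => sharedLog.push('client 1: ' + m));
const off2 = shared.subscribe((m) => sharedLog.push('client 2: ' + m));
sharedSocket.emit('hello');
off1();
off2();

console.log(coldLog);             // [ 'client 2: hello' ]
console.log(sharedLog);           // [ 'client 1: hello', 'client 2: hello' ]
console.log(sharedSocket.closed); // true
```

In the unshared case the second subscriber replaces the first one's handler, which is why only one client ever sees the message. The shared variant keeps a single handler on the socket and forwards to every subscriber, which is roughly the role share() plays for the Observable in the service.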

eko

I was struggling with the built-in websocket feature (maybe it's just me), so I made my own: resynkd. It's a work in progress, but so far so good...

My main goal was to be able to subscribe from either websocket endpoint, and to cover more than just Subject. I can create a BehaviorSubject on the client and subscribe to it on the server (by name...) or I can create a ReplaySubject on the server and subscribe to it on the client, etc.

Predrag Stojadinović