1

I have a Component (Let's call it CA) that creates many Components (CB). Each component CB calls the same function from a service class.

This function creates and returns an Observable. In the callback function of this Observable (let's call it O1), it subscribes to another Observable (O2), which is a member of the service class

Observable O2 push data to observers as long as it receives data from WebSocket connection.

In Observable O1, when it receives data from O2, it push this data to its observers.

My issue here, is that my components CB (which, as I said before, are subscribed to this Observable (O1) don't receive my data sent by O1.

When debugging O1 callback code, I can break on line when O1 receives data from O2. And then, it does an "observer.next(dataReceivedFromO2)". But subscribers (1 in each Component C2) don't receive any data and I don't understand why.

Can anyone help me solve this problem? Thank you in advance for your help

Node version: 8.4.0

Here are my dependencies:
  "dependencies": {
    "@angular/animations": "5.2.0",
    "@angular/cdk": "5.0.4",
    "@angular/common": "5.2.0",
    "@angular/compiler": "5.2.0",
    "@angular/core": "5.2.0",
    "@angular/flex-layout": "2.0.0-beta.12",
    "@angular/forms": "5.2.0",
    "@angular/http": "5.2.0",
    "@angular/material": "5.0.4",
    "@angular/platform-browser": "5.2.0",
    "@angular/platform-browser-dynamic": "5.2.0",
    "@angular/router": "5.2.0",
    "bootstrap": "4.0.0-beta",
    "core-js": "2.5.3",
    "font-awesome": "4.7.0",
    "hammerjs": "2.0.8",
    "material-design-icons": "3.0.1",
    "rxjs": "5.5.6",
    "typedoc": "0.9.0",
    "zone.js": "0.8.20"
  },
  "devDependencies": {
    "@angular/cli": "1.6.4",
    "@angular/compiler-cli": "5.2.0",
    "@angular/language-service": "5.2.0",
    "@types/jasmine": "2.8.3",
    "@types/jasminewd2": "2.0.3",
    "@types/node": "9.3.0",
    "codelyzer": "4.0.2",
    "jasmine-core": "2.8.0",
    "jasmine-spec-reporter": "4.2.1",
    "karma": "2.0.0",
    "karma-chrome-launcher": "2.2.0",
    "karma-cli": "1.0.1",
    "karma-coverage-istanbul-reporter": "1.3.3",
    "karma-firefox-launcher": "1.1.0",
    "karma-jasmine": "1.1.1",
    "karma-jasmine-html-reporter": "0.2.2",
    "karma-json-reporter": "1.2.1",
    "protractor": "5.2.2",
    "ts-node": "4.1.0",
    "tslint": "5.9.1",
    "typescript": "2.6.2"
  }

Here is my service:

Note: Here, O2 is named "liveObservable". FunctiongetLiveResumeData() return an Observable (aka O1 in our Context) to each component that call it.

import { Injectable } from '@angular/core';
import { Observable } from 'rxjs/Observable';
import { WebsocketService } from './websocket.service';

import { IResumeReadings } from './../types/resume.readings';

@Injectable()
export class ResumeLiveService {

  resumeWebSocket: WebSocket;
  started = false;
  counter = 0;
  liveObservable: Observable<IResumeReadings[]>;

  constructor(private websocketService: WebsocketService) {
    this.openService(this.websocketService);

    if (!this.started) {
      this.resumeWebSocket.onopen = () => {
        this.resumeWebSocket.send('get_resume_data');
        this.started = true;
      };
    }

    // Here is my Observable O2
    this.liveObservable = Observable.create((observer) => {
      // When data comes from WebSocket, data are sent to observer
      this.resumeWebSocket.onmessage = (message: any) => {
        const bodyAsJson: IResumeReadings[] = JSON.parse(message.data);
        // Sending data to observer
        observer.next(bodyAsJson);
      };

      this.resumeWebSocket.onerror = (error) => {
        observer.error(error);
      };

      return () => {
        // If all components (C2) have unregistered from their respective Observable,
        // the service sends over websocket a message to stop sending data
        if (this.started && this.counter === 0) {
          this.resumeWebSocket.send('stop_resume_data');
          this.started = false;
        }
      };
    });
  }

  // Function that is called by each Component (C2)
  getLiveResumeData(id ?: number): Observable<IResumeReadings[]> {
    if (!this.started) {
      this.counter = 0;
      if (this.resumeWebSocket.readyState === this.resumeWebSocket.OPEN) {
        this.resumeWebSocket.send('get_resume_data');
        this.started = true;
      } else {
        this.resumeWebSocket.onopen = () => {
          this.resumeWebSocket.send('get_resume_data');
          this.started = true;
        };
      }
    }
    this.counter += 1;

    // We create and return an Observable (O1) which push data to observers when
    // it receives data from liveObservable (O2)
    return Observable.create((observerTest) => {
      const subs = this.liveObservable.subscribe((data) => {
        observerTest.next(data);
      }, (error) => {
        observerTest.error(error);
      });

      return () => {
        subs.unsubscribe();
        this.counter -= 1;
      };
    });

Here is my component (C2)

export class MeasurementCardComponent implements OnInit, OnDestroy {

  @Input()
  measurementInfo: IMeasurementInfo;
  @Input()
  formID: string;
  data: Observable<IResumeReadings[]>;
  alarmStatus: AlarmStatus;
  liveResumeSub: Subscription;
  constructor(private utilsService: UtilsService,
              private resumeLiveService: ResumeLiveService) { }

  ngOnInit() {
    // Storing Observable (O1) from service 
    this.data = this.resumeLiveService.getLiveResumeData(this.measurementInfo.id);
    // We subscribe to this Observable
    this.liveResumeSub = this.data.subscribe(
      (readings) => {
      //  ==============> !!!!!!!!!!!     NEVER COMES OR BREAKS HERE      :-(   !!!!!!!!!!!!!!!!!!   <==================
        if (readings[0].limits) {
          for (const limit of readings[0].limits) {
            this.formControls[limit.name] = this.formBuilder.control(limit.value, [Validators.required]);
          }
        }

        const previousAlarmStatus = this.alarmStatus;
        this.alarmStatus = this.utilsService.measurementStyleAlarm(readings[0]);

        if (this.alarmStatus !== previousAlarmStatus) {
          this.setCardAlarmStyle();
        }
      },
      (err: HttpErrorResponse) => {
        if (err.error instanceof Error) {
           // A client-side or network error occurred.
          console.log(`An error occured while getting resume readings for cards: ${err.error.message}`);
        } else {
          // The backend returned an unsuccessful response code.
          // The response body may contain clues as to what went wrong
          console.log(`While getting resume readings for cards, Web server returned code ${err.status},` +
                      `message was: ${err.message}`);
        }
      },
    () => {
      console.log('complete');
    });
  }
Philoufelin
  • 111
  • 1
  • 8
  • Are you trying to chain the observables? https://stackoverflow.com/questions/37748241/how-to-do-the-chain-sequence-in-rxjs/37748799#37748799 – Peter Horton Jan 16 '18 at 21:33
  • @PeterHorton No really...I finally figure out what was the problem..It was a keyword (async) in my html that makes my code not working... Finally, it's kind false issue... Sorry – Philoufelin Jan 19 '18 at 15:49

0 Answers0