I have a Component (Let's call it CA) that creates many Components (CB). Each component CB calls the same function from a service class.
This function creates and returns an Observable. In the callback function of this Observable (let's call it O1), it subscribes to another Observable (O2), which is a member of the service class
Observable O2 push data to observers as long as it receives data from WebSocket connection.
In Observable O1, when it receives data from O2, it push this data to its observers.
My issue here, is that my components CB (which, as I said before, are subscribed to this Observable (O1) don't receive my data sent by O1.
When debugging O1 callback code, I can break on line when O1 receives data from O2. And then, it does an "observer.next(dataReceivedFromO2)". But subscribers (1 in each Component C2) don't receive any data and I don't understand why.
Can anyone help me solve this problem? Thank you in advance for your help
Node version: 8.4.0
Here are my dependencies:
"dependencies": {
"@angular/animations": "5.2.0",
"@angular/cdk": "5.0.4",
"@angular/common": "5.2.0",
"@angular/compiler": "5.2.0",
"@angular/core": "5.2.0",
"@angular/flex-layout": "2.0.0-beta.12",
"@angular/forms": "5.2.0",
"@angular/http": "5.2.0",
"@angular/material": "5.0.4",
"@angular/platform-browser": "5.2.0",
"@angular/platform-browser-dynamic": "5.2.0",
"@angular/router": "5.2.0",
"bootstrap": "4.0.0-beta",
"core-js": "2.5.3",
"font-awesome": "4.7.0",
"hammerjs": "2.0.8",
"material-design-icons": "3.0.1",
"rxjs": "5.5.6",
"typedoc": "0.9.0",
"zone.js": "0.8.20"
},
"devDependencies": {
"@angular/cli": "1.6.4",
"@angular/compiler-cli": "5.2.0",
"@angular/language-service": "5.2.0",
"@types/jasmine": "2.8.3",
"@types/jasminewd2": "2.0.3",
"@types/node": "9.3.0",
"codelyzer": "4.0.2",
"jasmine-core": "2.8.0",
"jasmine-spec-reporter": "4.2.1",
"karma": "2.0.0",
"karma-chrome-launcher": "2.2.0",
"karma-cli": "1.0.1",
"karma-coverage-istanbul-reporter": "1.3.3",
"karma-firefox-launcher": "1.1.0",
"karma-jasmine": "1.1.1",
"karma-jasmine-html-reporter": "0.2.2",
"karma-json-reporter": "1.2.1",
"protractor": "5.2.2",
"ts-node": "4.1.0",
"tslint": "5.9.1",
"typescript": "2.6.2"
}
Here is my service:
Note: Here, O2 is named "liveObservable". FunctiongetLiveResumeData() return an Observable (aka O1 in our Context) to each component that call it.
import { Injectable } from '@angular/core';
import { Observable } from 'rxjs/Observable';
import { WebsocketService } from './websocket.service';
import { IResumeReadings } from './../types/resume.readings';
@Injectable()
export class ResumeLiveService {
resumeWebSocket: WebSocket;
started = false;
counter = 0;
liveObservable: Observable<IResumeReadings[]>;
constructor(private websocketService: WebsocketService) {
this.openService(this.websocketService);
if (!this.started) {
this.resumeWebSocket.onopen = () => {
this.resumeWebSocket.send('get_resume_data');
this.started = true;
};
}
// Here is my Observable O2
this.liveObservable = Observable.create((observer) => {
// When data comes from WebSocket, data are sent to observer
this.resumeWebSocket.onmessage = (message: any) => {
const bodyAsJson: IResumeReadings[] = JSON.parse(message.data);
// Sending data to observer
observer.next(bodyAsJson);
};
this.resumeWebSocket.onerror = (error) => {
observer.error(error);
};
return () => {
// If all components (C2) have unregistered from their respective Observable,
// the service sends over websocket a message to stop sending data
if (this.started && this.counter === 0) {
this.resumeWebSocket.send('stop_resume_data');
this.started = false;
}
};
});
}
// Function that is called by each Component (C2)
getLiveResumeData(id ?: number): Observable<IResumeReadings[]> {
if (!this.started) {
this.counter = 0;
if (this.resumeWebSocket.readyState === this.resumeWebSocket.OPEN) {
this.resumeWebSocket.send('get_resume_data');
this.started = true;
} else {
this.resumeWebSocket.onopen = () => {
this.resumeWebSocket.send('get_resume_data');
this.started = true;
};
}
}
this.counter += 1;
// We create and return an Observable (O1) which push data to observers when
// it receives data from liveObservable (O2)
return Observable.create((observerTest) => {
const subs = this.liveObservable.subscribe((data) => {
observerTest.next(data);
}, (error) => {
observerTest.error(error);
});
return () => {
subs.unsubscribe();
this.counter -= 1;
};
});
Here is my component (C2)
export class MeasurementCardComponent implements OnInit, OnDestroy {
@Input()
measurementInfo: IMeasurementInfo;
@Input()
formID: string;
data: Observable<IResumeReadings[]>;
alarmStatus: AlarmStatus;
liveResumeSub: Subscription;
constructor(private utilsService: UtilsService,
private resumeLiveService: ResumeLiveService) { }
ngOnInit() {
// Storing Observable (O1) from service
this.data = this.resumeLiveService.getLiveResumeData(this.measurementInfo.id);
// We subscribe to this Observable
this.liveResumeSub = this.data.subscribe(
(readings) => {
// ==============> !!!!!!!!!!! NEVER COMES OR BREAKS HERE :-( !!!!!!!!!!!!!!!!!! <==================
if (readings[0].limits) {
for (const limit of readings[0].limits) {
this.formControls[limit.name] = this.formBuilder.control(limit.value, [Validators.required]);
}
}
const previousAlarmStatus = this.alarmStatus;
this.alarmStatus = this.utilsService.measurementStyleAlarm(readings[0]);
if (this.alarmStatus !== previousAlarmStatus) {
this.setCardAlarmStyle();
}
},
(err: HttpErrorResponse) => {
if (err.error instanceof Error) {
// A client-side or network error occurred.
console.log(`An error occured while getting resume readings for cards: ${err.error.message}`);
} else {
// The backend returned an unsuccessful response code.
// The response body may contain clues as to what went wrong
console.log(`While getting resume readings for cards, Web server returned code ${err.status},` +
`message was: ${err.message}`);
}
},
() => {
console.log('complete');
});
}