I have an RxJS Subject which, when I emit to it (with next), makes an HTTP call that times out after 5 seconds. I've turned the backend off so that it ALWAYS times out. The timeout causes the subscriber's error function to be called. Perfect.
However, when I emit to the subject a second time, I see that the subject has 0 observers. The timeout error removes all current observers from the subject. I do not want this behaviour; I want all observers to remain subscribed.
How do I fix this?
The important line of code is:
console.log(this.getDocumentsSubject.observers.length);
This returns 1 when called the first time,
but, problematically, it returns 0 when called the second time, after a timeout.
Full code below.
// RXJS SUBJECT AND ASSOCIATED OBSERVABLE
private getDocumentsSubject = new Subject<ElasticFilteredQuery>();
public getDocuments$ = this.getDocumentsSubject.asObservable().flatMap((elasticFilteredQuery: ElasticFilteredQuery) => {
    let query = elasticFilteredQuery.toString();
    // noinspection UnnecessaryLocalVariableJS
    let restStream = this.http.post(BASE_URL + '_search', query, this.options)
        .do(() => {
            console.log('firing post');
        })
        .timeout(Config.http.timeout, new Error('timeout'))
        .map((response: any) => {
            return {
                documents: response.json().hits.hits.map((hit: any) => {
                    return this.createDocumentResult(hit);
                }),
                numDocuments: response.json().hits.total,
                elasticFilteredQuery
            };
        });
    return restStream;
}).publish().refCount();
// EMIT TO RXJS SUBJECT - this is being called at the correct times
public getDocuments(elasticFilteredQuery: ElasticFilteredQuery) {
    this.getDocumentsSubject.next(elasticFilteredQuery);
    console.log('watch number of observables', this.getDocumentsSubject.observers.length); // Outputs 1 initially but 0 after a timeout
}
// SUBSCRIPTION
this.esQueryService.getDocuments$.subscribe((response: any) => {
    console.log('XXXXX NEXT');
    ...
}, (err: any) => {
    console.log('XXXXX error');
    ...
}, () => {
    console.log('XXXXX completed');
});
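The drop from 1 observer to 0 can be reproduced in a small sketch that does not use RxJS at all. It is only an illustration of the mechanism I suspect: an error that escapes the inner request is delivered downstream once, and the whole chain back to the subject is then torn down. MiniSubject, mapOrFail, failing and recovered are hypothetical names invented for this sketch, not RxJS APIs, and the real flatMap/publish/refCount internals are more involved. The second half shows what seems to change if the failure is handled inside the inner request so that it never reaches the outer stream.

```typescript
// Hypothetical stand-ins for illustration only; none of these names come from RxJS.
type Observer<T> = { next: (v: T) => void; error: (e: Error) => void };

class MiniSubject<T> {
  observers: Observer<T>[] = [];

  subscribe(o: Observer<T>): () => void {
    this.observers.push(o);
    // The returned function removes this observer again.
    return () => {
      this.observers = this.observers.filter(x => x !== o);
    };
  }

  next(v: T): void {
    // Iterate over a copy, because an observer may unsubscribe while being notified.
    this.observers.slice().forEach(o => o.next(v));
  }
}

// Stands in for "post + timeout": returns a value or throws.
type Request<T, R> = (v: T) => R;

// Simplified flatMap-like link: if the inner request fails, the upstream
// subscription is torn down and the error is forwarded downstream.
function mapOrFail<T, R>(source: MiniSubject<T>, request: Request<T, R>, downstream: Observer<R>): void {
  let unsubscribe: () => void = () => {};
  unsubscribe = source.subscribe({
    next: v => {
      let result: R;
      try {
        result = request(v);
      } catch (e) {
        unsubscribe();
        downstream.error(e instanceof Error ? e : new Error(String(e)));
        return;
      }
      downstream.next(result);
    },
    error: e => downstream.error(e),
  });
}

const log: string[] = [];
const sink: Observer<string> = {
  next: v => { log.push('next ' + v); },
  error: e => { log.push('error ' + e.message); },
};

// Case 1: the request always fails, like the backend being switched off.
const failing: Request<string, string> = () => {
  throw new Error('timeout');
};
const subject = new MiniSubject<string>();
mapOrFail(subject, failing, sink);
const before = subject.observers.length; // 1
subject.next('q1');
const after = subject.observers.length; // 0

// Case 2: the failure is handled inside the inner request and replaced by a value.
const recovered: Request<string, string> = v => {
  try {
    return failing(v);
  } catch {
    return 'fallback for ' + v;
  }
};
const subject2 = new MiniSubject<string>();
mapOrFail(subject2, recovered, sink);
subject2.next('q1');
subject2.next('q2');
const afterRecovered = subject2.observers.length; // 1

console.log(before, after, afterRecovered, log);
```

If the same reasoning carries over to the real code, the place to handle the timeout would be on restStream inside the flatMap callback (before it is returned), rather than in the error callback of the outer subscription. I have not verified this against RxJS itself.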