2

I have an RXJS subject which, when I emit to it (with next), makes an HTTP call and timesout after 5 secs. I've turned the backend off so that it ALWAYS timesout. The timeout causes the subscribes error func to be called. Perfect.

However, when I emit to the subject a second time, i see that the subject has 0 observers. The timeout error undesirably removes all current observers to the RXJS subject. However, I do not want this behaviour. I want all observers to remain subscribed.

How do I fix this?

The important line of code is ...

console.log(this.getDocumentsSubject.observers.length);

Which returns 1 when called the 1st time.

BUT Which problematically returns 0 when called the 2nd time, after a timeout.

Full code below.

// RXJS SUBJECT AND ASSOCIATED OBSERVABLE

private getDocumentsSubject = new Subject<ElasticFilteredQuery>();
public getDocuments$ = this.getDocumentsSubject.asObservable().flatMap((elasticFilteredQuery: ElasticFilteredQuery) => {

let query = elasticFilteredQuery.toString();

// noinspection UnnecessaryLocalVariableJS
let restStream = this.http.post(BASE_URL + '_search', query, this.options)
  .do(() => {
    console.log('firing post');
  })
  .timeout(Config.http.timeout, new Error('timeout'))
  .map((response: any) => {

    return {
      documents: response.json().hits.hits.map((hit: any) => {
        return this.createDocumentResult(hit);
      }),
      numDocuments: response.json().hits.total,
      elasticFilteredQuery
    };
  });

return restStream;
}).publish().refCount();



// EMIT TO RXJS SUBJECT - this is being called at the correct times

public getDocuments(elasticFilteredQuery: ElasticFilteredQuery) {
  this.getDocumentsSubject.next(elasticFilteredQuery);
  console.log('watch number of observables', this.getDocumentsSubject.observers.length); // Outputs 1 initially but 0 after a timeout
}


// SUBSCRIPTION

this.esQueryService.getDocuments$.subscribe((response: any) => {
    console.log('XXXXX NEXT');
    ...
  }, (err: any) => {
    console.log('XXXXX error');
    ...
  }, () => {
    console.log('XXXXX completed');
  }
);
danday74
  • 52,471
  • 49
  • 232
  • 283

2 Answers2

2

This answer is solely based on the assumption, that you want to use getDocuments$ as a perpetual stream, that emits new data, whenever a a new query comes in. (If this is not the case, then the answer might not help you)

However this will not work like this, because whenever an Error is emitted on a stream, the Stream is essentially dead. (also see this answer)

This is a basic issue in your rxjs-architecture: Errors are supposed to be thrown on one-time-processes (like a rest-call), however data-streams (like documents$) are usually there to ensure that any eventual errors have already been handled, and whatever is emitted (next'ed) on the perpetual stream is reliable & valid data.

So my suggestion would be to use a .catch() to handle the error gracefully, and simple skip the emission of the documents of this call.


Slightly off-topic and maybe not relevant:

In any case it is a very unusual case to have a hard timeout for a rest-call, if you want to save server-power, then I'd suggest handling this on the server-side. Another very common case is, that you might only want to accept responses until the next query is triggered to prevent older, slower queries from showing after a new one was rendered, if this is the case, then you could use a simple .takeUntil(this.getDocumentSubject):

this.http.post(BASE_URL + '_search', query, this.options)
  .takeUntil(this.getDocumentSubject)
  .do(...

As an alternative you could use switchMap instead of the flatMap

Community
  • 1
  • 1
olsn
  • 16,644
  • 6
  • 59
  • 65
  • +1 for your help - if i use a catch block it hits the catch block but im not sure what to return from the catch block. If I ... return Observable.throw(err); ... I have the same problem - any ideas? thx – danday74 Dec 06 '16 at 17:06
  • 1
    you could try an `.empty()` – olsn Dec 06 '16 at 17:07
  • that works, but now my code is not executing any of the NEXT, ERROR or COMPLETED functions - i really need to execute some callback code - any suggestions? thanks v much for your help – danday74 Dec 06 '16 at 17:14
  • NEXT will now only trigger when there is no timeout, if you want to "do" something with the error (display e.g.) you could handle that through a seperate perpetual subject that emits errors-messages via `.next(errorMsg)` – olsn Dec 06 '16 at 17:17
  • ok thanks, not really the answer i was hoping for as RxJS gets messy when you start adding so much stuff. But Im gonna accept your answer anyway and address this in a different question. Many thanks :) – danday74 Dec 06 '16 at 17:19
  • NEXT is now triggered. Fixed this by returning an Observable that emits. See https://github.com/ReactiveX/RxJava/wiki/Creating-Observables. I went for .... return Observable.from(['somethingToEmitToNextBlock']) .... but others worked too - many thanks for helping me arrive at the solution :) – danday74 Dec 07 '16 at 01:43
1

What you describe is RxJS (and all Reactive Extensions) behavior by design. It's not an issue nor bug. This is how it's supposed to work. Any error or complete signal makes recursive unsubscribe() calls. You're definitely not the first one asking this question on SO.

See similar:

Using catch() operator has one important "catch". This operator lets you resubscribe to the same Observable which is something you might not want because it may trigger more HTTP requests, then fail again and make an infinite loop. Be aware that this operator doesn't ignore errors.

Community
  • 1
  • 1
martin
  • 93,354
  • 25
  • 191
  • 226
  • Thanks very much martin, makes total sense now. I notice that catch block has a second caught parameter. If you return that, it resubs also causing the exact infinite loop you describe (works identical to .retry()). I broke out the loop by returning Observable.from(['somethingToEmitToNext']) from the catch block but other choices at https://github.com/ReactiveX/RxJava/wiki/Creating-Observables worked also. Many many thanks for helping me understand this better :) – danday74 Dec 07 '16 at 01:38