2

I'm trying to subscribe to an Observable created from a Subject, and whenever I subscribe without an error handler I get the error this._subscribe is not a function. I found this question that described a similar error; but the answers didn't seem to fit my situation. Here is my code:

const subject = new Rx.Subject();
subject
    .withLatestFrom(otherObservable)
    .subscribe(
        values => {
            // some logic
       }
    );

I also tried:

const subject = new Rx.Subject();
subject
    .withLatestFrom(otherObservable)
    .subscribeOnNext(
        values => {
            // some logic
       }
    ); 

and I get the same error. Here is the stack trace when I try just subscribe:

  Observable.Rx.Observable.observableProto.subscribe.observableProto.forEach (/home/ryan/code/redurx/node_modules/rx/dist/rx.js:2034:19)                                                                        
WithLatestFromObservable.subscribeCore (/home/ryan/code/redurx/node_modules/rx/dist/rx.js:4084:33)                                                                                                              
WithLatestFromObservable.tryCatcher (/home/ryan/code/redurx/node_modules/rx/dist/rx.js:63:31)                                                                                                                   
setDisposable [as action] (/home/ryan/code/redurx/node_modules/rx/dist/rx.js:2082:46)                                                                                                                           
ScheduledItem.invokeCore (/home/ryan/code/redurx/node_modules/rx/dist/rx.js:896:33)                                                                                                                             
ScheduledItem.invoke (/home/ryan/code/redurx/node_modules/rx/dist/rx.js:884:40)                                                                                                                                 
runTrampoline (/home/ryan/code/redurx/node_modules/rx/dist/rx.js:1125:37)                                                                                                                                       
tryCatcher (/home/ryan/code/redurx/node_modules/rx/dist/rx.js:63:31)                                                                                                                                            
CurrentThreadScheduler.schedule (/home/ryan/code/redurx/node_modules/rx/dist/rx.js:1141:45)                                                                                                                     
WithLatestFromObservable.Rx.ObservableBase.ObservableBase._subscribe (/home/ryan/code/redurx/node_modules/rx/dist/rx.js:2095:32)                                                                                
WithLatestFromObservable.Rx.Observable.observableProto.subscribe.observableProto.forEach (/home/ryan/code/redurx/node_modules/rx/dist/rx.js:2034:19)  

and this is the stack trace when I try subscribeOnNext

Observable.Rx.Observable.observableProto.subscribe.observableProto.forEach (/home/ryan/code/redurx/node_modules/rx/dist/rx.js:2034:19)
    WithLatestFromObservable.subscribeCore (/home/ryan/code/redurx/node_modules/rx/dist/rx.js:4084:33)
    WithLatestFromObservable.tryCatcher (/home/ryan/code/redurx/node_modules/rx/dist/rx.js:63:31)
    setDisposable [as action] (/home/ryan/code/redurx/node_modules/rx/dist/rx.js:2082:46)
    ScheduledItem.invokeCore (/home/ryan/code/redurx/node_modules/rx/dist/rx.js:896:33)
    ScheduledItem.invoke (/home/ryan/code/redurx/node_modules/rx/dist/rx.js:884:40)
    runTrampoline (/home/ryan/code/redurx/node_modules/rx/dist/rx.js:1125:37)
    tryCatcher (/home/ryan/code/redurx/node_modules/rx/dist/rx.js:63:31)
    CurrentThreadScheduler.schedule (/home/ryan/code/redurx/node_modules/rx/dist/rx.js:1141:45)
    WithLatestFromObservable.Rx.ObservableBase.ObservableBase._subscribe (/home/ryan/code/redurx/node_modules/rx/dist/rx.js:2095:32)
    WithLatestFromObservable.Rx.Observable.observableProto.subscribeOnNext (/home/ryan/code/redurx/node_modules/rx/dist/rx.js:2046:19)

If I pass a trivial error handler like () => null to subscribe I don't get the error. Any ideas?

Community
  • 1
  • 1
Ryan Lynch
  • 7,676
  • 1
  • 24
  • 33

1 Answers1

0

Thanks to user3743222 I figured it out. It turns out (logically actually) that you can't subscribe to a plain observable. It needs to be constructed in a way such that it has values to observe. I changed the initialization for otherObservable in my unit tests from new Rx.Observable() to Rx.Observable.just(1), and now it can be subscribed to. I'm a little confused though why it didn't throw the same error when I passed both an onNext and onError handler. But I'll take it.

Community
  • 1
  • 1
Ryan Lynch
  • 7,676
  • 1
  • 24
  • 33
  • 1
    I dont think `new Rx.Observable()` is part of the API to be honest. Have you checked the documentation? You usually create an observable by deriving it from others through operators, or via Rx.Observable.create, or via a certain number of creation operator, such as Rx.Observable.from, etc. You will find the list of operator for observable creation here : https://github.com/Reactive-Extensions/RxJS/blob/master/doc/gettingstarted/categories.md – user3743222 Jun 29 '16 at 18:58
  • Also if you want to understand how observable works, I recommend you to have a look at the following question. It helps figuring out the data and subscription flow happening under the hood. http://stackoverflow.com/questions/32190445/hot-and-cold-observables-are-there-hot-and-cold-operators/34669444#34669444 – user3743222 Jun 29 '16 at 19:01
  • I think you're right about the observable constructor. I understand how to use observables fairly well; I was just trying to trivially construct one for test purposes. – Ryan Lynch Jun 29 '16 at 19:19
  • yep. Use one of the factories like `just` `of` `range` `timer` etc when you want any ol' observable – Brandon Jun 29 '16 at 20:23