170

I think I must be misunderstanding something fundamental, because in my mind this should be the most basic case for an observable, but for the life of my I can't figure out how to do it from the docs.

Basically, I want to be able to do this:

// create a dummy observable, which I would update manually
var eventObservable = rx.Observable.create(function(observer){});
var observer = eventObservable.subscribe(
   function(x){
     console.log('next: ' + x);
   }
...
var my_function = function(){
  eventObservable.push('foo'); 
  //'push' adds an event to the datastream, the observer gets it and prints 
  // next: foo
}

But I have not been able to find a method like push. I'm using this for a click handler, and I know they have Observable.fromEvent for that, but I'm trying to use it with React and I'd rather be able to simply update the datastream in a callback, instead of using a completely different event handling system. So basically I want this:

$( "#target" ).click(function(e) {
  eventObservable.push(e.target.text()); 
});

The closest I got was using observer.onNext('foo'), but that didn't seem to actually work and that's called on the observer, which doesn't seem right. The observer should be the thing reacting to the data stream, not changing it, right?

Do I just not understand the observer/observable relationship?

kian
  • 1,449
  • 2
  • 13
  • 21
LiamD
  • 1,958
  • 2
  • 13
  • 13
  • Have a look at this to clarify your idea (The introduction to Reactive Programming you've been missing) : https://gist.github.com/staltz/868e7e9bc2a7b8c1f754. Here too there is a bunch of resources from which you can improve your understanding : https://github.com/Reactive-Extensions/RxJS#resources – user3743222 Oct 25 '15 at 00:02
  • I'd checked out the first, seems like a solid resource. The second one is a great list, on it I found http://aaronstacy.com/writings/reactive-programming-and-mvc/ which helped me discover Rx.Subject, which solves my problem. So thanks! Once I've written a bit more app I'll post my solution, just want to battle test it a bit. – LiamD Oct 25 '15 at 14:52
  • Hehe, thank you very much for asking this question, I was about to ask the very same question with the very same code sample in mind :-) – arturh Dec 01 '16 at 21:28

3 Answers3

175

In RX, Observer and Observable are distinct entities. An observer subscribes to an Observable. An Observable emits items to its observers by calling the observers' methods. If you need to call the observer methods outside the scope of Observable.create() you can use a Subject, which is a proxy that acts as an observer and Observable at the same time.

You can do like this:

var eventStream = new Rx.Subject();

var subscription = eventStream.subscribe(
   function (x) {
        console.log('Next: ' + x);
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });

var my_function = function() {
  eventStream.next('foo'); 
}

You can find more information about subjects here:

BinaryButterfly
  • 18,137
  • 13
  • 50
  • 91
luisgabriel
  • 2,483
  • 1
  • 15
  • 10
  • 1
    This is actually exactly what I wound up doing! I kept working on it to see if I could find a better way to do what I needed to do, but this is definitely a viable solution. I saw it first in this article: http://aaronstacy.com/writings/reactive-programming-and-mvc/. – LiamD Oct 27 '15 at 04:10
  • 2
    How would one do it if they are not able to use Subjects and must use observables? – Ian Steffy Feb 15 '21 at 10:23
  • 1
    @IanSteffy What if you create a new Observable which is created (via merge) from the old Observable and a new Subject so you can use this Subject to feed the new Observable? – PEZO Dec 10 '21 at 03:51
40

I believe Observable.create() does not take an observer as callback param but an emitter. So if you want to add a new value to your Observable try this instead:

var emitter;
var observable = Rx.Observable.create(e => emitter = e);
var observer = {
  next: function(next) {
    console.log(next);
  },
  error: function(error) {
    console.log(error);
  },
  complete: function() {
    console.log("done");
  }
}
observable.subscribe(observer);
emitter.next('foo');
emitter.next('bar');
emitter.next('baz');
emitter.complete();

//console output
//"foo"
//"bar"
//"baz"
//"done"

Yes Subject makes it easier, providing Observable and Observer in the same object, but it's not exactly the same, as Subject allows you to subscribe multiple observers to the same observable when an observable only send data to the last subscribed observer, so use it consciously. Here's a JsBin if you want to tinker with it.

Frederik Struck-Schøning
  • 12,981
  • 8
  • 59
  • 68
theFreedomBanana
  • 2,462
  • 2
  • 22
  • 29
  • is possibility of overwriting emitter property is documented somewhere on RxJS manuals? – Tomas Nov 18 '17 at 20:53
  • In this case `emitter` will only `next()` new values for the observer that subscribed the last. A better approach would be to collect all `emitter`s in an array and to iterate through them all and `next` the value on each of them – Eric Gopak Oct 28 '18 at 18:01
  • By what to replace the deprecated call `Observable.create()` then ? I tried a `new Observable(emitter)` but it's not behaving as I expected. https://stackoverflow.com/q/65060800/958373 – Stephane Dec 04 '20 at 15:57
0

var observer = Observable.subscribe(

function(x){

 console.log('next: ' + 

var my_function = function(){

Observable.push('hello')

One of the way to update an observable.