25

fromArray Rx wiki on github

coffee> rext = require 'rx'                                                 
coffee> arr = [1..5]                                                 
[ 1, 2, 3, 4, 5 ]                                                    
coffee> obs = rext.Observable.fromArray(arr)                         
{ _subscribe: [Function] }                                           
coffee> obs.subscribe( (x) -> console.log("added value: " + x))      
added value: 1                                                       
added value: 2                                                       
added value: 3                                                       
added value: 4                                                       
added value: 5                                                       
{ isStopped: true,                                                   
  observer:                                                          
   { isStopped: true,                                                
     _onNext: [Function],                                            
     _onError: [Function: defaultError],                             
     _onCompleted: [Function: noop] },                               
  m: { isDisposed: true, current: null } }                           
coffee> arr.push(12)    # expecting "added value: 12"                                              
6                       # instead got new length of array                                              
coffee>          

It really looks like the subscribe function will only fire one time, when it's created. It seems like it's a bit of a misnomer, since I'm really just for-eaching the array instead of observing changes on it. That code is almost exactly the same as what's on the wiki though. So either I'm doing it wrong or the subscribe doesn't work how I expect.

jcollum
  • 43,623
  • 55
  • 191
  • 321

4 Answers4

15

In RxJS what you are looking for is called a Subject. You can push data into it and stream it from there.

Example:

var array = [];
var arraySubject = new Rx.Subject();

var pushToArray = function (item) {
  array.push(item);
  arraySubject.next(item);
}

// Subscribe to the subject to react to changes
arraySubject.subscribe((item) => console.log(item));
Frederik Wordenskjold
  • 10,031
  • 6
  • 38
  • 57
Frederic Leitenberger
  • 1,949
  • 24
  • 32
  • 1
    what is the `array` for in your example? @Frederik – Frederic Leitenberger Oct 26 '17 at 09:05
  • 1
    Thats the actual array that contains the elements. When you push an element into the array, you also call next() on the arraySubject, so you notify any subscribers. You could also build that directly into your own class, of course. – Frederik Wordenskjold Oct 26 '17 at 21:20
  • dude how is this going to work? if you subscribe to the subject, you aren't going to observe the elements already pushed to the array, right? – Alexander Mills Jan 18 '18 at 01:52
  • 2
    The array is completely separate from the arraySubject in this example. All you are doing is pushing new values to your subscriber – Charles Robertson Apr 16 '18 at 12:25
  • 1
    What if array is not populated by you calling pushToArray, but for example by a component provided by a lib? – Fabio Formosa Sep 14 '18 at 13:55
  • 5
    Forgive me if im wrong but doesnt this example basically ignore the array in terms of observables? I mean at no point is the arraySubject ever interacting with the array... you are just forcing a next on the observable every time you do a push to the array. Is this really the proper/only way to do this? – lorless Oct 29 '18 at 09:58
  • well played - making a notifier when there are any changes. :) – Ravi Anand Oct 08 '20 at 07:29
2

I found Rx.Observable.ofObjectChanges(obj) to work just as I expected.

From the documentation page:

Creates an Observable sequence from changes to an object using Object.observe.

Hope it helps.

Pablo
  • 2,540
  • 1
  • 18
  • 26
  • 9
    ofObjectChanges is no longer available in RxJS 5: https://github.com/ReactiveX/rxjs/blob/master/MIGRATION.md – kpup Apr 05 '17 at 20:41
2

How about this:

var subject = new Rx.Subject();
//scan example building an array over time
var example = subject.scan((acc, curr) => { 
    return acc.concat(curr);
}
,[]);

//log accumulated values
var subscribe = example.subscribe(val => 
    console.log('Accumulated array:', val)
);

//next values into subject
subject.next(['Joe']);
subject.next(['Jim']);
Charles Robertson
  • 1,760
  • 16
  • 21
1

Observable.fromArray creates an Observable that immediately fires events for each array items, when you add a Subscriber. So, it won't be "watching" the changes to that array.

If you need a "pushable collection", the Bus class in Bacon.js might be what you're looking for. For RxJs there's my little MessageQueue class that has a similar functionality.

raimohanska
  • 3,265
  • 17
  • 28
  • 5
    So it's an Observer that ... doesn't. Who comes up with this stuff? (rhetorical). Thanks for the tip about MessageQueue. – jcollum Jan 23 '13 at 17:22
  • Oh, and then there's FRB (http://documentup.com/montagejs/frb/) that would actually give you observable bindings on any JS object. Or at least any property of an object. Not sure about directly observing an array. – raimohanska Jan 24 '13 at 18:54
  • 2
    Using another library for something that RxJS provides out of the box with "Subjects" is not a good approach from my perspective. – TekTimmy Jun 22 '17 at 08:40