3

Context: To generate swift code using a JSON tree I’m traversing the tree in pre-order format reaching down into all the leaf nodes, effectively flattening the tree into an array.

Description: Using Rx.Observable.generate() to create an observable stream from the array element, but when filtering I'm getting some weird results. I’ve provided a boiled down example below:

Example:

var Rx = require('/usr/local/lib/node_modules/rx') // 4.0.7

// source 1,3,5,7,9 (WAT)
var source = Rx.Observable.generate(
    0,
    function (x) { return x < 10; },
    function (x) { return x + 1; },
    function (x) { return x; }
)

// filter & merge
var a = source.filter(x => x % 2 == 0)
var b = source.filter(x => x % 2 != 0)
var source = a.merge(b)

// subscribe & output
var subscription = source.subscribe(
    x => console.log(x)
)

Question: Why do I get the output result 1,3,5,7,9 and not 0,1,2,3,4,5,6,7,8,9 as expected?

It doesn’t seem to matter which way I apply the merge… I also get the output 0,2,4,6,8 when reversed.

Edit, npm install & node version user3743222: thanks for your feedback, info on project follows:

$ npm install

chai@3.4.1 node_modules/chai
├── assertion-error@1.0.1
├── type-detect@1.0.0
└── deep-eql@0.1.3 (type-detect@0.1.1)

moment@2.11.1 node_modules/moment

mocha@2.3.4 node_modules/mocha
├── escape-string-regexp@1.0.2
├── diff@1.4.0
├── commander@2.3.0
├── supports-color@1.2.0
├── growl@1.8.1
├── debug@2.2.0 (ms@0.7.1)
├── jade@0.26.3 (commander@0.6.1, mkdirp@0.3.0)
├── mkdirp@0.5.0 (minimist@0.0.8)
└── glob@3.2.3 (inherits@2.0.1, graceful-fs@2.0.3, minimatch@0.2.14)

rx@4.0.7 node_modules/rx


$ node --version

v4.2.1
  • Mike, I run your code and I do get [`0,1,2,3,4,5,6,7,8,9`] in the console. http://jsfiddle.net/ewqpcjum/1/. I am using RxJS v4. Which version are you using? – user3743222 Jan 19 '16 at 10:46
  • By the way I just noticed that you are reassigning `source` in the middle of things. Unless this is intended, you should replace `source=a.merge(b)` with `mergedSource=a.merge(b)` to be able to reason more easily about your program. Just saw as well that you are using 4.07 which is the one I am using too. Is that really the code that gives you the wrong results? – user3743222 Jan 19 '16 at 10:54
  • Thanks for the feedback have updated with more info. Side note: this project is running from node on the command line. – Mike Caisey Jan 19 '16 at 11:26
  • I think the problem is not with your boiled down example with the real one. You might have suppressed the source of errors precisely with this boiling down. The idea is to be able to reproduce the error, so that it can be investigated. I don't see anything wrong neither with your configuration or the code, so if you have a correct code which does not work, post an issue there : https://github.com/Reactive-Extensions/RxJS/issues?q=is%3Aissue+filter+is%3Aopen – user3743222 Jan 19 '16 at 12:57
  • 1
    Ok, update after answer by kakigoori, I was running the jsfiddle with Rxjs 2.2, I runned it with Rxjs4 and I reproduced your erroneous results. See here : http://jsfiddle.net/fz3LL7e5/. You should definitely post the issue in the above link, with the two jsbin links which exhibit both behaviours. This is an issue with `generate` not with `merge` – user3743222 Jan 19 '16 at 14:54
  • Thanks for the feedback. I'm looking at alternatives to creating the stream. from() with an 'arrayLike' object looks promising. – Mike Caisey Jan 19 '16 at 15:29

3 Answers3

2

I think your problem is that generate is meant to create a stateful Observable which will give the first subscriber your generated items, but the next ones will not get anything. See this JSBin

For example, if you use a normal Observable created with range, you will get the merge results you are looking for. See this JSBin

kakigoori
  • 1,218
  • 10
  • 20
  • What happens in your first `jsbin` is that `generate` emits all its values synchronously (immediate scheduler by default), which is why they are all consumed by the time you finish the first `subscribe` in your jsbin. What I don't understand is why the next `subscribe` does not restart the data flow. `generate` returns what behaves like a hot observable, which is certainly documented nowhere. In addition to that in prior version of Rxjs, it was behaving as a cold observable. `range` certainly returns a cold observable, so it works as expected. – user3743222 Jan 19 '16 at 14:56
  • +1 in any case for finding this out. I could not figure it out by myself. Please note though that the dichotomy is more *cold* vs *hot* observables rather than *stateful* and *normal* observables. I don't understand the `generate` source code well enough (they use recursive scheduling which is chinese to me) to know what's going on but the end behavior is indeed rather counterintuitive. – user3743222 Jan 19 '16 at 15:20
0

As mentioned in the comments, RxjsV4 seems to exhibit a different behavior than Rxjsv2 for the generate operator.

Turnarounds :

  • downgrade version
  • use defer to restart the source Cf. http://jsfiddle.net/fz3LL7e5/1/ That will work in this case, but not necessarily in every case.
  • actually I think what you want is to use share because you want your downstream observable to see the same values right? instead of restarting the generation. Cf. http://jsfiddle.net/fz3LL7e5/2/ This works as expected.

Explanation about hot vs. cold here (official documentation) and here (illustrated dataflows)

Community
  • 1
  • 1
user3743222
  • 18,345
  • 5
  • 69
  • 75
0

I've done some fiddling, trying different ways to create a stream of events

I like using the from array-like option where the object you pass to from() must have a public length property. Not sure if this is because it is indexed.

var E = Rx.Observable.from(arrayLike,
    function (_, i) {
        return i
    })

Shown working here: http://jsfiddle.net/th3caaaz/