
I have the following code:

const Cycle = require('@cycle/core');
const {Observable} = require('rx');

function main(sources) {
    const B$ = sources.driverA
        .concat(Observable.just('b'))
        .concat(Observable.just('c'));

    const C$ = sources.driverB.map(x => x.toUpperCase());

    return {
        driverA: Observable.just('a'),
        driverB: B$,
        driverC: C$
    }
}

Cycle.run(main, {
    driverA: (A$) => A$,
    driverB: (B$) => B$,
    driverC: msg$ => { msg$.subscribe(msg => console.log(msg)) }
});

I expect to get three lines on the console (A, B and C), but I only get the last one. It appears that driverC only receives the last message, even though B$.tap(console.log) outputs all three ("a", "b", "c").

What is the explanation for this behavior and how do I propagate all three messages to driverC?

Versions:

  • @cycle/core@6.0.3
  • rx@4.1.0
dypsilon

2 Answers


Explanation of the behaviour

Well, actually that's not easy to explain. It is due to how Cycle.run wires its cycle. I ran the following code in Tricycle:

const Cycle = require('@cycle/core');
const {Observable} = require('rx');

function main(sources) {
    const B$ = sources.driverA
        .concat(Observable.just('b'))
        .concat(Observable.just('c'))
        .concat(Observable.just('d'));

    const C$ = sources.driverB.map(x => x.toUpperCase());

    return {
        driverA: Observable.just('a'),
        driverB: B$,
        driverC: C$
    }
}

Cycle.run(main, {
    driverA: (A$) => A$.tap(msg => console.log(msg)),
    driverB: (B$) => B$.tap(msg => console.log(msg)),
    driverC: msg$ => { msg$.subscribe(msg => console.log(msg)) }
});

and it displays only a d D. So it is indeed only the last letter that gets through.

Now if you run this :

const Cycle = require('@cycle/core');
const {Observable} = require('rx');

function main(sources) {
    const B$ = sources.driverA
        .concat(Observable.just('b').delay(1))
        .concat(Observable.just('c'))
        .concat(Observable.just('d'));

    const C$ = sources.driverB.map(x => x.toUpperCase());

    return {
        driverA: Observable.just('a'),
        driverB: B$,
        driverC: C$
    }
}

Cycle.run(main, {
    driverA: (A$) => A$.tap(msg => console.log(msg)),
    driverB: (B$) => B$.tap(msg => console.log(msg)),
    driverC: msg$ => { msg$.subscribe(msg => console.log(msg)) }
});

you get a a A b B c C d D, which is what you expected.

What happens is that run wires the drivers to the sources through subjects, and it does so in order. Which order? The order in which for (var x in obj) enumerates properties, which is not specified and hence cannot be depended on; it could be browser-dependent (cf. Does ES6 introduce a well-defined order of enumeration for object properties?). That said, the latest versions of Chrome and Firefox seem to enumerate alphanumerical properties in definition order and numerical properties in numerical order (following the ES2015 spec).
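To see the enumeration order your own engine uses, a quick check like the one below can help. It is only a sketch: the sinks object and its values are made up for illustration, and it uses Object.keys rather than for...in, which current engines order the same way for own properties.

```javascript
// Hypothetical sinks object, only used to inspect key order.
// Integer-like keys come first in ascending order,
// then string keys in the order they were defined.
const sinks = { driverC: 'C$', driverB: 'B$', 2: 'two', driverA: 'A$', 1: 'one' };

const keyOrder = Object.keys(sinks);
console.log(keyOrder.join(' ')); // 1 2 driverC driverB driverA
```

With only string keys, as in the question, the drivers are therefore visited in the order they appear in the object literal.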

So here, driverA is connected to the sources first, which starts the corresponding dataflow. The same happens when driverB is connected. That dataflow is synchronous because of the way you wrote B$. So as soon as the subscription (i.e. the wiring) is made, all the data a b c d flows synchronously from B$, and by the time driverC is wired, B$ has already completed. Since the wiring is made with a ReplaySubject(1), driverC gets only the last value emitted before completion, which is d.
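The effect of a replay buffer of size 1 on a late subscriber can be sketched without Rx at all. The createReplayLast helper below is a toy stand-in written for this illustration; it is not how Rx implements ReplaySubject, and it ignores completion and errors.

```javascript
// Toy stand-in for a subject with a replay buffer of 1 (NOT the Rx implementation).
function createReplayLast() {
    let hasValue = false;
    let last;
    const observers = [];
    return {
        next(value) {
            hasValue = true;
            last = value;
            observers.forEach(observer => observer(value));
        },
        subscribe(observer) {
            // A late subscriber gets only the single buffered value.
            if (hasValue) observer(last);
            observers.push(observer);
        }
    };
}

const proxy = createReplayLast();
const early = [];
const late = [];

proxy.subscribe(x => early.push(x));              // wired before the data flows
['a', 'b', 'c', 'd'].forEach(x => proxy.next(x)); // synchronous burst
proxy.subscribe(x => late.push(x.toUpperCase())); // wired after the burst

console.log(early.join(' ')); // a b c d
console.log(late.join(' '));  // D
```

The early subscriber plays the role of a driver wired before the burst; the late one is in the position of driverC.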

So here, because of the synchronicity, order matters: if B and C were wired first, it would be fine. Unluckily, you ended up with an unfavorable order of execution.

To convince you of that, here is your code with the sinks arranged in topological order, which works as expected:

const Cycle = require('@cycle/core');
const {Observable} = require('rx');

function main(sources) {
    const B$ = sources.driverA
        .concat(Observable.just('b'))
        .concat(Observable.just('c'))
        .concat(Observable.just('d'));

    const C$ = sources.driverB.map(x => x.toUpperCase());

    return {
        driverC: C$,
        driverB: B$,
        driverA: Observable.just('a'),
    }
}

Cycle.run(main, {
    driverA: (A$) => A$.tap(msg => console.log(msg)),
    driverB: (B$) => B$.tap(msg => console.log(msg)),
    driverC: msg$ => { msg$.subscribe(msg => console.log(msg)) }
});

How do you propagate all three messages

Well, either order your sinks in topological order or remove the synchronicity. I added a delay(1) so that the dataflow continues on the next tick, when driverC is already wired to receive the next values. That is probably the most robust option, as the topological order might not always be as obvious to compute as it is here, may change with the interlacing of your sources, and relies on browser-dependent object property enumeration(!).

On a separate note, when the synchronicity of the data flow cannot be avoided, you generally handle such connection issues by using publish to wire everything first and then calling connect, so that when the data starts flowing, all subscribers are already ready to receive it.
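The idea behind publish followed by connect can be sketched with a toy connectable source. The createConnectable helper is hypothetical and written only for this illustration; in Rx you would use publish() and connect() on an Observable instead.

```javascript
// Toy connectable source (hypothetical helper, not Rx's publish/connect).
function createConnectable(values) {
    const observers = [];
    return {
        // Subscribing only registers the observer; no data flows yet.
        subscribe(observer) { observers.push(observer); },
        // connect() starts the synchronous flow to everyone already subscribed.
        connect() { values.forEach(v => observers.forEach(o => o(v))); }
    };
}

const source = createConnectable(['a', 'b', 'c', 'd']);
const lower = [];
const upper = [];

source.subscribe(x => lower.push(x));
source.subscribe(x => upper.push(x.toUpperCase()));
source.connect(); // all subscribers are wired before anything is emitted

console.log(lower.join(' ')); // a b c d
console.log(upper.join(' ')); // A B C D
```

Because nothing is emitted before connect is called, the order in which the subscribers were wired no longer matters.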

user3743222

Simply add a delay after driverA:

const B$ = sources.driverA.delay(1)

WebpackBin example.

Alternatively, you can invoke concat once with all the observables listed and delay the result:

const B$ = Observable.concat(
    sources.driverA,
    Observable.just('b'),
    Observable.just('c'),
    Observable.just('d')
).delay(1);

WebpackBin example #2.

Something to remember: the main function just hooks up the pipes; run turns the water on.

bloodyKnuckles