2
var state = [];
var operation1 = function() {
    return Rx.Observable.fromPromise(new Promise((resolve, reject) => {
        state.push(1, 2);
        setTimeout(resolve, 300, state);
    }));
};
var operation2 = function() {
    return Rx.Observable.fromPromise(new Promise((resolve, reject) => {
        state = state.map(x => x * 2);
        setTimeout(resolve, 200, state);
    }));
};
var operation3 = function() {
    return Rx.Observable.fromPromise(new Promise((resolve, reject) => {
        state = state.reduce( (prev, next) => prev + next );
        setTimeout(resolve, 100, state);
    }));
};
var operations = [operation1, operation2, operation3];

Given the code above, I am trying to combine operations into one Observable that emits the state of each operation. So the Observable needs to do either one of the following:

  • emits 3 times: [1, 2], [2, 4], 6
  • emits 1 time: [[1, 2], [2, 4], 6]
Liang Zhou
  • 2,055
  • 19
  • 20
  • Do you mean that you only want to do the work to produce the promise when the previous promise in the chain has completed? – GregL Oct 24 '16 at 22:57
  • @GregL yes, that is correct. And I have updated the example to show that. – Liang Zhou Oct 25 '16 at 00:28

2 Answers2

3

You could try (jsbin)

var state = [];
var operation1 = Rx.Observable.defer(function() {
    return Rx.Observable.fromPromise(new Promise((resolve, reject) => {
        state.push(1, 2);
        setTimeout(resolve, 300, state);
    }));
});
var operation2 = Rx.Observable.defer(function() {
    return Rx.Observable.fromPromise(new Promise((resolve, reject) => {
        state = state.map(x => x * 2);
        setTimeout(resolve, 200, state);
    }));
});
var operation3 = Rx.Observable.defer(function() {
    return Rx.Observable.fromPromise(new Promise((resolve, reject) => {
        state = state.reduce( (prev, next) => prev + next );
        setTimeout(resolve, 100, state);
    }));
});
var operations = Rx.Observable.from([operation1, operation2, operation3]).merge(1);
operations.subscribe(function(x){console.log(x)})

Please check if that does the trick, I will elaborate later on how this works.

user3743222
  • 18,345
  • 5
  • 69
  • 75
0

You can use Rx.Observable.concat but I think this simple case would be easier without rx.java and using Promise.all

let slow = new Promise((resolve) => {
  setTimeout(resolve, 200, 'slow');
});
let instant = new Promise((resolve) => {
  setTimeout(resolve, 0, 'instant');
});
let quick = new Promise((resolve) => {
  setTimeout(resolve, 50, 'quick');
});

var operation1 = function() {
  return Rx.Observable.fromPromise(slow);
}
var operation2 = function() {
  return Rx.Observable.fromPromise(instant);
}
var operation3 = function() {
  return Rx.Observable.fromPromise(quick);
}
var operations = [operation1(), operation2(), operation3()];
var source = Rx.Observable.concat(operations);

var subscription = source.subscribe(
  function(x) {
    console.log('Next: ' + x);
  },
  function(err) {
    console.log('Error: ' + err);
  },
  function(a) {
    console.log('Completed', a);
  });

// Or with promises

operations = [slow, instant, quick]

Promise.all(operations)
.then(console.log.bind(console, "Promise all"))
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.1.0/rx.all.min.js"></script>
denixtry
  • 2,928
  • 1
  • 21
  • 19
  • The problem is that I can't call those operations sequentially. The `Promise.resolve()` is just for demo purpose. My real promise is going to resolve with a delay. And I want to make sure the first promise is resolved before the next is called. – Liang Zhou Oct 24 '16 at 22:34
  • Promise.all will resolve everything in the order the promises are in the list. It doesn't matter if there is a delay – denixtry Oct 24 '16 at 22:45
  • The delay as well as the order of which promise gets resolved next matters to me. `Promise.all` will not ensure the first promise in the array to be resolved before it starts resolving next. I have updated my example code to better show why the order and delay matters. – Liang Zhou Oct 25 '16 at 00:26
  • First of all you can use `Rx.Observable.concat` if you really have to. And second you can look here to understand promises better. https://stackoverflow.com/questions/28066429/promise-all-order-of-resolved-values – denixtry Oct 25 '16 at 00:35
  • 1
    @denixtry The promises passed to `Promise.all` resolve in parallel - not in the order passed. – cartant Oct 25 '16 at 00:47
  • @denixtry, I don't think we are talking about the same problem here. And I am not sure if you have seen my updated example before you put the comment above. The order that `Promise.all` is able to ensure is the result order of each promises, for example, if I have 3 promises p1 p2 p3, `Promise.all([p1, p2, p3])` will give me an array `result` that enforce the same result order, so `result[0]` belongs to p1 not p2 etc. And I totally get that. But what it can't ensure is to defer p2 until p1 *gets resolved*. And that is the order I has been talking about. – Liang Zhou Oct 25 '16 at 00:57