13

I'm new to RxJS and FRP in general. I had the idea of converting an existing promise chain in my ExpressJS application to be an observable for practice. I am aware that this probably isn't the best example but maybe someone can help shed some light.

What I'm trying to do:

  1. I have two promises - prom1 and prom2
  2. I want prom1 to run before prom2
  3. If prom1 sends a reject(err), I want to cancel prom2 before it starts.
  4. I want the error message prom1 returns to be available to the onError method on the observer.

var prom1 = new Promise(function(resolve, reject) {
    if (true) {
       reject('reason');
    }
    resolve(true);
});

var prom2 = new Promise(function(resolve, reject) {
    resolve(true);
});

// What do I do here? This is what I've tried so far...
var source1 = Rx.Observable.fromPromise(prom1);
var source2 = source1.flatMap(Rx.Observable.fromPromise(prom2));

var subscription = source2.subscribe(
    function (result) { console.log('Next: ' + result); },

    // I want my error 'reason' to be made available here
    function (err) { console.log('Error: ' + err); },

    function () { console.log('Completed'); });
Michael Allan Jackson
  • 4,217
  • 3
  • 35
  • 45
Pathsofdesign
  • 4,678
  • 5
  • 18
  • 26
  • Promises are not "run", they cannot be "started". What do you mean? – Bergi Apr 29 '15 at 14:32
  • What promise library are you using, native promises? How does it support cancellation? – Bergi Apr 29 '15 at 14:34
  • @Bergi - I'm using https://github.com/then/promise. When I say "run" or "started" I guess it's my way of saying when the observable is created and broadcasts to the subscriber. – Pathsofdesign Apr 29 '15 at 14:35
  • `flatMap` for sure does (just as `then`) take a *callback*, not an observable (or promise respectively). – Bergi Apr 29 '15 at 14:35
  • @Bergi - It looks like flatMap() does accept an observable. http://xgrommx.github.io/rx-book/content/core_objects/observable/observable_instance_methods/flatmap.html. Am I missing something? I'm not saying flatMap() is the answer it's just the only thing I know to try. That's why I need help. – Pathsofdesign Apr 29 '15 at 15:06
  • Ah, I see, didn't know that. I guess you should use a callback nonetheless. – Bergi Apr 29 '15 at 15:19

3 Answers3

19

If I understood what you are trying to do - you need to create two deferred observables from functions that return promises and concat them:

var shouldFail = false;

function action1() {
    return new Promise(function (resolve, reject) {    
        console.log('start action1');
        if (shouldFail) {
            reject('reason');
        }
        resolve(true);
    });
}

function action2() {
    return new Promise(function (resolve, reject) {    
        console.log('start action2');
        resolve(true);
    });
}

var source1 = Rx.Observable.defer(action1);
var source2 = Rx.Observable.defer(action2);

var combination = Rx.Observable.concat(source1, source2);

var logObserver = Rx.Observer.create(

function (result) {
    console.log('Next: ' + result);
},

function (err) {
    console.log('Error: ' + err);
},

function () {
    console.log('Completed');
});

then for normal case:

combination.subscribe(logObserver);
// start action1
// Next: true
// start action2
// Next: true
// Completed

And case where fisrt promise fails:

shouldFail = true;
combination.subscribe(logObserver);
// start action1
// Error: reason

http://jsfiddle.net/cL37tgva/

Bogdan Savluk
  • 6,274
  • 1
  • 30
  • 36
  • Thanks for this! I'm finding I don't need to use the the defer() bit but can pass the two promise functions in to concat() and get the results I need. Am I missing anything by doing it this way? – Pathsofdesign Apr 29 '15 at 18:10
  • 2
    In that case both promises would be created(started) immediately(in point where you call concat, but not when you are actually subscribing) and second promise will not wait first one to resolve before start. – Bogdan Savluk Apr 29 '15 at 18:23
  • With the defer()s, what if the second promise returns before the first? Is this where concat() keeps them in order? – Pathsofdesign Apr 29 '15 at 22:59
  • It looks like concat(a,b,c) is smart enough to run things synchronously. Even if b is finished first, it still runs in order a, b, c. – Pathsofdesign Apr 29 '15 at 23:36
5

flatMap turns an Observable of Observables into an Observable. It's used in many examples with Promises because often you have an observable and in the map function you want to create a promise for each "item" the observable emmits. Because every fromPromise call creates a new Observable, that makes it an "observable of observables". flatMap reduces that to a "flat" observable.

In your example you do something different, you turn a single promise into an observable and want to chain it with another observable (also created form a single promise). Concat does what you are looking for, it chains two observables together.

The error case will work as you would expect.

Daniel Bachler
  • 139
  • 1
  • 5
-1

Observable.forkJoin works great here receiving array of other Observables.

Rx.Observable.forkJoin([this.http.get('http://jsonplaceholder.typicode.com/posts'), this.http.get('http://jsonplaceholder.typicode.com/albums')]).subscribe((data) => {
      console.log(data);
    });
ekad
  • 14,436
  • 26
  • 44
  • 46
Nick Shulzhenko
  • 145
  • 1
  • 10
  • forkJoin might not be the right choice here, because forkJoin allows all its observables to run in parallel. The OP wanted the observables to run sequentially. – piccy Apr 06 '20 at 14:13