96

I used to develop a lot with promise and now I am moving to RxJS. The doc of RxJS doesn't provide a very clear example on how to move from promise chain to observer sequence.

For example, I usually write promise chain with multiple steps, like

// a function that returns a promise
getPromise()
.then(function(result) {
   // do something
})
.then(function(result) {
   // do something
})
.then(function(result) {
   // do something
})
.catch(function(err) {
    // handle error
});

How should I rewrite this promise chain in the RxJS style?

zmf
  • 9,095
  • 2
  • 26
  • 28
Haoliang Yu
  • 2,987
  • 7
  • 22
  • 28

8 Answers8

85

For data flow (equivalent to then):

Rx.Observable.fromPromise(...)
  .flatMap(function(result) {
   // do something
  })
  .flatMap(function(result) {
   // do something
  })
  .subscribe(function onNext(result) {
    // end of chain
  }, function onError(error) {
    // process the error
  });

A promise can be converted into an observable with Rx.Observable.fromPromise.

Some promise operators have a direct translation. For instance RSVP.all, or jQuery.when can be replaced by Rx.Observable.forkJoin.

Keep in mind that you have a bunch of operators that allows to transform data asynchronously, and to perform tasks that you cannot or would be very hard to do with promises. Rxjs reveals all its powers with asynchronous sequences of data (sequence i.e. more than 1 asynchronous value).

For error management, the subject is a little bit more complex.

  • there are catch and finally operators too
  • retryWhen can also help to repeat a sequence in case of error
  • you can also deal with errors in the subscriber itself with the onError function.

For precise semantics, have a deeper look at the documentation and examples you can find on the web, or ask specific questions here.

This would definitely be a good starting point for going deeper in error management with Rxjs : https://xgrommx.github.io/rx-book/content/getting_started_with_rxjs/creating_and_querying_observable_sequences/error_handling.html

user3743222
  • 18,345
  • 5
  • 69
  • 75
  • I always see the observable sequence ends with subscribe(). As this is only a function of observable object, is there any reason to do this? Is it the function to start the sequence? – Haoliang Yu Dec 30 '15 at 05:48
  • exactly so. If there is no observers passed through subscribe, your observable will not emit any data so you won-t see any data flow. – user3743222 Dec 30 '15 at 05:57
  • 7
    I do recommend you to have a look at this : https://gist.github.com/staltz/868e7e9bc2a7b8c1f754. IT might be more palatable that the official doc. – user3743222 Dec 30 '15 at 05:59
  • Thank you for your explanation and very helpful link :) – Haoliang Yu Dec 30 '15 at 13:57
  • 3
    `Promise.then` is rather `.flatMap` than `.map`. – Tamas Hegedus Nov 28 '16 at 19:55
  • corrected. It escaped me that the `then` were returning promises, hence the `flatMap` – user3743222 Nov 30 '16 at 04:36
  • `flatMap` only works for chaining if your source emits at least one item – felixfbecker May 02 '17 at 13:54
  • The chain set up in place with `flatMap` is independent of whether there are values emitted. It is like saying that a function does not work till it is applied with some parameters. `flatMap` is just a higher order stream function. – user3743222 May 02 '17 at 17:55
  • 1
    FYI this is not exactly equivalent as in the `Promise` version errors from the 3rd `then` would be caught by the `catch`. Here they're not. – mik01aj Jul 03 '18 at 13:30
  • You forgot to mention, that in the flatMap function, we need to return a new Observable :) – Pikachu Aug 28 '20 at 08:31
  • flatMap is deprecated. Why is there no up to date answers anywhere on the internet?? All I want to do is chain 2 observables the same way you can with Promises (easily) `.then(result => {})` and I can't find a single example or an explanation in plain English. Why is it so complex? This is surely the most basic use case. – JeneralJames Jun 21 '21 at 10:26
41

A more modern alternative:

import {from as fromPromise} from 'rxjs';
import {catchError, flatMap} from 'rxjs/operators';

fromPromise(...).pipe(
   flatMap(result => {
       // do something
   }),
   flatMap(result => {
       // do something
   }),
   flatMap(result => {
       // do something
   }),
   catchError(error => {
       // handle error
   })
)

Also note that for all this to work, you need to subscribe to this piped Observable somewhere, but I assume it's handled in some other part of the application.

arcseldon
  • 35,523
  • 17
  • 121
  • 125
mik01aj
  • 11,928
  • 15
  • 76
  • 119
  • 1
    I'm very new to RxJS, but given that we're only dealing with an initial stream of _one_ event here, and that `mergeMap()` therefore doesn't actually have anything to _merge_, I believe we could achieve exactly the same thing in this case using `concatMap()` or `switchMap()`. Have I got this correct...? – Dan King Jun 18 '19 at 11:28
13

Update May 2019, using RxJs 6

Agree with the provided answers above, wished to add a concrete example with some toy data & simple promises (with setTimeout) using RxJs v6 to add clarity.

Just update the passed id (currently hard-coded as 1) to something that does not exist to execute the error handling logic too. Importantly, also note the use of of with catchError message.

import { from as fromPromise, of } from "rxjs";
import { catchError, flatMap, tap } from "rxjs/operators";

const posts = [
  { title: "I love JavaScript", author: "Wes Bos", id: 1 },
  { title: "CSS!", author: "Chris Coyier", id: 2 },
  { title: "Dev tools tricks", author: "Addy Osmani", id: 3 }
];

const authors = [
  { name: "Wes Bos", twitter: "@wesbos", bio: "Canadian Developer" },
  {
    name: "Chris Coyier",
    twitter: "@chriscoyier",
    bio: "CSS Tricks and CodePen"
  },
  { name: "Addy Osmani", twitter: "@addyosmani", bio: "Googler" }
];

function getPostById(id) {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      const post = posts.find(post => post.id === id);
      if (post) {
        console.log("ok, post found!");
        resolve(post);
      } else {
        reject(Error("Post not found!"));
      }
    }, 200);
  });
}

function hydrateAuthor(post) {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      const authorDetails = authors.find(person => person.name === post.author);
      if (authorDetails) {
        post.author = authorDetails;
        console.log("ok, post hydrated with author info");
        resolve(post);
      } else {
        reject(Error("Author not Found!"));
      }
    }, 200);
  });
}

function dehydratePostTitle(post) {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      delete post.title;
      console.log("ok, applied transformation to remove title");
      resolve(post);
    }, 200);
  });
}

// ok, here is how it looks regarding this question..
let source$ = fromPromise(getPostById(1)).pipe(
  flatMap(post => {
    return hydrateAuthor(post);
  }),
  flatMap(post => {
    return dehydratePostTitle(post);
  }),
  catchError(error => of(`Caught error: ${error}`))
);

source$.subscribe(console.log);

Output Data:

ok, post found!
ok, post hydrated with author info
ok, applied transformation to remove title
{ author:
   { name: 'Wes Bos',
     twitter: '@wesbos',
     bio: 'Canadian Developer' },
  id: 1 }

The key part, is equivalent to the following using plain promise control flow:

getPostById(1)
  .then(post => {
    return hydrateAuthor(post);
  })
  .then(post => {
    return dehydratePostTitle(post);
  })
  .then(author => {
    console.log(author);
  })
  .catch(err => {
    console.error(err);
  });
arcseldon
  • 35,523
  • 17
  • 121
  • 125
1

If I understood correctly, you mean consuming the values, in which case you use sbuscribe i.e.

const arrObservable = from([1,2,3,4,5,6,7,8]);
arrObservable.subscribe(number => console.log(num) );

Additionally, you can just turn the observable to a promise using toPromise() as shown:

arrObservable.toPromise().then()
David Kabii
  • 642
  • 6
  • 17
0

if getPromise function is in a middle of a stream pipe you should simple wrap it into one of functions mergeMap, switchMap or concatMap (usually mergeMap):

stream$.pipe(
   mergeMap(data => getPromise(data)),
   filter(...),
   map(...)
 ).subscribe(...);

if you want to start your stream with getPromise() then wrap it into from function:

import {from} from 'rxjs';

from(getPromise()).pipe(
   filter(...)
   map(...)
).subscribe(...);
0

As far as i just found out, if you return a result in a flatMap, it converts it to an Array, even if you returned a string.

But if you return an Observable, that observable can return a string;

Samantha Adrichem
  • 833
  • 1
  • 12
  • 23
0

This is how I did it.

Previously

  public fetchContacts(onCompleteFn: (response: gapi.client.Response<gapi.client.people.ListConnectionsResponse>) => void) {
    const request = gapi.client.people.people.connections.list({
      resourceName: 'people/me',
      pageSize: 100,
      personFields: 'phoneNumbers,organizations,emailAddresses,names'
    }).then(response => {
      onCompleteFn(response as gapi.client.Response<gapi.client.people.ListConnectionsResponse>);
    });
  }

// caller:

  this.gapi.fetchContacts((rsp: gapi.client.Response<gapi.client.people.ListConnectionsResponse>) => {
      // handle rsp;
  });

After(ly?)

public fetchContacts(): Observable<gapi.client.Response<gapi.client.people.ListConnectionsResponse>> {
    return from(
      new Promise((resolve, reject) => {
        gapi.client.people.people.connections.list({
          resourceName: 'people/me',
          pageSize: 100,
          personFields: 'phoneNumbers,organizations,emailAddresses,names'
        }).then(result => {
          resolve(result);
        });
      })
    ).pipe(map((result: gapi.client.Response<gapi.client.people.ListConnectionsResponse>) => {
      return result; //map is not really required if you not changing anything in the response. you can just return the from() and caller would subscribe to it.
    }));
  }

// caller

this.gapi.fetchContacts().subscribe(((rsp: gapi.client.Response<gapi.client.people.ListConnectionsResponse>) => {
  // handle rsp
}), (error) => {
  // handle error
});
Anand Rockzz
  • 6,072
  • 5
  • 64
  • 71
-1

RxJS sequence equivalent to promise.then()?

For example

function getdata1 (argument) {
        return this.http.get(url)
            .map((res: Response) => res.json());
    }

    function getdata2 (argument) {
        return this.http.get(url)
            .map((res: Response) => res.json());
    }

    getdata1.subscribe((data1: any) => {
        console.log("got data one. get data 2 now");
        getdata2.subscribe((data2: any) => {
            console.log("got data one and two here");
        });
    });
Yogesh Waghmare
  • 941
  • 10
  • 10