I'm trying to create an Rx.Observable from a chain of Promises with RxJS. Unlike this question, I have an unknown number of Promises, and each Promise depends on the result of the previous one.

Basically I have a sequence of pages, connected with "next page" links.

What I want the function to do is:

  • Wait for Promise<>
  • Provide the result (fire observer.onNext())
  • Check if there is a next page link
  • Create next Promise<> with that link
  • Repeat while there are pages remaining

I tried the following:

private getPages<T>(firstPromise: PromiseLike<IODataCollectionResult<T>>): Rx.Observable<T> {

    let observable = Rx.Observable.create<T>(async obs => {
        let page = await firstPromise;
        page.value.forEach(v => obs.onNext(v));

        while (page['@odata.nextLink']) {
            let nextPageUrl = <string>page['@odata.nextLink'];
            let nextPagePromise = <PromiseLike<IODataCollectionResult<T>>>this.resource(nextPageUrl).get().$promise;
            page = await nextPagePromise;
            page.value.forEach(v => obs.onNext(v));
        }

        obs.onCompleted();
    });

    return observable;
}

(IODataCollectionResult is an OData result, where '@odata.nextLink' is the URL of the next page and .value is an array of values)

The problem is that this does not compile with TypeScript; it gives me an error:

Argument of type '(obs: Observer<T>) => Promise<void>' is not assignable to parameter of type '(observer: Observer<T>) => void | Function | IDisposable'.

That makes sense, because an async function returns a Promise<void>, not void.
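A minimal sketch of that mismatch, with no RxJS dependency. The Observer and Subscribe types here are hypothetical stand-ins modeled on the signature in the error message, not the real RxJS typings:

```typescript
// Hypothetical stand-ins modeled on the error message; not the real RxJS typings.
interface Observer<T> {
    onNext(value: T): void;
    onCompleted(): void;
}
type Subscribe<T> = (observer: Observer<T>) => void | Function | { dispose(): void };

// An async function always returns a Promise, even when its body returns nothing.
const asyncSubscribe = async (obs: Observer<number>): Promise<void> => {
    obs.onNext(1);
    obs.onCompleted();
};

// Assigning asyncSubscribe to Subscribe<number> directly should be rejected,
// because Promise<void> is none of void, Function, or a disposable:
// const bad: Subscribe<number> = asyncSubscribe;

// A plain wrapper that ignores the promise has the expected return type.
const plainSubscribe: Subscribe<number> = obs => { asyncSubscribe(obs); };
```

The wrapper compiles, but it silently drops any rejection of the inner promise.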

Does this mean I cannot use async/await with Rx.Observable.create()? How can I chain a sequence of Promises into an Observable?

enkryptor

2 Answers

You can wrap the async function in something that voids its results:

function toVoid<A>(fn: (x: A) => any): (x: A) => void {
    // The `void` operator evaluates fn(x) and discards its result (here, the promise).
    return x => void fn(x);
}

(forgive my limited knowledge of TypeScript, but I hope you can guess what it's supposed to do)

With that, you should be able to call

let observable = Rx.Observable.create<T>(toVoid(async obs => {
    …
}));

But maybe you shouldn't do that. Don't throw away the promise; use it instead to attach an appropriate error handler:

let observable = Rx.Observable.create<T>(obs => {
    (async () => {
        …
    })().catch(err => {
        obs.onError(err);
    });
});
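
For illustration, here is a fuller sketch of that pattern applied to the paging loop from the question. It does not depend on RxJS: Page, Observer, pumpPages, makeSubscribe, and fetchPage are hypothetical names, and nextLink stands in for '@odata.nextLink'. The returned callback has the shape one would pass to Rx.Observable.create, assuming the observer exposes onNext/onError/onCompleted as in the question:

```typescript
// Hypothetical stand-ins for illustration; not the real RxJS or OData typings.
interface Page<T> { value: T[]; nextLink?: string; }
interface Observer<T> {
    onNext(value: T): void;
    onError(error: unknown): void;
    onCompleted(): void;
}

// Emits every item of every page, following nextLink until it is absent.
async function pumpPages<T>(
    obs: Observer<T>,
    firstPage: PromiseLike<Page<T>>,
    fetchPage: (url: string) => PromiseLike<Page<T>>  // hypothetical page loader
): Promise<void> {
    let page = await firstPage;
    page.value.forEach(v => obs.onNext(v));
    while (page.nextLink) {
        page = await fetchPage(page.nextLink);
        page.value.forEach(v => obs.onNext(v));
    }
    obs.onCompleted();
}

// The callback itself returns void; a rejection anywhere in the chain
// is routed to onError instead of being silently dropped.
function makeSubscribe<T>(
    firstPage: PromiseLike<Page<T>>,
    fetchPage: (url: string) => PromiseLike<Page<T>>
): (obs: Observer<T>) => void {
    return obs => {
        pumpPages(obs, firstPage, fetchPage).catch(err => obs.onError(err));
    };
}
```

Keeping the async loop in its own function leaves the callback synchronous, so its return type no longer conflicts with what create() expects.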
Bergi

The problem was solved using .then() + recursion, without async/await:

private getPages<T>(initialPromise: PromiseLike<IODataCollectionResult<T>>): Rx.Observable<T> {
    return Rx.Observable.create<T>(obs => {
        const getPage = (promise: PromiseLike<IODataCollectionResult<T>>) => {
            promise.then(page => {
                page.value.forEach(v => obs.onNext(v));
                if (page['@odata.nextLink']) {
                    let nextPageUrl = <string>page['@odata.nextLink'];
                    let nextPagePromise = <PromiseLike<IODataCollectionResult<T>>>this.resource(nextPageUrl).get().$promise;
                    getPage(nextPagePromise);
                }
                else {
                    obs.onCompleted();
                }
            }, err => obs.onError(err)); // forward a failed page request to the subscriber
        };
        getPage(initialPromise);
    });
}
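
One possible refinement, sketched here under assumptions: the error message in the question shows that the create() callback may also return a Function, so the recursion could return a disposer that stops following next links once the subscriber has gone away. This sketch does not depend on RxJS; Page, Observer, pagedSubscribe, and fetchPage are hypothetical names, and nextLink stands in for '@odata.nextLink':

```typescript
// Hypothetical stand-ins for illustration; not the real RxJS or OData typings.
interface Page<T> { value: T[]; nextLink?: string; }
interface Observer<T> {
    onNext(value: T): void;
    onError(error: unknown): void;
    onCompleted(): void;
}

// Builds a subscribe callback that walks the pages recursively and
// returns a disposer function.
function pagedSubscribe<T>(
    firstPage: PromiseLike<Page<T>>,
    fetchPage: (url: string) => PromiseLike<Page<T>>  // hypothetical page loader
): (obs: Observer<T>) => () => void {
    return obs => {
        let disposed = false;
        const getPage = (promise: PromiseLike<Page<T>>): void => {
            promise.then(page => {
                if (disposed) { return; }  // ignore a page that arrives after disposal
                page.value.forEach(v => obs.onNext(v));
                if (!page.nextLink) {
                    obs.onCompleted();
                } else if (!disposed) {
                    // Request the next page only while still subscribed.
                    getPage(fetchPage(page.nextLink));
                }
            }, err => {
                if (!disposed) { obs.onError(err); }
            });
        };
        getPage(firstPage);
        return () => { disposed = true; };
    };
}
```

Without such a flag, the recursive version keeps requesting pages even after every subscriber has unsubscribed.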
enkryptor