I'm trying to create an Rx.Observable from a chain of Promises with RxJS. The difference from this question is that I have an unknown number of Promises, and every Promise depends on the result of the previous one.
Basically I have a sequence of pages, connected with "next page" links.
What I want the function to do is:
- Wait for Promise<>
- Provide the result (fire observer.onNext())
- Check if there is a next page link
- Create next Promise<> with that link
- Repeat until there are no pages left
I tried the following:
private getPages<T>(firstPromise: PromiseLike<IODataCollectionResult<T>>): Rx.Observable<T> {
    let observable = Rx.Observable.create<T>(async obs => {
        let page = await firstPromise;
        page.value.forEach(v => obs.onNext(v));
        while (page['@odata.nextLink']) {
            let nextPageUrl = <string>page['@odata.nextLink'];
            let nextPagePromise = <PromiseLike<IODataCollectionResult<T>>>this.resource(nextPageUrl).get().$promise;
            page = await nextPagePromise;
            page.value.forEach(v => obs.onNext(v));
        }
        obs.onCompleted();
    });
    return observable;
}
(IODataCollectionResult is an OData result, where '@odata.nextLink' is the next page URL and .value is an array of values.)
The problem is that this doesn't compile with TypeScript; it gives me an error:
Argument of type '(obs: Observer) => Promise' is not assignable to parameter of type '(observer: Observer) => void | Function | IDisposable'.
Which makes sense, because an async function returns a Promise<void>, not void.
Does it mean I cannot use async/await with Rx.Observable.create()? How can I chain a sequence of Promises into an Observable?
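To make the type mismatch concrete, here is a minimal sketch of one direction that seems to type-check: keep the callback itself synchronous and run the async loop in an inner function. It deliberately does not use RxJS. MiniObserver, Page, Subscribe, pagesSubscriber and fetchPage are hypothetical stand-ins I made up to mirror the shapes above, so this only illustrates the callback shape and is not a verified RxJS solution.

```typescript
// Hypothetical stand-in for an observer; NOT the RxJS type.
interface MiniObserver<T> {
  onNext(value: T): void;
  onError(error: unknown): void;
  onCompleted(): void;
}

// Hypothetical page shape modelled on the OData result described above.
interface Page<T> {
  value: T[];
  '@odata.nextLink'?: string;
}

// Mirrors the callback shape from the error message: void or a teardown function,
// so a callback returning Promise<void> does not fit.
type Subscribe<T> = (observer: MiniObserver<T>) => void | (() => void);

// Builds a subscribe callback that walks the page chain.
// fetchPage is an assumed loader that takes a next-page URL.
function pagesSubscriber<T>(
  firstPage: PromiseLike<Page<T>>,
  fetchPage: (url: string) => PromiseLike<Page<T>>
): Subscribe<T> {
  // The outer callback is synchronous, so it returns void rather than Promise<void>.
  return (observer) => {
    // The async loop lives in an inner function.
    const run = async (): Promise<void> => {
      let page = await firstPage;
      page.value.forEach(v => observer.onNext(v));
      let next = page['@odata.nextLink'];
      while (next) {
        page = await fetchPage(next);
        page.value.forEach(v => observer.onNext(v));
        next = page['@odata.nextLink'];
      }
      observer.onCompleted();
    };
    // Forward any rejection to the observer instead of returning the Promise.
    run().catch(err => observer.onError(err));
  };
}
```

If this shape is sound, the same non-async wrapper could presumably be passed to Rx.Observable.create(), but I have not confirmed that against the RxJS typings.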