0

I would like to implement chained method calls like

observable
 .pipe(
  filter('foo'),
  add(3)
 )
 .subscribe(subscriber);

In order for this to work, the result of .pipe(...) must provide the method subscribe.

I would like to allow some of the chained method calls (e.g. pipe) to by async. However, this would breaks my chain because the promise returned by pipe does not have a subscribe method:

await observable
 .pipe(
  filter('foo'),
  add(3)
 )
 .subscribe(subscriber);

async pipe(...operators){
  ...
}

=> Uncaught (in promise) TypeError: observable.pipe(...).subscribe is not a function

I could rewrite my main code to

observable
 .pipe(
  filter('foo'),
  add(3)
 ).then(pipeResult=>
  pipeResult.subscribe(subscriber);
 );

However, I find that very ugly to read.

=> Is there a way to apply await for each call in the chain of method calls and not only for the last one?

I would expect something like

awaitEach observable
 .pipe(
  filter('foo'),
  add(3)
 )
 .subscribe(subscriber); 

Edit

  • Related question:

chaining async method calls - javascript

  • With the help of Promises I could transform from synchronous to asynchronous calls:

foo(){
 return new Promise(resolve=>{
   baa().then(arg=>resolve(arg))
 })
} 

However, I need the other direction, something like:

pipe() {
   var result = undefined;
   await asyncCall(()=>{ //await is not allowed here; forces pipe to by async
     result = 5;
   });
   return result;
 }
Stefan
  • 10,010
  • 7
  • 61
  • 117
  • It is doable in rxjs with for example mergeMap operator. You could check out their source code to see how they solved this use case. – matvs Jul 06 '20 at 17:03

1 Answers1

0

As a work around, I extended the resulting promise with a subscribe proxy, calling my actual subscribe method:

pipe(...operators){

    let observable = this;

    let promise = new Promise(async (resolve) =>{
      for (let operator of operators){
       observable = await this._applyOperator(operator, observable);
      }
      resolve(observable);
    });

    promise.subscribe = (subscriber)=>{
      promise.then(resultingObservable =>{
        resultingObservable.subscribe(subscriber);
      })
    };

    return promise;
} 

If you know a better solution, please let me know.

Stefan
  • 10,010
  • 7
  • 61
  • 117