0

I have old-style library with 4 asynchronous functions - lets call them libA, libB, libC, libD (the running time of each is unknown/random) - when they finish they call my callback

I write loadString function which return observable (which should contains results of each callback function - I use string here for simplicity but it can be array of results) but it gives wrong result "AADDDDD". The expected result is "AADDDDDCCCCBBB". How to do it right using rxjs?

function loadString() {
  let result = '';
  
  return Observable.create(observer =>{
    libA( n=> { result+='A'.repeat(n) });
    libB( n=> { result+='B'.repeat(n) });
    libC( n=> { result+='C'.repeat(n) });
    libD( n=> {
      result+='D'.repeat(n);
      observer.next(result);
    });        
  })
}

Below there is working snippet which you can copy to your answer and develop loadString function

// SET-UP
const { of, Observable } = rxjs;
const { map, switchMap, delay } = rxjs.operators;

// simulated lib function - not change this (the times are random)
function libA(callback) { setTimeout( _=>callback(2), 1000); } 
function libB(callback) { setTimeout( _=>callback(3), 3000); }
function libC(callback) { setTimeout( _=>callback(4), 2000); }
function libD(callback) { setTimeout( _=>callback(5), 1500); }


// QUESTION: how to write below function body using rxjs?
function loadString() {
  let result = '';
  
  return Observable.create(observer =>{
    libA( n=> { result+='A'.repeat(n) });
    libB( n=> { result+='B'.repeat(n) });
    libC( n=> { result+='C'.repeat(n) });
    libD( n=> {
      result+='D'.repeat(n);
      observer.next(result);
    });        
  })
}


// TEST
let s = loadString().subscribe(str=> {
   console.log(str); // wrong result: I expected "AADDDDDCCCCBBB" value 
})
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.5/rxjs.umd.min.js" integrity="sha256-85uCh8dPb35sH3WK435rtYUKALbcvEQFC65eg+raeuc=" crossorigin="anonymous"></script>

UPDAE

I would like to not run lib functions in sequential way but parallel (they send some requests to API...)

Kamil Kiełczewski
  • 85,173
  • 29
  • 368
  • 345

4 Answers4

1

libD is the ONLY thing that calls observer.next. As soon as that finishes, your observable will emit, even if the other ones haven't finished. So you need to wait. Try combineLatest (although that won't preserve order)?

// SET-UP
const { of, Observable, combineLatest } = rxjs;
const { map, switchMap, delay } = rxjs.operators;

// simulated lib function - not change this (the times are random)
function libA(callback) { setTimeout( _=>callback(2), 1000); } 
function libB(callback) { setTimeout( _=>callback(3), 3000); }
function libC(callback) { setTimeout( _=>callback(4), 2000); }
function libD(callback) { setTimeout( _=>callback(5), 1500); }


// QUESTION: how to write below function body using rxjs?
function loadString() {
  let result = '';
   
   return combineLatest(
     Observable.create(observer => libA(n => observer.next('A'.repeat(n)))),
     Observable.create(observer => libB(n => observer.next('B'.repeat(n)))),   
     Observable.create(observer => libC(n => observer.next('C'.repeat(n)))),          
     Observable.create(observer => libD(n => observer.next('D'.repeat(n))))
   );
 
}


// TEST
let s = loadString().subscribe(str=> {
   console.log(str); // wrong result: I expected "AADDDDDCCCCBBB" value 
})
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.5/rxjs.umd.min.js" integrity="sha256-85uCh8dPb35sH3WK435rtYUKALbcvEQFC65eg+raeuc=" crossorigin="anonymous"></script>
Adam Jenkins
  • 51,445
  • 11
  • 72
  • 100
  • Thank you for this answer - it help a lot - but we loose order here (I would like to have information which result was first and which was last) - Im rxjs beginner so It is not clear to me how to do it :( – Kamil Kiełczewski Jul 23 '20 at 18:10
1

Call each libX function from the callback of the previous one.

function loadString() {
  let result = '';

  return Observable.create(observer => {
    libA(n => {
      result += 'A'.repeat(n);
      libB(n => {
        result += 'B'.repeat(n);
        libC(n => {
          result += 'C'.repeat(n);
          libD(n => {
            result += 'D'.repeat(n);
            observer.next(result);
          });
        });
      });
    })
  });
}
Barmar
  • 741,623
  • 53
  • 500
  • 612
  • the problem with this approach is theay are run squentially - but in my case lib functions send some request to API - and I would like to run them parallel - but thank you for your answer – Kamil Kiełczewski Jul 23 '20 at 18:02
  • You could convert the observables to promises, then use `Promise.all()` to get the results in order. – Barmar Jul 23 '20 at 18:04
  • I see that you like mix/convert observable to promises but this approach have some drawback: it is less extendable (If you want to use eg. repeat(n) request or cancel it...). So it in general it is not good approach "[NOTE: using toPromise() is an antipattern](https://medium.com/@benlesh/rxjs-observable-interop-with-promises-and-async-await-bebb05306875)" ~Ben Lesh rxjs author. However in case when you have complicated chains of interdependent requests/tasks - it is better to use [await approach like here](https://stackoverflow.com/q/61912398/860099) - but this is rare case – Kamil Kiełczewski Jul 24 '20 at 09:06
1

A hacked together solution that illustrates your problem - you should only call observer.next after your last callback has been called. Just keep a running count. The below code isn't important, the knowledge of, "just wait until your last one gets called before emitting" is the key.

Here's the hacked together solution for the fun of it:

// SET-UP
const { of, Observable } = rxjs;
const { map, switchMap, delay } = rxjs.operators;

// simulated lib function - not change this (the times are random)
function libA(callback) { setTimeout( _=>callback(2), 1000); } 
function libB(callback) { setTimeout( _=>callback(3), 3000); }
function libC(callback) { setTimeout( _=>callback(4), 2000); }
function libD(callback) { setTimeout( _=>callback(5), 1500); }


// QUESTION: how to write below function body using rxjs?
function loadString() {
  let result = '';

  const done = (observer,count) => val => {
      result += val;
      // THIS IF STATEMENT IS WHAT YOU WERE MISSING
      if(!--count) return observer.next(result);
  }
  
  return Observable.create(observer =>{
    const complete = done(observer,4);
    libA( n=> complete('A'.repeat(n))),
    libB( n=> complete('B'.repeat(n))),
    libC( n=> complete('C'.repeat(n))),
    libD( n=> complete('D'.repeat(n)));
  })
}


// TEST
let s = loadString().subscribe(str=> {
   console.log(str); // wrong result: I expected "AADDDDDCCCCBBB" value 
})
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.5/rxjs.umd.min.js" integrity="sha256-85uCh8dPb35sH3WK435rtYUKALbcvEQFC65eg+raeuc=" crossorigin="anonymous"></script>
Adam Jenkins
  • 51,445
  • 11
  • 72
  • 100
0

We can also use bufferCount (technique from here )

// SET-UP
const { Subject } = rxjs;
const { take, bufferCount, map, finalize } = rxjs.operators;

// simulated lib function - not change this (the times are random)
function libA(callback) { setTimeout( _=>callback(2), 1000); } 
function libB(callback) { setTimeout( _=>callback(3), 3000); }
function libC(callback) { setTimeout( _=>callback(4), 2000); }
function libD(callback) { setTimeout( _=>callback(5), 1500); }


function loadString() {
  let result = new Subject();
  
  libA( n=> { result.next('A'.repeat(n)); });     
  libB( n=> { result.next('B'.repeat(n)); });     
  libC( n=> { result.next('C'.repeat(n)); });     
  libD( n=> { result.next('D'.repeat(n)); });        

  return result.pipe(
    bufferCount(4),       // num of callbacks
    take(1),              // this will unsubscribe after 4 values received
    map( r => r.join`` ),
  );
}


// TEST
let s = loadString().subscribe(str=> {
   console.log(str); // wrong result: I expected "AADDDDDCCCCBBB" value 
}, e=>{}, ()=> console.log('final'))
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.5/rxjs.umd.min.js" integrity="sha256-85uCh8dPb35sH3WK435rtYUKALbcvEQFC65eg+raeuc=" crossorigin="anonymous"></script>

or less efficient scan (because it concat strings not once but everytime some task ends)

// SET-UP
const { Subject } = rxjs;
const { scan, take, last, map } = rxjs.operators;

// simulated lib function - not change this (the times are random)
function libA(callback) { setTimeout( _=>callback(2), 1000); } 
function libB(callback) { setTimeout( _=>callback(3), 3000); }
function libC(callback) { setTimeout( _=>callback(4), 2000); }
function libD(callback) { setTimeout( _=>callback(5), 1500); }


function loadString() {
  let result = new Subject();
  
  libA( n=> { result.next('A'.repeat(n)); });     
  libB( n=> { result.next('B'.repeat(n)); });     
  libC( n=> { result.next('C'.repeat(n)); });     
  libD( n=> { result.next('D'.repeat(n)); });        

  return result.pipe(
      take(4),
      scan((acc, value) => acc + value),
      last()
  );
}


// TEST
let s = loadString().subscribe(str=> {
   console.log(str); // wrong result: I expected "AADDDDDCCCCBBB" value 
})
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.5/rxjs.umd.min.js" integrity="sha256-85uCh8dPb35sH3WK435rtYUKALbcvEQFC65eg+raeuc=" crossorigin="anonymous"></script>
Kamil Kiełczewski
  • 85,173
  • 29
  • 368
  • 345