
I have the following scenario: four callback functions A, B, C, D are called from an old-style library. The library makes API requests internally, so the execution time of each call is unknown/random, but the order in which the results arrive (by task completion time) matters to me. I want to combine the data these callbacks return into a single observable result string using rxjs.

function getData() {

  // --- The part BELOW can be EDITED ---

  let obs = new ReplaySubject(1); // this is only an example; you can use any type

  function A(n) { 
    let r= 'A'.repeat(n);
  }

  function B(n) {
    let r= 'B'.repeat(n);
  }

  function C(n) {
    let r= 'C'.repeat(n);
  }

  function D(n) {
    let r= 'D'.repeat(n);
    obs.next(r);
  }
  
  // --- The part BELOW can NOT be edited ---

  runLib(A,B,C,D)   
  return obs
}

In the snippet below, the value of finalResult is DDDDD, which is wrong. The correct value of the finalResult string is AADDDDDCCCCBBB.

// SET-UP - do NOT edit the code below
const { of, Observable, ReplaySubject } = rxjs;
const { map, switchMap, delay } = rxjs.operators; // example

// simulated lib functions
function libA(callback) { setTimeout( _=>callback(2), 1000); } 
function libB(callback) { setTimeout( _=>callback(3), 3000); }
function libC(callback) { setTimeout( _=>callback(4), 2000); }
function libD(callback) { setTimeout( _=>callback(5), 1500); }

function runLib(cA,cB,cC,cD) {
    libA( cA ); libB( cB ); libC( cC ); libD( cD );
}

getData().subscribe(finalResult => {
  console.log(finalResult) // The result is WRONG here!
}, e=>{}, _=> console.log('finished - unsubscribed'));


function getData() {

  // --- The part BELOW can be EDITED ---

  let obs = new ReplaySubject(1); // this is only an example; you can use a different kind of observable

  function A(n) { 
    let r= 'A'.repeat(n);
  }

  function B(n) {
    let r= 'B'.repeat(n);
  }

  function C(n) {
    let r= 'C'.repeat(n);
  }

  function D(n) {
    let r= 'D'.repeat(n);
    obs.next(r);
  }
  
  // --- The part BELOW can NOT be edited ---

  runLib(A,B,C,D)   
  return obs
}
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.5/rxjs.umd.min.js" integrity="sha256-85uCh8dPb35sH3WK435rtYUKALbcvEQFC65eg+raeuc=" crossorigin="anonymous"></script>

In the snippet I marked the code inside getData() that can be edited in a solution (it may look a little awkward, but this is exactly what I need). You can also find finalResult there, but that part of the code must not be edited. Is this possible? How can I do it?

Kamil Kiełczewski

2 Answers


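The best thing to do in such a case is to wrap the library functions so that they return Observables, and then use forkJoin to wait for all the results (note that forkJoin emits the results in the order of its inputs, not in completion order).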

I took your code and modified it to get the desired result. You would need to:

  1. Emit the result to a subject in each callback.
  2. Return an Observable which waits for n emissions (4 in this case).
  3. Map the emissions into a single string.

The final getData function will look like this:

function getData() {
  // --- The part BELOW can be EDITED ---
  const result$: Subject<string> = new Subject<string>();
  const obs = result$.asObservable().pipe(
    bufferCount(4), // or however many callbacks you expect
    take(1), // complete after the first full buffer so the subscription is released
    map((results: string[]) => results.join(''))
  );

  function A(n) {
    let r = "A".repeat(n);
    result$.next(r);
  }

  function B(n) {
    let r = "B".repeat(n);
    result$.next(r);
  }

  function C(n) {
    let r = "C".repeat(n);
    result$.next(r);
  }

  function D(n) {
    let r = "D".repeat(n);
    result$.next(r);
  }

  // --- The part BELOW can NOT be edited ---

  runLib(A, B, C, D);
  return obs;
}
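To see why this yields the completion order, here is a minimal library-free sketch of the same idea: buffer values as the callbacks fire, join them once the buffer is full, then stop listening. It is only a model of the pipeline above, not RxJS itself; `collectInArrivalOrder` is a hypothetical helper name, and the callbacks are fired by hand in the order the question's timers would fire them.

```javascript
// Plain-JS model of "buffer N values in arrival order, join once, then stop".
// collectInArrivalOrder is a hypothetical helper, not part of RxJS.
function collectInArrivalOrder(count, onDone) {
  const buffer = [];
  let done = false;
  return function push(value) {
    if (done) return;            // ignore anything after the first full buffer
    buffer.push(value);          // values are stored in the order the callbacks fire
    if (buffer.length === count) {
      done = true;
      onDone(buffer.join(''));   // concatenate exactly once
    }
  };
}

// Fire the callbacks by hand in the question's completion order:
// A (1000 ms), D (1500 ms), C (2000 ms), B (3000 ms).
let finalResult = null;
const push = collectInArrivalOrder(4, s => { finalResult = s; });
push('A'.repeat(2));
push('D'.repeat(5));
push('C'.repeat(4));
push('B'.repeat(3));
console.log(finalResult); // AADDDDDCCCCBBB
```

The result depends only on the order in which the callbacks are invoked, which is the reason a single shared subject works here without any per-callback bookkeeping.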

You can find the full code in this stackblitz, or run the snippet below.

// SET-UP - do NOT edit the code below
const { Subject } = rxjs;
const { take, bufferCount, map } = rxjs.operators; // example

// simulated lib functions
function libA(callback) { setTimeout( _=>callback(2), 1000); } 
function libB(callback) { setTimeout( _=>callback(3), 3000); }
function libC(callback) { setTimeout( _=>callback(4), 2000); }
function libD(callback) { setTimeout( _=>callback(5), 1500); }

function runLib(cA,cB,cC,cD) {
    libA( cA ); libB( cB ); libC( cC ); libD( cD );
}

getData().subscribe(finalResult => {
  console.log(finalResult) // AADDDDDCCCCBBB
}, e=>{},_=> console.log('finished - unsubscribed'));


function getData() {
  // --- The part BELOW can be EDITED ---
  const result$ = new Subject();

  function A(n) {
    let r = "A".repeat(n);
    result$.next(r);
  }

  function B(n) {
    let r = "B".repeat(n);
    result$.next(r);
  }

  function C(n) {
    let r = "C".repeat(n);
    result$.next(r);
  }

  function D(n) {
    let r = "D".repeat(n);
    result$.next(r);
  }

  const obs = result$.pipe(
    bufferCount(4), // or however many callbacks you expect
    take(1), // complete after the first full buffer
    map(results => results.join(''))
  );

  // --- The part BELOW can NOT be edited ---

  runLib(A, B, C, D);
  return obs;
}
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.5/rxjs.umd.min.js" integrity="sha256-85uCh8dPb35sH3WK435rtYUKALbcvEQFC65eg+raeuc=" crossorigin="anonymous"></script>
Kamil Kiełczewski
Tal Ohana
  • Thank you, very nice answer. I edited your answer and rewrote the snippet in pure JS :) – Kamil Kiełczewski Jul 24 '20 at 13:27
  • @RafiHenig what should we change in this solution to unsubscribe at the right time? – Kamil Kiełczewski Jul 24 '20 at 14:08
  • @RafiHenig actually this is not a problem, because I can unsubscribe in the standard way after `let subscriber = getData().subscribe`. So unsubscribing was not the point of my question. – Kamil Kiełczewski Jul 24 '20 at 14:39
  • 1. I can unsubscribe inside subscribe (when the results are ready). 2. If I use `getData().subscribe` in another function, then after it has done its job there will be no reference to the objects used inside it, and the garbage collector should remove them all, I think. – Kamil Kiełczewski Jul 24 '20 at 14:47
  • @RafiHenig he can also use the `take` operator to unsubscribe. I updated his answer, and the problem is fixed. – Kamil Kiełczewski Jul 24 '20 at 15:04

How about the following? (I like @Tal Ohana's answer, but the subject in his solution will never be unsubscribed, which may result in a memory leak.)

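// RxJS 6 import paths; runLib is the library entry point from the question
import { Subject } from 'rxjs';
import { scan, take, last } from 'rxjs/operators';

function getData() {
  const obs = new Subject<string>();

  function A(n: number) {
    let r = 'A'.repeat(n);
    obs.next(r);
  }

  function B(n: number) {
    let r = 'B'.repeat(n);
    obs.next(r);
  }

  function C(n: number) {
    let r = 'C'.repeat(n);
    obs.next(r);
  }

  function D(n: number) {
    let r = 'D'.repeat(n);
    obs.next(r);
  }

  runLib(A, B, C, D);

  return obs.pipe(
    scan((acc, value) => acc + value), // running concatenation, in arrival order
    take(4), // complete after four emissions
    last() // emit only the final, fully concatenated string
  );
}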
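As a rough illustration of what the three operators contribute, the following plain-JS trace mimics the pipeline step by step. It does not use RxJS; the `arrivals` array is an assumption that stands in for the callbacks firing in the completion order given by the question's timers.

```javascript
// Plain-JS trace of scan((acc, value) => acc + value), take(4), last().
// `arrivals` is assumed input: the values in the order the callbacks would fire.
const arrivals = ['AA', 'DDDDD', 'CCCC', 'BBB'];

const running = [];
arrivals.slice(0, 4).forEach((value, i) => {
  // slice(0, 4) plays the role of take(4);
  // without a seed, the first value becomes the initial accumulator
  running.push(i === 0 ? value : running[i - 1] + value);
});

const lastValue = running[running.length - 1]; // the role of last()

console.log(running);   // [ 'AA', 'AADDDDD', 'AADDDDDCCCC', 'AADDDDDCCCCBBB' ]
console.log(lastValue); // AADDDDDCCCCBBB
```

Each intermediate string is built and then discarded, which is the extra work that buffering the values and joining them once avoids.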
Rafi Henig
  • Can you use the snippet from the question to create a runnable example in your answer? (Go to answer -> edit (at the bottom) to see the full snippet code.) – Kamil Kiełczewski Jul 24 '20 at 14:10
  • You said in a comment on Tal Ohana's answer that it has a problem with unsubscribing. I asked you there for some advice on fixing it; do you know how to do it? In your answer you also never unsubscribe the obs observable, so it probably has a similar problem... (?) – Kamil Kiełczewski Jul 24 '20 at 14:13
  • I use `take(4)`, – Rafi Henig Jul 24 '20 at 14:14
  • The subject will get unsubscribed after 4 emissions – Rafi Henig Jul 24 '20 at 14:15
  • `bufferCount` is handier than `scan` and does less work (in your answer `scan` runs for each value and concatenates strings every time, while the buffer only collects the values and `map` concatenates once). But thank you for your answer. I'm an rxjs beginner and our discussion cleared things up for me :) – Kamil Kiełczewski Jul 24 '20 at 15:06
  • Good luck! (My opinion is that you should never trust your observable's consumers to unsubscribe, and you should do the best you can on your side to limit the subscription's lifetime. In your case you could combine both `take` and `bufferCount`.) – Rafi Henig Jul 24 '20 at 15:28