4

Requirement:

urls = [url1, url2, url3]

Fire all 3 URLs in parallel and paint the DOM in the sequence of the urls list.

 e.g. Finished order of URLs = [url3, url1, url2]
     When url1 finishes, render it to the DOM immediately, without waiting for url2.
     If url2 and url3 finish before url1, store their results and paint them only after url1 arrives.
     Paint the DOM in the order [url1, url2, url3].

My Work using promises:

// Fired all 3 urls at the same time
const p1 = fetch(url1)
const p2 = fetch(url2)
const p3 = fetch(url3)

p1.then(updateDom)
  .then(() => p2)
  .then(updateDom)
  .then(() => p3)
  .then(updateDom)

I wanted to do the same thing in Observables.

from(urls)
  .pipe(
      mergeMap(x => fetch(x))
  )

To fire them in parallel I used mergeMap, but how can I emit the results in the order of the urls list?

Pawan Kumar
  • I just watched this video and it reminded me of this question https://youtu.be/ZdS9uOl4OJk?t=2025 here is the code https://codesandbox.io/s/dreamy-bas-nmw6x?file=/src/orderedMergeMap.ts – Adrian Brand Aug 15 '20 at 04:32

3 Answers

8

The best way to preserve order with async tasks like this is with concatMap.

The problem is that if we apply this alone, we lose the parallelisation. If we were to do something like this:

from(urls)
  .pipe(
      concatMap(x => fetch(x))
  );

the second request is not fired until the first is complete.

We can get around this by separating out the map into its own operator:

from(urls)
  .pipe(
      map(x => fetch(x)),
      concatMap(x => x)
  );

The requests will all be fired at the same time, but the results will be emitted in request order.
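To see why splitting the operators helps, here is a minimal plain-JavaScript sketch of the same idea without RxJS. A promise starts its work as soon as it is created, so creating them all up front gives the parallelism, and awaiting them one by one gives the ordering. The names emitInOrder and delays are made up for this illustration, and delayPromise stands in for fetch:

```javascript
// Hypothetical stand-in for fetch(): resolves with `value` after `ms` milliseconds.
function delayPromise(value, ms) {
  return new Promise(resolve => setTimeout(() => resolve(value), ms));
}

// Rough analogue of map(x => fetch(x)) followed by concatMap(x => x):
// start every task first, then consume the results strictly in list order.
async function emitInOrder(inputs, startTask, onResult) {
  // Like `map`: calling startTask creates the promise, so all work begins here.
  const pending = inputs.map(startTask);
  // Like `concatMap`: wait for each promise in turn before moving to the next.
  for (const promise of pending) {
    onResult(await promise);
  }
}

// Task 1 is the slowest and task 3 the fastest, so they finish in reverse order.
const delays = { 1: 30, 2: 20, 3: 10 };
const results = [];
const done = emitInOrder([1, 2, 3], x => delayPromise(x, delays[x]), r => results.push(r))
  .then(() => console.log(results)); // logs [1, 2, 3]
```

The whole run takes about as long as the slowest task rather than the sum of all three, because the waiting in the loop overlaps with work that is already in flight.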

See Adrian's example adapted to use this approach below:

const { from } = rxjs;
const { concatMap, map } = rxjs.operators;

function delayPromise(value, delay) {
  return new Promise(resolve => setTimeout(() => resolve(value), delay));
}

var delay = 3;

from([1, 2, 3]).pipe(
  map(x => delayPromise(x, delay-- * 1000)),
  concatMap(x => x)
).subscribe(result => { console.log(result); });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.4.0/rxjs.umd.min.js"></script>
rh16
  • Thanks for the solution. Sorry, but I can only accept one answer. – Pawan Kumar Dec 13 '19 at 05:20
  • @pawankumar that's all good, I was just posting it in case other people find this question – rh16 Dec 14 '19 at 06:06
  • Sorry I didn't mean that, I found your answer much better than the accepted one. Wanted to say that in a funny way. – Pawan Kumar Dec 14 '19 at 13:12
  • Yes, this should be the accepted answer. Nice work. – Alexsoyes May 28 '20 at 14:22
  • Wow, that is great. I really thought I HAD to use forkJoin, since it's the only one besides "concatMap" that processes in parallel AND keeps the order. The downside of forkJoin is that you eventually have a lot of `null` handling to do, which I really want to avoid by just not emitting a value. Great that this now also works in parallel. Very, very interesting. – nils petersohn Jun 27 '21 at 20:50
  • @nilspetersohn Yeah it was frustration with wrangling the `forkJoin` approach that drove me to create this answer. – rh16 Jun 28 '21 at 01:43
  • but it has to be a promise to work, other observables are run in sequence, promises in parallel. The only working option for me is to wrap the observable in a "getLatestFrom" to convert it to a promise. – nils petersohn Jun 28 '21 at 20:04
  • @nilspetersohn Interesting, I'm not sure what your use case is for wanting to do this with observables that can't be trivially converted to a promise. Maybe that warrants its own question? – rh16 Jun 29 '21 at 02:33
1

I couldn't find anything that preserves the order so I came up with something a bit convoluted.

const { concat, of, BehaviorSubject, Subject } = rxjs;
const { delay, filter } = rxjs.operators;

const parallelExecute = (...obs$) => {
  const subjects = obs$.map(o$ => {
    const subject$ = new BehaviorSubject();
    const sub = o$.subscribe(o => { subject$.next(o); });
    return { sub: sub, obs$: subject$.pipe(filter(val => val)) };
  });
  const subject$ = new Subject();
  sub(0);
  function sub(index) {
    const current = subjects[index];
    current.obs$.subscribe(c => {
      subject$.next(c);
      current.obs$.complete();
      current.sub.unsubscribe();
      if (index < subjects.length -1) {
        sub(index + 1);
      } else {
        subject$.complete();
      }
    });
  }
  return subject$;
}


parallelExecute(
  of(1).pipe(delay(3000)),
  of(2).pipe(delay(2000)),
  of(3).pipe(delay(1000))
).subscribe(result => { console.log(result); });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.4.0/rxjs.umd.min.js"></script>
Adrian Brand
0

You can form a sequence of fetch and paint for each URL, then combine them with forkJoin (or Promise.all):

const p1 = fetch(url1)
const p2 = fetch(url2)
const p3 = fetch(url3)

forkJoin(
  from(p1).pipe(tap(_ => { /* paint dom... */ })),
  from(p2).pipe(tap(_ => { /* paint dom... */ })),
  from(p3).pipe(tap(_ => { /* paint dom... */ }))
).subscribe()
Fan Cheung