Short explanation

How can I process an array of Observables (for example with forkJoin) while passing along some data for each Observable that I need to use later in pipe and map?

const source = {animal: 'cat', fruit: 'apple', color: 'blue'}
const observables = Object.keys(source).map(key => [this.getDataFromApi(source[key]), key])

// resolve only the observables (observables[0][0], observables[1][0], observables[2][0]) in some magic way,
// but preserve the keys (observables[0][1], observables[1][1], observables[2][1]) for later use in pipe and map
const processedObservable = forkJoin(observables).pipe( // ???
  map(items => items.map(item => 'this is ' + item[0] + ' (' + item[1] + ')')), // this doesn't work
  map(items => items.join(', '))
)

processedObservable.subscribe(text => console.log(text)) // subscribe only for test
// expected result: this is your cat (animal), this is your apple (fruit), this is your blue (color)

Long explanation

I have some "source" (an array or object of items). I need to make an API request for every item, so I get an array of Observables. Next, I want to process all the received data, so I use forkJoin and process the data in pipe with several map operators.

I can't process the data directly in subscribe.

Here is the simple example:

const source = ['cat', 'apple', 'blue']
const observables = source.map(item => this.getDataFromApi(item))
const processedObservable = forkJoin(observables).pipe(
  map(items => items.map(item => 'this is ' + item)),
  map(items => items.join(', '))
)
processedObservable.subscribe(text => console.log(text)) // test
// result: this is your cat, this is your apple, this is your blue

But besides the item data used for the API requests, I also have metadata for each item, which I have to use during processing in pipe and map.

Here is an example with a representative source, but here I don't use the metadata of the items (the result is the same as above):

const source = {animal: 'cat', fruit: 'apple', color: 'blue'}
const observables = Object.keys(source).map(key => this.getDataFromApi(source[key]))
const processedObservable = forkJoin(observables).pipe(
  map(items => items.map(item => 'this is ' + item)),
  map(items => items.join(', '))
)
processedObservable.subscribe(text => console.log(text)) // test
// result: this is your cat, this is your apple, this is your blue

Here is an example with a representative source where I ignore the values and the API calls and process only the metadata (the keys) of the items:

const source = {animal: 'cat', fruit: 'apple', color: 'blue'}
const observables = Object.keys(source).map(key => of(key))
const processedObservable = forkJoin(observables).pipe(
  map(items => items.map(item => '(' + item + ')')),
  map(items => items.join(', '))
)
processedObservable.subscribe(text => console.log(text)) // test
// result: (animal), (fruit), (color)

I want to get this result:

// result: this is your cat (animal), this is your apple (fruit), this is your blue (color)

in pipe and map, in some way like this:

  map(items => items.map(item => 'this is ' + item.apiValue + ' (' + item.key + ')')),

or:

  map(items => items.map(item => 'this is ' + item[0] + ' (' + item[1] + ')')),

But I don't know how to pass an array of observables together with their metadata to forkJoin, for example an array like this:

const observables = Object.keys(source).map(key => [this.getDataFromApi(source[key]), key])

Should I perhaps use a different function, for example flatMap or switchMap?

Additional info

The getDataFromApi method simulates the API calls:

  getDataFromApi(item) {
    return of('your ' + item)
  }
The Head Rush
mkczyk

4 Answers

1

Below is an example of how it might be implemented:

from(Object.entries(source))
 .pipe(
   mergeMap(([key, value]) => getFromApi(value).pipe(
     map(item => `this is ${item} (${key})`), // <= key is captured by the closure
   )),
   toArray(), // <= collect the emitted strings into one array
   map(items => items.join(', ')), // <= join with ', ' to match the expected output
 )
 .subscribe(console.log);

Try running the following demo:

const { from, of } = rxjs;
const { map, mergeMap, toArray } = rxjs.operators; // the pipeline below uses mergeMap, not switchMap

function getFromApi(item) {
  return of('your ' + item)
}

const source = { animal: "cat", fruit: "apple", color: "blue" };


from(Object.entries(source))
  .pipe(
    mergeMap(([key, value]) => getFromApi(value).pipe(
      map(item => `this is ${item} (${key})`), // <= key is captured by the closure
    )),
    toArray(), // <= collect the emitted strings into one array
    map(items => items.join(', ')), // <= join with ', ' to match the expected output
  )
  .subscribe(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.6.2/rxjs.umd.min.js"></script>
Rafi Henig
  • This doesn't work for asynchronous api calls. `switchMap` will unsubscribe from an api call and subscribe to the next one before the previous one has completed. You will see this behaviour if you add a delay to the simulated api call: `of('your ' + item).pipe(delay(0))`. – frido Sep 08 '20 at 09:53
  • `mergeMap` doesn't guarantee that the merged observable will emit the api responses in the same order as items from the source arrive. If the first api call takes longer than the second one the merged observable will emit the second response before the first response is emitted. So the order of the items in the array collected by `toArray` may be different than the order of the items in the source array. – frido Sep 08 '20 at 14:54
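The ordering issue described in the comment above can be reproduced without timers. The following is only a rough sketch: the `responses` Subjects are hypothetical stand-ins for two API calls whose completion order is controlled by hand, and they are not part of the original answer. It compares `mergeMap` + `toArray` with `forkJoin` when the second call finishes first.

```typescript
import { Subject, forkJoin, from } from 'rxjs';
import { mergeMap, toArray } from 'rxjs/operators';

// Hypothetical stand-ins for API calls; each Subject emits when we tell it to.
const responses = {
  cat: new Subject<string>(),
  apple: new Subject<string>(),
};
type Item = keyof typeof responses;
const getFromApi = (item: Item) => responses[item];

const items: Item[] = ['cat', 'apple'];
let merged: string[] = [];
let joined: string[] = [];

// mergeMap + toArray collects responses in the order they arrive.
from(items)
  .pipe(mergeMap(item => getFromApi(item)), toArray())
  .subscribe(result => (merged = result));

// forkJoin returns results in the order of the input array.
forkJoin(items.map(item => getFromApi(item)))
  .subscribe(result => (joined = result));

// Simulate the second call ('apple') finishing before the first ('cat').
responses.apple.next('your apple');
responses.apple.complete();
responses.cat.next('your cat');
responses.cat.complete();

console.log(merged); // [ 'your apple', 'your cat' ]
console.log(joined); // [ 'your cat', 'your apple' ]
```

If the order of the results has to match the order of the source, `forkJoin` (or `concatMap`, at the cost of running the calls one after another) is the safer choice.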
1

You should add your first map directly where you create the individual observables.

const source = { animal: "cat", fruit: "apple", color: "blue" };

const observables = Object.keys(source).map(key => getFromApi(source[key]).pipe(
  map(item => `this is ${item} (${key})`)
));

const processedObservable = forkJoin(observables).pipe(
  map(items => items.join(', '))
);

processedObservable.subscribe(console.log);

https://stackblitz.com/edit/rxjs-4x7hbv?file=index.ts
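If the formatting has to stay in the later map steps, as in the question's `item.apiValue` / `item.key` form, a variation of the same idea is to attach only the key where each observable is created and format afterwards. This is a minimal sketch, assuming the simulated `getDataFromApi` from the question; the `{ key, apiValue }` object shape is just one possible choice.

```typescript
import { Observable, forkJoin, of } from 'rxjs';
import { map } from 'rxjs/operators';

// Simulated API call, as in the question.
function getDataFromApi(item: string): Observable<string> {
  return of('your ' + item);
}

const source: Record<string, string> = { animal: 'cat', fruit: 'apple', color: 'blue' };

// Pair each response with its key; `key` is captured by the closure.
const observables = Object.entries(source).map(([key, value]) =>
  getDataFromApi(value).pipe(map(apiValue => ({ key, apiValue })))
);

let result = '';
forkJoin(observables)
  .pipe(
    // Each item now carries both the API value and its metadata.
    map(items => items.map(item => `this is ${item.apiValue} (${item.key})`)),
    map(items => items.join(', '))
  )
  .subscribe(text => {
    result = text;
    console.log(text);
  });
// prints: this is your cat (animal), this is your apple (fruit), this is your blue (color)
```

Because `forkJoin` emits its results in the order of the input array, the keys stay aligned with their responses even when the real API calls complete in a different order.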

frido
0
function getDataFromApi(item) {
  return of('your ' + item)
}

const source = { animal: 'cat', fruit: 'apple', color: 'blue' };

from(Object.keys(source)).pipe(
  switchMap(key => forkJoin([getDataFromApi(source[key]), of(key)])),
  map(([data, key]) => `this is ${data} (${key})`),
  toArray(),
  map(data => data.join(', ')),
).subscribe(console.log);

See: https://stackblitz.com/edit/rxjs-1t4nlo?file=index.ts

MoxxiManagarm
  • This doesn't work for asynchronous api calls. `switchMap` will unsubscribe from an api call and subscribe to the next one before the previous one has completed. You will see this behaviour if you add a delay to the simulated api call: `of('your ' + item).pipe(delay(0))`. – frido Sep 08 '20 at 09:53
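The cancellation described in this comment can be sketched without `delay`. The example below is an illustration only: the `responses` Subjects are hypothetical stand-ins for API calls that respond after all keys have been emitted, and the pipeline is simplified compared to the answer. It contrasts `switchMap` with `concatMap`, which waits for each call to complete before starting the next.

```typescript
import { Subject, from } from 'rxjs';
import { concatMap, switchMap, toArray } from 'rxjs/operators';

// Hypothetical stand-ins for API calls; each Subject emits when we tell it to.
const responses = {
  cat: new Subject<string>(),
  apple: new Subject<string>(),
};
type Item = keyof typeof responses;
const getDataFromApi = (item: Item) => responses[item];

const items: Item[] = ['cat', 'apple'];
let switched: string[] = [];
let concatenated: string[] = [];

// switchMap drops the pending 'cat' call as soon as 'apple' arrives.
from(items)
  .pipe(switchMap(item => getDataFromApi(item)), toArray())
  .subscribe(result => (switched = result));

// concatMap subscribes to 'apple' only after 'cat' has completed.
from(items)
  .pipe(concatMap(item => getDataFromApi(item)), toArray())
  .subscribe(result => (concatenated = result));

// The "API" responds only after both keys have been emitted by the source.
responses.cat.next('your cat');
responses.cat.complete();
responses.apple.next('your apple');
responses.apple.complete();

console.log(switched);     // [ 'your apple' ]
console.log(concatenated); // [ 'your cat', 'your apple' ]
```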
0

Try the following, using import { combineLatest } from 'rxjs':

 combineLatest(['cat', 'apple', 'blue'].map(item => this.getDataFromApi(item))).pipe(
    map(items => items.map(item => 'this is ' + item)),
    map(items => items.join(', '))
  )
  .subscribe({
    next:(res) => console.log(res)
  })

Example in stackblitz

Owen Kelvin