I'm using RxJS v6, but the answer could apply to RxJS v5 as well.

Given, say, 8 values, I'd like at most 4 of them to be actively processed at any one time.

The code I have right now sends in 4 items, waits until all 4 finish, then sends in the next 4. I'd like it to send in the first 4 and then, as each observable completes, send in another one to be processed.

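import { from, timer } from 'rxjs'
import { bufferCount, concatMap, mapTo, mergeMap } from 'rxjs/operators'

from([1, 2, 3, 4, 5, 6, 7, 8])
.pipe(
    bufferCount(4), // Group the values into batches of 4
    concatMap(values => ( // Start the next batch only after the current one completes
        from(values)
        .pipe(
            mergeMap(value => (
                timer(1000) // Async stuff would happen here
                .pipe(
                    mapTo(value),
                )
            )),
        )
    )),
)
.subscribe(console.log)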
Kevin Ghadyani

1 Answer

You have to use the second parameter of mergeMap, a number that sets the concurrency level, i.e. the maximum number of inner observables subscribed to at the same time.

So your code could look like:

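import { from, of } from 'rxjs'
import { map, mergeMap } from 'rxjs/operators'

from([1, 2, 3, 4, 5, 6, 7, 8])
.pipe(
    map(val => of(val)), // Wrap each value in its own observable
    mergeMap(val => val, 4), // Subscribe to at most 4 inner observables at a time
)
.subscribe(console.log)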

or

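import { of } from 'rxjs'
import { mergeMap } from 'rxjs/operators'

of([1, 2, 3, 4, 5, 6, 7, 8]) // Emits the whole array as a single value
.pipe(
    // Flattens the array into its elements; with a single inner source,
    // the limit of 4 is never actually reached here
    mergeMap(val => val, 4),
)
.subscribe(console.log)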

Note that concatMap is equivalent to mergeMap with the concurrency level set to 1.

Read this SO post for more details on mergeMap.

Picci
  • I ended up looking up `concatMap`'s source and saw exactly what you said. I didn't know the second arg was optional though. – Kevin Ghadyani Sep 08 '18 at 08:37