IMHO, I would leverage rxjs here, since we're already using it anyway, and avoid implementing a custom queuing concept as suggested by another answer (though you certainly can do that).
If we simplify the given case a bit, we just have some observable and want to perform a long-running procedure for each emission, in sequence. rxjs allows doing this essentially out of the box by means of the concatMap operator:
$data.pipe(concatMap(item => processItem(item))).subscribe();
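This only assumes that processItem returns an observable. Since you used await, I assume your function(s) currently return Promises. These can be trivially converted into observables using from. (Strictly speaking, concatMap also accepts a Promise as the return value of its callback, so the conversion is optional, but it keeps the types explicit.)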
The only detail left to look at from the OP is that the observable actually emits an array of items and we want to perform the operation on each item of each emission. To do that, we just flatten the observable using mergeMap.
Let's put it all together. Note that if you take away preparing some stub data and logging, the actual implementation of this is only two lines of code (using mergeMap + concatMap).
const { from, interval } = rxjs;
const { mergeMap, concatMap, take, bufferCount, tap } = rxjs.operators;
// Stub for the long-running operation
function processTask(task) {
  console.log("Processing task: ", task);
  return new Promise(resolve => {
    setTimeout(() => {
      console.log("Finished task: ", task);
      resolve(task);
    }, 500 * Math.random() + 300);
  });
}
// Turn processTask into a function returning an observable
const processTask$ = item => from(processTask(item));
// Some stubbed data stream
const tasks$ = interval(250).pipe(
  take(9),
  bufferCount(3),
);
tasks$.pipe(
  // Each emission is an array (batch) of tasks
  tap(tasks => console.log("Received tasks: ", tasks)),
  // Flatten the tasks array since we want to work in sequence anyway
  mergeMap(tasks => tasks),
  // Process each task, but do so consecutively
  concatMap(task => processTask$(task)),
).subscribe({ complete: () => console.log("Done") });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.3.2/rxjs.umd.js"></script>