I'm trying to write a function that limits the amount of concurrency. Usage looks something like this:
async function* genSleep() {
    yield sleep(1000, 'A');
    yield sleep(2000, 'B');
    yield sleep(1000, 'C');
    yield sleep(500, 'D');
    yield sleep(1500, 'E');
    yield sleep(0, 'F');
}
async function main() {
    const t = Date.now();
    await concurrent(genSleep(), 3, async (letter, idx) => {
        console.log(idx, letter, 'start', Date.now() - t);
        await sleep(500);
        console.log(idx, letter, 'finish', Date.now() - t);
    });
}
And the output should be:
0 A start 1000
1 C start 1000
0 A finish 1500
1 C finish 1500
2 B start 2000
3 D start 2000 // after A finishes at 1500, D is queued; its 500ms sleep means it starts at 2000
2 B finish 2500
3 D finish 2500
5 F start 2500
4 E start 3000 // after C finishes at 1500, E is queued; its 1500ms sleep means it starts at 3000
5 F finish 3000
That's my best guess. I'm having trouble wrapping my head around the exact ordering, but the point is that the callback should be running at most 3 times in "parallel" (I know JS is single-threaded).
My current implementation is below, but I can't get it quite right. It's written in TypeScript, but JS answers are fine.
export const sleep = (ms: number, ret: any = ms) => new Promise(r => setTimeout(() => r(ret), ms));

interface PromiseContainer<TValue = unknown, TReason = Error> extends Promise<TValue> {
    resolve(value: TValue): void;
    reject(reason: TReason): void;
}

function makePromise<TValue = unknown, TReason = Error>() {
    const funcs: any = {};
    const promise = new Promise((resolve, reject) => {
        funcs.resolve = resolve;
        funcs.reject = reject;
    });
    Object.assign(promise, funcs);
    return promise as PromiseContainer<TValue, TReason>;
}
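For context, makePromise is the classic "deferred" pattern: the returned promise exposes its own resolve/reject so other code can settle it from outside the executor. A quick self-contained check of that behavior (the definitions are repeated here so the snippet runs standalone):

```typescript
interface PromiseContainer<TValue = unknown, TReason = Error> extends Promise<TValue> {
    resolve(value: TValue): void;
    reject(reason: TReason): void;
}

function makePromise<TValue = unknown, TReason = Error>() {
    const funcs: any = {};
    // The executor runs synchronously, so funcs.resolve/reject are
    // populated before Object.assign copies them onto the promise.
    const promise = new Promise((resolve, reject) => {
        funcs.resolve = resolve;
        funcs.reject = reject;
    });
    Object.assign(promise, funcs);
    return promise as PromiseContainer<TValue, TReason>;
}

// The deferred can be settled from outside its executor, which is what
// lets `concurrent` resolve a placeholder when a callback finishes.
const deferred = makePromise<string>();
setTimeout(() => deferred.resolve('settled externally'), 10);
deferred.then(v => console.log(v));
```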
export async function concurrent<T>(
    generator: AsyncGenerator<T>,
    maxConcurrent: number = 10,
    callback: (item: T, idx: number) => Promise<unknown>,
): Promise<Error[]> {
    const pending = new Set<PromiseContainer>();
    const errors: Error[] = [];
    let allDone = false;
    let idx = 0;
    for (;;) {
        // Placeholder promise that settles when this slot frees up.
        const p = makePromise();
        pending.add(p);
        p.finally(() => {
            pending.delete(p);
        });
        generator.next().then(({done, value}) => {
            if (done) {
                p.resolve(undefined);
                allDone = true;
            } else {
                callback(value, idx++).catch(err => {
                    errors.push(err);
                }).finally(() => {
                    p.resolve(undefined);
                });
            }
        });
        if (pending.size >= maxConcurrent) {
            await Promise.race(pending);
        }
        if (allDone) {
            if (pending.size) await Promise.allSettled(pending);
            return errors;
        }
    }
}
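For reference, here is the simplest alternative shape I can think of for the same contract: `limit` workers share one iterator, and each worker only pulls the next item after its previous callback settles. I'm not sure it reproduces my expected interleaving exactly (async generators serialize their `next()` calls), but it does cap the callbacks at `limit` at a time. The names here (`concurrentPool`, `worker`) are mine, not from any library:

```typescript
// Sketch: N workers pull from one shared async iterator. Because each
// worker awaits its callback before pulling again, at most `limit`
// callbacks run concurrently.
async function concurrentPool<T>(
    gen: AsyncGenerator<T>,
    limit: number,
    callback: (item: T, idx: number) => Promise<unknown>,
): Promise<Error[]> {
    const errors: Error[] = [];
    let idx = 0;
    // All workers share `gen`; concurrent next() calls are queued by
    // the async generator itself, so items are handed out one at a time.
    const worker = async () => {
        for (;;) {
            const {done, value} = await gen.next();
            if (done) return;
            try {
                await callback(value, idx++);
            } catch (err) {
                errors.push(err as Error);
            }
        }
    };
    await Promise.all(Array.from({length: limit}, worker));
    return errors;
}
```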
If genSleep() is too abstract, here is the concrete use case: I want to process 3 or so files at a time:
import * as fs from 'fs';
import {promisify} from 'util';
import {join} from 'path';
const readDir = promisify(fs.readdir);
const readFile = promisify(fs.readFile);
export {readFile};
export async function* readDirR(path: string): AsyncGenerator<string> {
    const entries = await readDir(path, {withFileTypes: true});
    for (const entry of entries) {
        const fullPath = join(path, entry.name);
        if (entry.isDirectory()) {
            yield* readDirR(fullPath);
        } else {
            yield fullPath;
        }
    }
}
It takes some time to read the directory, and then a couple of seconds to process each file. I would like to start processing files before the entire directory has been read, but I don't want to process more than a few at a time. The processing is asynchronous, so more than one file can be in flight at once.
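As a sanity check on the streaming property, iterating with for await receives each path as soon as it is discovered, so per-file work could start before the walk completes. This is an illustrative rewrite using fs.promises rather than promisify; `walk` and `listFiles` are throwaway names:

```typescript
import {promises as fsp} from 'fs';
import {join} from 'path';

// Same recursive walk as readDirR above, written against fs.promises.
async function* walk(path: string): AsyncGenerator<string> {
    for (const entry of await fsp.readdir(path, {withFileTypes: true})) {
        const fullPath = join(path, entry.name);
        if (entry.isDirectory()) {
            yield* walk(fullPath);
        } else {
            yield fullPath;
        }
    }
}

// for await pulls paths one at a time; a consumer can begin processing
// the first file while deeper directories are still being read.
async function listFiles(root: string): Promise<string[]> {
    const out: string[] = [];
    for await (const p of walk(root)) {
        out.push(p);
    }
    return out;
}
```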