I'm trying to write a function that limits how many async tasks run concurrently. Usage looks something like this:


async function* genSleep() {
    yield sleep(1000,'A'); 
    yield sleep(2000,'B');
    yield sleep(1000,'C');
    yield sleep(500,'D');
    yield sleep(1500,'E');
    yield sleep(0,'F');
}

async function main() {
    const t = Date.now();
    concurrent(genSleep(),3,async (letter,idx) => {
        console.log(idx,letter,'start',Date.now()-t);
        await sleep(500);
        console.log(idx,letter,'finish',Date.now()-t);
    });
}

And the output should be:

0 A start 1000
1 C start 1000
0 A finish 1500
1 C finish 1500
2 B start 2000
3 D start 2000 // after A finishes, D is queued. It takes 500ms to fire so it starts at 2000
2 B finish 2500
3 D finish 2500
5 F start 2500
4 E start 3000 // after C finishes at 1500, E is queued. Starts after another 1500ms
5 F finish 3000

I think. I'm having trouble wrapping my head around it, but the point is that the callback should run at most 3 times in "parallel" (I know JS is single-threaded).

My current implementation is below, but I can't get it quite right. Written in TypeScript but JS answers are fine.



export const sleep = (ms: number,ret:any=ms) => new Promise(r => setTimeout(() => r(ret), ms));

interface PromiseContainer<TValue=unknown,TReason=Error> extends Promise<TValue> {
    resolve(value: TValue): void
    reject(reason: TReason): void
}

function makePromise<TValue=unknown,TReason=Error>() {
    const funcs: any =  {};
    const promise = new Promise((resolve,reject) => {
        funcs.resolve = resolve;
        funcs.reject = reject;
    })
    Object.assign(promise,funcs);
    return promise as PromiseContainer<TValue,TReason>;
}

export async function concurrent<T>(generator: AsyncGenerator<T>, maxConcurrent: number = 10, callback: (item: T, idx: number) => Promise<Error[]>) {
    let pending = new Set<PromiseContainer>();
    const errors: Error[] = [];
    let allDone = false;
    let idx = 0;

    for (;;) {
        const p = makePromise();
        pending.add(p);
        p.finally(() => {
            pending.delete(p);
        })
        console.log('next');
        generator.next().then(({done,value}) => {
            // console.log('generated',value,done);
            if(done) {
                // console.log('doneee')
                p.resolve();
                allDone = true;
            } else {
                callback(value, idx++).catch(err => {
                    errors.push(err);
                }).finally(() => {
                    p.resolve();
                })
            }
        })
        // console.log('pending.size',pending.size)
        if(pending.size >= maxConcurrent) {
            await Promise.race(pending);
        }
        if(allDone) {
            if(pending.size) await Promise.allSettled(pending);
            return errors;
        }
    }
}

If genSleep() is too abstract, here is a more concrete case: I want to process 3 or so files at a time:

import * as fs from 'fs';
import {promisify} from 'util';
import {join} from 'path';

const readDir = promisify(fs.readdir);
const readFile = promisify(fs.readFile);

export {readFile};

export async function* readDirR(path: string): AsyncGenerator<string> {
    const entries = await readDir(path, {withFileTypes: true});
    for (let entry of entries) {
        const fullPath = join(path, entry.name);
        if (entry.isDirectory()) {
            yield* readDirR(fullPath);
        } else {
            yield fullPath;
        }
    }
}

It takes some time to read the directory, and then a couple seconds to process each file. I would like to start processing the files before the entire directory is read, but I don't want to process more than a few files at a time. The processing is also asynchronous so I can do more than one at a time.

mpen
  • As @georg mentioned, you cannot use an async generator function, those always run sequentially. Also you should avoid the [`Promise` constructor antipattern](https://stackoverflow.com/q/23803743/1048572?What-is-the-promise-construction-antipattern-and-how-to-avoid-it) (`makePromise`) and just use promise chaining on `generator.next()`. Apart from that, I'd say your code looks fine. – Bergi Nov 26 '19 at 00:46

1 Answer

Knowing that AsyncGenerators won't work, I came up with this:

export async function consume<T>(generator: Generator<Promise<T>>, maxConcurrent: number, callback: (item: T, idx: number) => Promise<void>): Promise<Error[]> {
    const errors: Error[] = [];
    const pending = new Set<Promise<unknown>>();
    let i = 0;

    for (; ;) {
        const {done, value} = generator.next();

        if (done) {
            if (pending.size) await Promise.allSettled(pending);
            return errors;
        }

        const p = value.then((v: T) => callback(v, i++))
            .catch((err: Error) => {
                errors.push(err);
            }).finally(() => {
                pending.delete(p);
            })

        pending.add(p);

        if (pending.size >= maxConcurrent) {
            await Promise.race(pending);
        }
    }
}

Test:

function* genSleep() {
    console.log('queue A')
    yield sleep(1000,'A');

    console.log('queue B')
    yield sleep(2000,'B');

    console.log('queue C')
    yield sleep(1000,'C');

    console.log('queue D')
    yield sleep(500,'D');

    console.log('queue E')
    yield sleep(1500,'E');

    console.log('queue F')
    yield sleep(0,'F');
}

async function main() {
    const t = Date.now();
    // Await so that a rejection from consume() reaches the catch handler below.
    await consume(genSleep(),3,async (letter,idx) => {
        console.log(idx,letter,'start',Date.now()-t);
        await sleep(500);
        console.log(idx,letter,'finish',Date.now()-t);
    });
}

main().catch(err => {
    console.error(err);
    process.exit(1);
});

Output:

queue A
queue B
queue C
0 A start 1002
1 C start 1004
0 A finish 1504
queue D
1 C finish 1506
queue E
2 B start 2001
3 D start 2006
2 B finish 2504
queue F
3 D finish 2508
4 F start 2509
5 E start 3007
4 F finish 3011
5 E finish 3510

Which is exactly what I was expecting. Yay!
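
For context on why the plain generator lets the sleeps overlap while the async generator from the question did not: an async generator queues `next()` calls and awaits each yielded promise before it resumes, so its timers run back to back. A plain generator hands out the promise itself, so several timers can be in flight at once. The following is only a minimal sketch; `asyncGen`, `syncGen`, `timeAsync` and `timeSync` are illustrative names that do not appear in the question, and the 100 ms delays are arbitrary.

```typescript
const sleep = (ms: number, ret: string) =>
    new Promise<string>(r => setTimeout(() => r(ret), ms));

async function* asyncGen() {
    yield sleep(100, 'A'); // awaited inside the generator before it moves on
    yield sleep(100, 'B');
    yield sleep(100, 'C');
}

function* syncGen() {
    yield sleep(100, 'A'); // the pending promise is handed to the caller as-is
    yield sleep(100, 'B');
    yield sleep(100, 'C');
}

// Requests three items up front; the generator still serves them one by one,
// so the elapsed time is about the sum of the three delays.
async function timeAsync(): Promise<number> {
    const t = Date.now();
    const g = asyncGen();
    await Promise.all([g.next(), g.next(), g.next()]);
    return Date.now() - t;
}

// Pulls three promises immediately; all three timers run at the same time,
// so the elapsed time is about one delay.
async function timeSync(): Promise<number> {
    const t = Date.now();
    const g = syncGen();
    await Promise.all([g.next().value, g.next().value, g.next().value]);
    return Date.now() - t;
}
```

Calling `timeAsync()` and `timeSync()` side by side should show the first taking about three times as long as the second.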


The async version actually works fine too. The generator is still pulled one item at a time, but the callbacks overlap:

export async function consumeAsync<T>(generator: AsyncGenerator<T>, maxConcurrent: number, callback: (item: T, idx: number) => Promise<void>): Promise<Error[]> {
    const errors: Error[] = [];
    const pending = new Set<Promise<unknown>>();
    let i = 0;

    for (; ;) {
        const {done, value} = await generator.next();

        if (done) {
            if (pending.size) await Promise.allSettled(pending);
            return errors;
        }

        const p = callback(value, i++)
            .catch((err: Error) => {
                errors.push(err);
            }).finally(() => {
                pending.delete(p);
            })

        pending.add(p);

        if (pending.size >= maxConcurrent) {
            await Promise.race(pending);
        }
    }
}
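
To tie this back to the file-processing case in the question, here is a rough sketch of how the async version might be wired to the directory walker. Everything in `demo` is hypothetical: the temp directory, the file names, and the 50 ms stand-in for "processing" are assumptions made so the snippet can run on its own. The copy of `consumeAsync` is condensed and uses `Promise.all` instead of `Promise.allSettled`, which is equivalent here only because every pending promise already swallows its error in `.catch`.

```typescript
import { promises as fsp } from 'fs';
import * as os from 'os';
import { join } from 'path';

// Same walker as in the question, using fs.promises instead of promisify.
async function* readDirR(path: string): AsyncGenerator<string> {
    const entries = await fsp.readdir(path, { withFileTypes: true });
    for (const entry of entries) {
        const fullPath = join(path, entry.name);
        if (entry.isDirectory()) yield* readDirR(fullPath);
        else yield fullPath;
    }
}

// Condensed copy of consumeAsync so this block is self-contained.
async function consumeAsync<T>(generator: AsyncGenerator<T>, maxConcurrent: number,
        callback: (item: T, idx: number) => Promise<void>): Promise<Error[]> {
    const errors: Error[] = [];
    const pending = new Set<Promise<unknown>>();
    let i = 0;
    for (;;) {
        const { done, value } = await generator.next();
        if (done) {
            await Promise.all(pending); // pending promises never reject (see .catch)
            return errors;
        }
        const p: Promise<void> = callback(value, i++)
            .catch((err: Error) => { errors.push(err); })
            .finally(() => { pending.delete(p); });
        pending.add(p);
        if (pending.size >= maxConcurrent) await Promise.race(pending);
    }
}

// Hypothetical demo: builds a small temp tree, "processes" each file, and
// records the highest number of callbacks that were running at once.
async function demo(limit: number) {
    const root = await fsp.mkdtemp(join(os.tmpdir(), 'consume-'));
    await fsp.mkdir(join(root, 'sub'));
    for (const name of ['a.txt', 'b.txt', 'c.txt', join('sub', 'd.txt'), join('sub', 'e.txt')]) {
        await fsp.writeFile(join(root, name), name);
    }
    let inFlight = 0, maxInFlight = 0, processed = 0;
    const errors = await consumeAsync(readDirR(root), limit, async (file) => {
        inFlight++;
        maxInFlight = Math.max(maxInFlight, inFlight);
        try {
            await fsp.readFile(file, 'utf8');
            await new Promise(r => setTimeout(r, 50)); // stand-in for slow processing
            processed++;
        } finally {
            inFlight--;
        }
    });
    await fsp.rm(root, { recursive: true, force: true });
    return { processed, maxInFlight, errors };
}
```

With `demo(2)`, `maxInFlight` should never exceed 2, and processing of the first files begins before the `sub` directory has been read.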