I'm trying to wrap my head around async/await, and I have the following code:

class AsyncQueue<T> {
    queue = Array<T>()
    maxSize = 1

    async enqueue(x: T) {
        if (this.queue.length >= this.maxSize) {
            // Block until available
        }

        this.queue.unshift(x)
    }

    async dequeue() {
        if (this.queue.length == 0) {
            // Block until available
        }

        return this.queue.pop()!
    }
}

async function produce<T>(q: AsyncQueue<T>, x: T) {
    await q.enqueue(x)
}

async function consume<T>(q: AsyncQueue<T>): Promise<T> {
    return await q.dequeue()
}

// Expecting 3 4 in the console
(async () => {
    const q = new AsyncQueue<number>()
    consume(q).then(console.log)
    consume(q).then(console.log)
    produce(q, 3)
    produce(q, 4)
    consume(q).then(console.log)
    consume(q).then(console.log)
})()

My problem, of course, is in the "Block until available" parts of the code. I was expecting to be able to "halt" the execution until something happens (for example, dequeue halts until an enqueue happens, and vice versa, given the available space). I have the feeling I might need to use coroutines for this, but I wanted to make sure I'm not just missing some async/await magic here.

Hugo Sereno Ferreira
  • You *don't* want to `block`, that would freeze the script - you should have `enqueue` and `dequeue` `await` promises that resolve once whatever they're waiting on is available. Also, you should call constructors with `()` – CertainPerformance May 17 '18 at 02:39
  • It seems I am pushing the `async/await` game further and further, and it isn't clear to me how it would all solve down. – Hugo Sereno Ferreira May 17 '18 at 02:42
  • Have a look at the possible duplicate [How to implement a pseudo blocking async queue in JS/TS?](https://stackoverflow.com/q/47157428/1048572) – Bergi May 17 '18 at 05:56
  • No, you don't need coroutines, you just need the `new Promise` constructor to wait for things that happen externally. You are however *implementing* coroutines, [CSP-style](https://en.wikipedia.org/wiki/Communicating_sequential_processes). – Bergi May 17 '18 at 06:03
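
To illustrate the last comment, here is a minimal sketch of using the `new Promise` constructor to wait for something that happens elsewhere: the resolve callback is stored, and whoever produces the value calls it later. The names `Mailbox`, `receive`, `deliver` and `mailboxDemo` are hypothetical, made up for this illustration.

```typescript
// Hypothetical mailbox: receive() stays pending until deliver() is called.
class Mailbox<T> {
    private pending: Array<(value: T) => void> = []

    // Returns a promise whose resolver is kept for a later deliver().
    receive(): Promise<T> {
        return new Promise<T>(resolve => { this.pending.push(resolve) })
    }

    // Wakes the oldest waiting receiver, if any; reports whether one was woken.
    deliver(value: T): boolean {
        const resolve = this.pending.shift()
        if (resolve === undefined) return false
        resolve(value)
        return true
    }
}

async function mailboxDemo(): Promise<string[]> {
    const box = new Mailbox<string>()
    const log: string[] = []
    // Nothing blocks here: the callback simply runs once the value arrives.
    const waiter = box.receive().then(v => { log.push(`received ${v}`) })
    log.push("before deliver")
    box.deliver("hello")
    await waiter
    return log
}

mailboxDemo().then(log => console.log(log.join(", ")))
// prints "before deliver, received hello"
```

Nothing is ever blocked: `receive()` returns immediately, and only the code that awaits the promise is suspended until `deliver()` runs.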

1 Answer

17/04/2019 Update: Long story short, there's a bug in the AsyncSemaphore implementation further below, which was caught using property-based testing. You can read all about this "tale" here. Here's the fixed version:

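class AsyncSemaphore {
    // Resolvers of pending wait() calls, newest first
    private promises = Array<() => void>()

    // A negative permit count means that many callers are waiting
    constructor(private permits: number) {}

    signal() {
        this.permits += 1
        // Wake the oldest waiter, if any
        if (this.promises.length > 0) this.promises.pop()!()
    }

    async wait() {
        // Claim the permit up front, so a later caller cannot take it
        // between signal() and the resumption of the woken waiter
        this.permits -= 1
        if (this.permits < 0 || this.promises.length > 0)
            await new Promise<void>(r => this.promises.unshift(r))
    }
}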
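
As a rough usage sketch (not part of the original answer), the semaphore can cap how many async jobs run at once. The class is repeated with the same logic so the snippet runs on its own; `withPermit`, `sleep` and `semaphoreDemo` are hypothetical helpers introduced only for this illustration.

```typescript
// Same logic as the fixed AsyncSemaphore, repeated so the sketch is self-contained.
class AsyncSemaphore {
    private waiters: Array<() => void> = []
    private permits: number

    constructor(permits: number) { this.permits = permits }

    signal(): void {
        this.permits += 1
        const next = this.waiters.pop()   // oldest waiter sits at the end
        if (next !== undefined) next()
    }

    async wait(): Promise<void> {
        this.permits -= 1
        if (this.permits < 0 || this.waiters.length > 0)
            await new Promise<void>(resolve => { this.waiters.unshift(() => resolve()) })
    }
}

// Hypothetical helper: run `task` while holding one permit, always releasing it.
async function withPermit<R>(sem: AsyncSemaphore, task: () => Promise<R>): Promise<R> {
    await sem.wait()
    try {
        return await task()
    } finally {
        sem.signal()
    }
}

const sleep = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms))

// Starts five jobs behind a semaphore with two permits and records the
// highest number of jobs that were in flight at the same time.
async function semaphoreDemo(): Promise<number> {
    const sem = new AsyncSemaphore(2)
    let running = 0
    let maxRunning = 0
    const job = async () => {
        running += 1
        maxRunning = Math.max(maxRunning, running)
        await sleep(10)
        running -= 1
    }
    await Promise.all([1, 2, 3, 4, 5].map(() => withPermit(sem, job)))
    return maxRunning
}

semaphoreDemo().then(max => console.log(`at most ${max} jobs ran at once`))
// prints "at most 2 jobs ran at once"
```

The `try`/`finally` in `withPermit` is a design choice of this sketch: it returns the permit even when the job throws, so one failing job does not starve the remaining waiters.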

Finally, after considerable effort, and inspired by @Titian's answer, I think I solved this. The code is filled with debug messages, but it might serve pedagogical purposes regarding the flow of control:

class AsyncQueue<T> {
    waitingEnqueue = new Array<() => void>()
    waitingDequeue = new Array<() => void>()
    enqueuePointer = 0
    dequeuePointer = 0
    queue = Array<T>()
    maxSize = 1
    trace = 0

    async enqueue(x: T) {
        this.trace += 1
        const localTrace = this.trace

        if ((this.queue.length + 1) > this.maxSize || this.waitingDequeue.length > 0) {
            console.debug(`[${localTrace}] Producer Waiting`)
            this.dequeuePointer += 1
            await new Promise(r => this.waitingDequeue.unshift(r))
            this.waitingDequeue.pop()
            console.debug(`[${localTrace}] Producer Ready`)
        }

        this.queue.unshift(x)
        console.debug(`[${localTrace}] Enqueueing ${x} Queue is now [${this.queue.join(', ')}]`)

        if (this.enqueuePointer > 0) {
            console.debug(`[${localTrace}] Notify Consumer`)
            this.waitingEnqueue[this.enqueuePointer-1]()
            this.enqueuePointer -= 1
        }
    }

    async dequeue() {
        this.trace += 1
        const localTrace = this.trace

        console.debug(`[${localTrace}] Queue length before pop: ${this.queue.length}`)

        if (this.queue.length == 0 || this.waitingEnqueue.length > 0) {
            console.debug(`[${localTrace}] Consumer Waiting`)
            this.enqueuePointer += 1
            await new Promise(r => this.waitingEnqueue.unshift(r))
            this.waitingEnqueue.pop()
            console.debug(`[${localTrace}] Consumer Ready`)
        }

        const x = this.queue.pop()!
        console.debug(`[${localTrace}] Queue length after pop: ${this.queue.length} Popping ${x}`)

        if (this.dequeuePointer > 0) {
            console.debug(`[${localTrace}] Notify Producer`)
            this.waitingDequeue[this.dequeuePointer - 1]()
            this.dequeuePointer -= 1
        }

        return x
    }
}

Update: Here's a clean version using an AsyncSemaphore, that really encapsulates the way things are usually done using concurrency primitives, but adapted to the asynchronous-CPS-single-threaded-event-loop™ style of JavaScript with async/await. You can see that the logic of AsyncQueue becomes much more intuitive, and the double synchronisation through Promises is delegated to the two semaphores:

class AsyncSemaphore {
    private promises = Array<() => void>()

    constructor(private permits: number) {}

    signal() {
        this.permits += 1
        if (this.promises.length > 0) this.promises.pop()!()
    }

    async wait() {
        if (this.permits == 0 || this.promises.length > 0)
            await new Promise(r => this.promises.unshift(r))
        this.permits -= 1
    }
}

class AsyncQueue<T> {
    private queue = Array<T>()
    private waitingEnqueue: AsyncSemaphore
    private waitingDequeue: AsyncSemaphore

    constructor(readonly maxSize: number) {
        this.waitingEnqueue = new AsyncSemaphore(0)
        this.waitingDequeue = new AsyncSemaphore(maxSize)
    }

    async enqueue(x: T) {
        await this.waitingDequeue.wait()
        this.queue.unshift(x)
        this.waitingEnqueue.signal()
    }

    async dequeue() {
        await this.waitingEnqueue.wait()
        this.waitingDequeue.signal()
        return this.queue.pop()!
    }
}

Update 2: There was a subtle bug hidden in the above code, which became evident when trying to use an AsyncQueue of size 0. The semantics do make sense: it is a queue without any buffer, where the publisher always waits for a consumer to exist. The lines that prevented it from working were:

await this.waitingEnqueue.wait()
this.waitingDequeue.signal()

If you look closely, you'll see that dequeue() isn't perfectly symmetric to enqueue(). In fact, if one swaps the order of these two instructions:

this.waitingDequeue.signal()
await this.waitingEnqueue.wait()

Then everything works again. It seems intuitive to me that we should signal that something is interested in dequeuing before actually waiting for an enqueue to take place.
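
Putting the pieces together, a self-contained sketch of the queue with the swapped `dequeue()` order, on top of the fixed semaphore logic, might look like the following. The driver `rendezvousDemo` and its result fields are hypothetical, written only to exercise a queue of size 0; this is an illustration under those assumptions, not a tested library.

```typescript
// Fixed semaphore logic, repeated so the sketch runs on its own.
class AsyncSemaphore {
    private waiters: Array<() => void> = []
    private permits: number

    constructor(permits: number) { this.permits = permits }

    signal(): void {
        this.permits += 1
        const next = this.waiters.pop()
        if (next !== undefined) next()
    }

    async wait(): Promise<void> {
        this.permits -= 1
        if (this.permits < 0 || this.waiters.length > 0)
            await new Promise<void>(resolve => { this.waiters.unshift(() => resolve()) })
    }
}

class AsyncQueue<T> {
    private queue: T[] = []
    private waitingEnqueue = new AsyncSemaphore(0)  // items ready to be taken
    private waitingDequeue: AsyncSemaphore          // free slots, plus one per pending dequeue()

    constructor(maxSize: number) {
        this.waitingDequeue = new AsyncSemaphore(maxSize)
    }

    async enqueue(x: T): Promise<void> {
        await this.waitingDequeue.wait()
        this.queue.unshift(x)
        this.waitingEnqueue.signal()
    }

    async dequeue(): Promise<T> {
        this.waitingDequeue.signal()      // announce the consumer first...
        await this.waitingEnqueue.wait()  // ...then wait for an item
        return this.queue.pop()!
    }
}

// Hypothetical driver for a queue of size 0.
async function rendezvousDemo() {
    const q = new AsyncQueue<number>(0)
    const received: number[] = []
    // Two consumers arrive before any producer and wait.
    const consumers = [q.dequeue(), q.dequeue()].map(p => p.then(x => { received.push(x) }))
    await q.enqueue(3)
    await q.enqueue(4)
    await Promise.all(consumers)

    // With no consumer left, a producer stays pending until one shows up.
    let delivered = false
    const producer = q.enqueue(5).then(() => { delivered = true })
    await new Promise<void>(resolve => setTimeout(resolve, 0))
    const blockedWithoutConsumer = !delivered
    const last = await q.dequeue()
    await producer
    return { received, blockedWithoutConsumer, last }
}

rendezvousDemo().then(r =>
    console.log(r.received.join(" "), r.blockedWithoutConsumer, r.last))
// prints "3 4 true 5"
```

One consequence of signalling before waiting is that each pending `dequeue()` lends the producers an extra slot, so the buffer can briefly hold more than `maxSize` items that are already promised to waiting consumers.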

Without extensive testing, I'm still not sure this doesn't reintroduce subtle bugs. I'll leave this as a challenge ;)

Hugo Sereno Ferreira
  • Glad to see you got some use out of the answer. I got caught up with other things and didn't have time to debug it. I deleted the original answer as it was getting downvotes, deservedly so. – Titian Cernicova-Dragomir May 18 '18 at 04:58