12

So here's an oxymoron: I want to create an asynchronous blocking queue in javascript/typescript (if you can implement it without typescript, that's fine). Basically I want to implement something like Java's BlockingQueue expect instead of it actually being blocking, it would be async and I can await dequeues.

Here's the interface I want to implement:

interface AsyncBlockingQueue<T> {
  enqueue(t: T): void;
  dequeue(): Promise<T>;
}

And I'd use it like so:

// enqueue stuff somewhere else

async function useBlockingQueue() {
  // as soon as something is enqueued, the promise will be resolved:
  const value = await asyncBlockingQueue.dequeue();
  // this will cause it to await for a second value
  const secondValue = await asyncBlockingQueue.dequeue();
}

Any ideas?

Rico Kahler
  • 17,616
  • 11
  • 59
  • 85

2 Answers2

19

It's quite simple actually, dequeue will create a promise that enqueue will resolve. We just have to keep the resolvers in a queue - and also care about the case where values are enqueued before they are dequeued, keeping the already fulfilled promises in a queue.

class AsyncBlockingQueue {
  constructor() {
    // invariant: at least one of the arrays is empty
    this.resolvers = [];
    this.promises = [];
  }
  _add() {
    this.promises.push(new Promise(resolve => {
      this.resolvers.push(resolve);
    }));
  }
  enqueue(t) {
    // if (this.resolvers.length) this.resolvers.shift()(t);
    // else this.promises.push(Promise.resolve(t));
    if (!this.resolvers.length) this._add();
    this.resolvers.shift()(t);
  }
  dequeue() {
    if (!this.promises.length) this._add();
    return this.promises.shift();
  }
  // now some utilities:
  isEmpty() { // there are no values available
    return !this.promises.length; // this.length <= 0
  }
  isBlocked() { // it's waiting for values
    return !!this.resolvers.length; // this.length < 0
  }
  get length() {
    return this.promises.length - this.resolvers.length;
  }
  [Symbol.asyncIterator]() {
    // Todo: Use AsyncIterator.from()
    return {
      next: () => this.dequeue().then(value => ({done: false, value})),
      [Symbol.asyncIterator]() { return this; },
    };
  }
}

I don't know TypeScript, but presumably it's simple to add the the necessary type annotations.

For better performance, use a Queue implementation with circular buffers instead of plain arrays, e.g. this one. You might also use only a single queue and remember whether you currently store promises or resolvers.

Bergi
  • 630,263
  • 148
  • 957
  • 1,375
  • 5
    I'd like to thank stackoverflow for making me add more `!` to reach the character minimum – Rico Kahler Nov 07 '17 at 12:22
  • @Bergi In this solution, why is there the check `if (!this.resolvers.length)` in `enqueue`? The logic appears to encode the creation of a Promise (and a resolver), and then the immediate removal of the resolver. What am I missing? Does the length of the array of resolvers ever get above 1? – Ben Aston Mar 11 '20 at 11:46
  • 1
    @52d6c6af The length of the resolvers goes above 0 when there were more `dequeue()` calls than `enqueue()` calls, the length of the promises goes above 0 when there were more `enqueue()` calls than `dequeue()` calls. – Bergi Mar 11 '20 at 12:03
  • @52d6c6af No, I don't think this has to do anything with [the trampolines](https://en.wikipedia.org/wiki/Trampoline_(computing)) that I know – Bergi Mar 11 '20 at 13:21
  • Thank you for your solution! I've added a `requeue` function that adds a Promise at the head of the array. This way I can do a `NO-ACK` and still maintain the sequence of enqueued items: ` requeue(t) { if (!this.promises.length) { this._add() } else { this.promises.unshift(new Promise(resolve => { this.resolvingFunctions.unshift(resolve) })) } this.resolvingFunctions.shift()(t) } ` – Thomas Halwax Mar 20 '23 at 13:51
10

This is simply @Bergi's answer but with typescript + generics with some modifications to make it work with strict mode for my typescript peeps out there.

class AsyncBlockingQueue<T> {
  private _promises: Promise<T>[];
  private _resolvers: ((t: T) => void)[];

  constructor() {
    this._resolvers = [];
    this._promises = [];
  }

  private _add() {
    this._promises.push(new Promise(resolve => {
      this._resolvers.push(resolve);
    }));
  }

  enqueue(t: T) {
    if (!this._resolvers.length) this._add();
    const resolve = this._resolvers.shift()!;
    resolve(t);
  }

  dequeue() {
    if (!this._promises.length) this._add();
    const promise = this._promises.shift()!;
    return promise;
  }

  isEmpty() {
    return !this._promises.length;
  }

  isBlocked() {
    return !!this._resolvers.length;
  }

  get length() {
    return this._promises.length - this._resolvers.length;
  }
}
Rico Kahler
  • 17,616
  • 11
  • 59
  • 85
  • "*should never happen*" really is "*can never happen*" - there's a check that adds the element when the array is empty right before it is removed :-) – Bergi Nov 07 '17 at 12:28
  • @Bergi updated to match. Typescript types Array.prototype.shift as `() => T | undefined` so I had to remove the `undefined` from the type. I saw your check though :) – Rico Kahler Nov 07 '17 at 12:31