2

I wonder, how would you implement a barrier with node.js' async module?

I guess that I should use a function that returns to the caller a list of callbacks generated by async. Upon all of them being called, a finish callback would be run.

But which of async's functions may be of aid?

fstab
  • 4,801
  • 8
  • 34
  • 66

3 Answers3

3

Not sure if I understand your question correctly.But you could use async.series.It will run the series of functions specified (one after the other) and then finally call the final callback that you specify once all the functions are executed

You could use an iterator callback(cb) that can be called inside the task callback of each function. you can specify the list of functions in an array,they would be executed in order

async.series([func1,func2],finalcallback);

async.seres([

    function(cb) {
        db.find('likes', function(err, likes) {
            dosomething with likes;
            cb();
        });
    },
    function(cb) {
        db.load('posts_likes', function(err, likes_posts) {
            dosomething with likes_posts;
            cb();
        });
    }
], function(err) {
    This is the final callback that gets called after all the task function callbacks are executed
});
harryy000
  • 553
  • 3
  • 9
  • The below should of good use to understand async 's use cases better. http://www.sebastianseilund.com/nodejs-async-in-practice – harryy000 Nov 07 '14 at 15:30
2

I'm not familiar with Node.js, but when I was writing asynchronous code, in which the threads were supposed to wait for each other to reach a common barrier point, I used the following class:

class CyclicBarrier {
  waitingParties = [];
  limit;
  
  constructor(limit) {
    this.limit = limit;
  }
  
  release() {
    while (this.waitingParties.length) {
      this.waitingParties
          .pop()
          .call();
    }
  }
  
  register() {
    return new Promise((resolve, reject) => {
      this.waitingParties.push(resolve);
      
      if (this.waitingParties.length >= this.limit) {
        this.release();
      }
    });
  }
}

The limit parameter defines how often register() has to be called, before all pending Promises are resolved at once.

const barrier = new CyclicBarrier(4);

Later code then simply uses await:

async function f(barrier) {
  ...
  
  // Pauses execution until enough threads are registered.
  await barrier.register();
  
  ...
}

The class was influenced by the CyclicBarrier Java class.

AFoeee
  • 731
  • 7
  • 24
1

We've implemented a Barrier with JS promises that works with async await. See: https://github.com/MatrixAI/js-async-locks/commit/c843d2b343ba0d7fd43484d93b941dce43b7b9ed

It's built on top of a mutex lock.

class Barrier {
  protected lock: Lock;
  protected count: number;
  protected release: ResourceRelease;

  public static async createBarrier(count: number) {
    const lock = new Lock();
    const [release] = await lock.lock()();
    return new this(count, lock, release);
  }

  protected constructor(count: number, lock: Lock, release: ResourceRelease) {
    if (count < 0) {
      throw new ErrorAsyncLocksBarrierCount();
    }
    this.lock = lock;
    this.release = release;
    this.count = count;
  }

  public async wait(timeout?: number): Promise<void> {
    if (!this.lock.isLocked()) {
      return;
    }
    this.count = Math.max(this.count - 1, 0);
    if (this.count === 0) {
      await this.release();
    } else {
      await this.lock.waitForUnlock(timeout);
    }
  }
}

Assuming you have a lock implementation, construct it already pre-locked.

Then have a counter indicating how many asynchronous operations need to be completed before the lock releases.

This depends on the waitForUnlock code function, which adds a promise that is only resolved once the lock is released. This is done here: https://github.com/DirtyHairy/async-mutex/blob/67efcfe6da259e9faabd6126a748bfb02846657c/src/Semaphore.ts#L47-L55

CMCDragonkai
  • 6,222
  • 12
  • 56
  • 98