2

Greetings, guys.
Can you help me with asynchronous code in Node.js?

The problem is this:

I'm using the amqplib module to work with RabbitMQ. It has a consume method that delivers messages from RabbitMQ. That method first returns a promise that resolves once the consumer has started, and only after that promise resolves does it invoke the callback for each message coming from RabbitMQ. I don't know how to detect when all of the messages have been delivered to my Node.js app.

To explain further, here is my code; in the comments at the end of the code I describe what I want:

/**
 * Test script.
 *
 * Requires the amqp.node library (amqplib), binds the queue to a fanout
 * exchange and logs every message delivered to the consumer.
 */
const amqp = require('amqplib');

const configConnection = { /* ..config options */ };
const queue = 'users';
const exchange = 'users.exchange';
const type = 'fanout';

/**
 * Declare an anonymous async function and call it immediately.
 *
 * The declaration above must be terminated with a semicolon: without it the
 * opening parenthesis below is parsed as a call on the previous expression
 * ('fanout'(...)) and the script dies with a TypeError.
 */
(async () => {
  /**
   * Open the connection and the channel using async/await
   * (the author targets node.js >= 8.5.0).
   */
  const conn = await amqp.connect(configConnection);
  const channel = await conn.createChannel();
  await channel.assertExchange(exchange, type);
  const response = await channel.assertQueue(queue);
  /**
   * response: { queue: 'users', messageCount: 10, consumerCount: 0 }
   *
   * The result of bindQueue() is deliberately not stored in `response`:
   * it carries no `queue` field, so overwriting `response` with it would
   * pass `undefined` to consume() as the queue name.
   */
  await channel.bindQueue(response.queue, exchange, '');
  /**
   * {noAck: false} means the broker DOES expect an acknowledgement for every
   * delivered message, which is why logMessage() calls channel.ack().
   */
  await channel.consume(response.queue, logMessage, {noAck: false});
  /**
   * consume() resolves as soon as the consumer is registered, so this line
   * is printed before the messages handled by logMessage().
   */
  console.log('reading for query finish');

  /**
   * Consumer callback: logs the message body and acknowledges the message.
   *
   * @param {object|null} msg - delivered message; amqplib passes null when
   *   the consumer has been cancelled by the server.
   */
  function logMessage(msg) {
    if (msg === null) {
      return;
    }
    console.log("[*] recieved: '%s'", msg.content.toString());
    channel.ack(msg);
  }
})().catch(err => {
  // Surface connection/channel failures instead of an unhandled rejection.
  console.error(err);
  process.exitCode = 1;
});
  /**
   * The output will be:
   * reading for query finish
   * [*] recieved: 'message content'
   * [*] recieved: 'message content'
   * [*] recieved: 'message content'
   * ...
   *
   * But I need the message 'reading for query finish' to be shown only
   * after all of the consume callbacks have been executed.
   *
   * Question: how can I do this?
   */
Rage Cacao
  • 43
  • 1
  • 6

2 Answers

2

I found the answer to my question here.

The answer is to use an EventEmitter together with a Promise.

The magic (for me) is here:
await new Promise(resolve => eventEmitter.once('consumeDone', resolve))

So the final code is:

/**
 * Test script.
 *
 * Requires the amqp.node library (amqplib). Consumes the messages that were
 * waiting in the queue and continues only after the last of them has been
 * handled (or after a safety timeout).
 */
const amqp = require('amqplib');
const EventEmitter = require('events');

const eventEmitter = new EventEmitter();
const timeout = 10000;
const configConnection = { /* ..config options */ };
const queue = 'users';
const exchange = 'users.exchange';
const type = 'fanout';

/**
 * Declare an anonymous async function and call it immediately.
 *
 * The declaration above must be terminated with a semicolon: without it the
 * opening parenthesis below is parsed as a call on the previous expression
 * ('fanout'(...)) and the script dies with a TypeError.
 */
(async () => {
  /**
   * Open the connection and the channel using async/await
   * (the author targets node.js >= 8.5.0).
   */
  const conn = await amqp.connect(configConnection);
  const channel = await conn.createChannel();
  await channel.assertExchange(exchange, type);
  /**
   * assertQueue result: { queue: 'users', messageCount: 10, consumerCount: 0 }
   *
   * Both fields are captured here because the result of bindQueue() has no
   * `queue` field and must not replace them.
   */
  const { queue: queueName, messageCount } = await channel.assertQueue(queue);
  await channel.bindQueue(queueName, exchange, '');

  /**
   * Start listening for 'consumeDone' BEFORE the consumer exists, so an emit
   * from the consumer callback can never happen while nobody is listening.
   */
  const consumeDone = new Promise(resolve => eventEmitter.once('consumeDone', resolve));

  /**
   * Safety net: if the event is never emitted from the consumer callback,
   * emit it ourselves after `timeout` ms so the script can go on.
   */
  const timer = setTimeout(() => eventEmitter.emit('consumeDone'), timeout);

  /**
   * {noAck: false} means the broker DOES expect an acknowledgement for every
   * delivered message, which is why the callback calls channel.ack().
   */
  await channel.consume(queueName, logMessage(messageCount), {noAck: false});

  if (messageCount === 0) {
    // Nothing was waiting in the queue: delivery tags are positive, so the
    // check in the callback could never match and we would only be released
    // by the timeout.
    eventEmitter.emit('consumeDone');
  }

  await consumeDone;
  // The fallback timer is no longer needed once we have been released.
  clearTimeout(timer);
  console.log('reading for query finish');

  /**
   * Builds the consumer callback.
   *
   * @param {number} messageCount - number of messages that were in the queue
   *   when it was asserted.
   * @returns {function} callback that logs and acknowledges each message and
   *   emits 'consumeDone' when the delivery tag reaches `messageCount`.
   */
  function logMessage(messageCount) {
    return msg => {
      // amqplib passes null when the consumer has been cancelled by the server.
      if (msg === null) {
        eventEmitter.emit('consumeDone');
        return;
      }
      console.log("[*] recieved: '%s'", msg.content.toString());
      channel.ack(msg);
      if (msg.fields.deliveryTag === messageCount) {
        eventEmitter.emit('consumeDone');
      }
    };
  }
})().catch(err => {
  // Surface connection/channel failures instead of an unhandled rejection.
  console.error(err);
  process.exitCode = 1;
});
Rage Cacao
  • 43
  • 1
  • 6
0

Try the below code:

let amqp = require('amqplib/callback_api');

/**
 * Connects to RabbitMQ, reads the messages that are in the queue at the
 * moment it is asserted, prints them as an array and exits the process.
 *
 * Uses the callback flavour of amqplib (`amqp` is required at the top of the
 * file). Connection, channel and queue errors are thrown.
 */
function init() {

    const configConnection = {
        protocol: 'amqp',
        hostname: 'localhost',
        port: 5672,
        username: 'root',
        password: '1111',
        heartbeat: 60,
    };
    const queue_name = 'queue_name';
    const messages = [];
    let messageCounter = 0;

    // Show all collected messages and exit.
    function finish() {
        console.log(messages);
        process.exit();
    }

    amqp.connect(configConnection, function (error, connect) {

        if (error) {
            throw error;
        }

        // Create channel and get info about queue
        connect.createChannel(function (error1, channel) {

            if (error1) {
                throw error1;
            }

            channel.assertQueue(queue_name, {durable: true}, (error2, result) => {

                // Without this check a failed assert would surface as a
                // confusing TypeError on `result.messageCount`.
                if (error2) {
                    throw error2;
                }

                // here you get count of messages
                const messageCount = result.messageCount;

                // An empty queue never invokes the consumer callback, so the
                // counter check below would never run and the process would
                // hang forever.
                if (messageCount === 0) {
                    finish();
                    return;
                }

                // Consume to queue
                channel.consume(queue_name, function (msg) {

                    // amqplib passes null when the consumer is cancelled by
                    // the server; no further messages will arrive.
                    if (msg === null) {
                        finish();
                        return;
                    }

                    messages.push(msg.content.toString());

                    // Show all messages and exit
                    if (++messageCounter === messageCount) {
                        finish();
                    }

                }, {
                    noAck: true
                });
            });
        });
    });
}

init();