
I have a Feathers application that uses RabbitMQ and a custom amqplib wrapper to communicate with code running elsewhere. I'm struggling to write a good integration test showing that the callback that runs when a message is received works correctly. The actual callback just takes the body of the received message and calls an internal service to put the data in the database.

I have a RabbitMQ server running in the test environment, and the idea was to write a test that publishes some dummy data to the correct exchange, and then check that the data ends up in the database. The problem is that I can't work out how to tell that the callback has finished before I check the database.

Right now, I just publish the message and then use a timeout to wait a few seconds before checking the database, but I don't like this since there is no guarantee that the callback will have completed.

The code I'm testing looks something like this (not the actual code, just an example):

const app = require('./app');

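// handleAMQP is passed as a callback to the consumer.
// It creates a new record through the users service.
const handleAMQP = async (message) => {
  // message.content is a Buffer, so parse it before handing it to the service
  await app.service('users').create(JSON.parse(message.content.toString()));
};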

// subscribe takes an amqp connection, opens a channel, and attaches the callback
const subscribe = (conn) => {
  const queue = 'myQueue';
  const exchange = 'myExchange';
  const topic = 'myTopic';

  return conn.createChannel().then((ch) => {
    return ch.assertExchange(exchange, 'topic', { durable: true })
      .then(() => ch.assertQueue(queue, { exclusive: true }))
      // Bind the queue, then pass its name on to the consume step
      .then((qok) => ch.bindQueue(qok.queue, exchange, topic).then(() => qok.queue))
      .then((boundQueue) => ch.consume(boundQueue, handleAMQP));
  });
};

module.exports = {subscribe};

And my test looks something like this:

const assert = require('assert');
const amqp = require('amqplib');
const app = require('./app');

describe('AMQP Pub/Sub Tests', () => {
  const exchange = 'myExchange';
  const topic = 'myTopic';

  const dummyData = {
    email: 'example@example.com',
    name: 'Example User'
  };

  it('creates a new db entry when amqp message received', async () => {
    // Publish some dummy data
    const conn = await amqp.connect('amqp://localhost');
    const ch = await conn.createChannel();
    await ch.assertExchange(exchange, 'topic', { durable: true });
    // publish() expects a Buffer and returns a boolean, not a promise
    ch.publish(exchange, topic, Buffer.from(JSON.stringify(dummyData)));
    await ch.close();
    await conn.close();

    // Wait three seconds (the test runner's per-test timeout must exceed this)
    await new Promise((resolve) => setTimeout(resolve, 3000));

    // Attempt to find the newly created user
    const result = await app.service('users').find({ query: { email: dummyData.email } });
    const user = Array.isArray(result) ? result[0] : result.data[0]; // find() may be paginated
    assert.deepEqual(user.email, dummyData.email);
    assert.deepEqual(user.name, dummyData.name);
  });
});

Instead of just waiting an arbitrary time limit before I check if the record exists, is there a better way to structure this test?

Or is waiting a fixed amount of time a perfectly valid approach for event-driven functionality?
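
For illustration, one possible alternative to a fixed wait is to poll for the expected state with an upper bound on the total wait. The sketch below is only that: `waitFor`, `timeoutMs`, and `intervalMs` are hypothetical names that do not come from Feathers, amqplib, or any test runner, and an in-memory array stands in for the database so the snippet runs on its own.

```javascript
// Hypothetical helper: retries `check` until it returns a truthy value
// or `timeoutMs` elapses, then rejects. Not part of any library.
async function waitFor(check, { timeoutMs = 3000, intervalMs = 50 } = {}) {
  const deadline = Date.now() + timeoutMs;
  for (;;) {
    const value = await check();
    if (value) return value;
    if (Date.now() >= deadline) {
      throw new Error(`condition not met within ${timeoutMs} ms`);
    }
    // Pause briefly before checking again
    await new Promise((resolve) => setTimeout(resolve, intervalMs));
  }
}

// Stand-in for the database (an assumption for this sketch only).
// In the real test, `check` would query app.service('users') instead.
const fakeDb = [];
setTimeout(() => fakeDb.push({ email: 'example@example.com' }), 200);

// Resolves as soon as the record shows up, well before the 3 second limit
const found = waitFor(() =>
  fakeDb.find((user) => user.email === 'example@example.com')
);
```

With something like this, the test would finish as soon as the record appears and only fail once the deadline passes, so the time limit acts as a worst case rather than a fixed delay. It is still polling, so it does not tell the test exactly when the callback itself has finished.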

Huggzorx
