2

I'm developing a AMQPClient class to abstract RPC calls, works perfectly on the first call, but when calling again the correlationId has the value of the first call.

async RPC<T>(queue: string, message: string): Promise<T> {
    if (!this.channel) {
      throw new Error('Channel not initialized')
    }

    const replyTo = `${queue}.reply`
    await this.channel.assertQueue(replyTo)
    await this.channel.assertQueue(queue)

    return new Promise<T>((resolve) => {
      const correlationId = Math.random().toString(36).slice(2)
      console.log('generated correlationId: ', correlationId)
      const onMessage = (message: ConsumeMessage | null) => {
        console.log(
          correlationId,
          message?.properties.correlationId,
          correlationId === message?.properties.correlationId
        )
        if (message && message.properties.correlationId === correlationId) {
          resolve(JSON.parse(message.content.toString()))
          this.channel?.removeListener('message', onMessage)
        }
      }

      this.channel?.consume(replyTo, onMessage, { noAck: true })
      this.channel?.sendToQueue(queue, Buffer.from(message), {
        correlationId,
        replyTo
      })
    })
  }

Output:

generated correlationId:  lwfvgqym5ya
lwfvgqym5ya lwfvgqym5ya true

generated correlationId:  1m09k9jk2xm
lwfvgqym5ya 1m09k9jk2xm false

The correlationId printed on the second time matches the correlationId from the first call that was already resolved. The second call was made after the first one resolved.

I already tried moving the const correlationId = Math.random().toString(36).slice(2) outside the new Promise(...). I also tried to pass a anon function to the consume callback calling the onMessage funcion, no success.

this.channel?.consume(replyTo, (msg) => onMessage(msg), { noAck: true })

I also tried to pass the correlationId as parameter, none above works. Always the second call uses the last value of correlationId inside de onMessage function.

Full code:

import client, { Channel, Connection, ConsumeMessage } from 'amqplib'

class AMQPClient {
  private channel?: Channel

  constructor(private readonly amqpUrl: string) {
    client.connect(this.amqpUrl).then((connection) => {
      connection.createChannel().then((channel) => {
        this.channel = channel
      })

      process.on('SIGINT', () => this.close(connection))
      process.on('SIGTERM', () => this.close(connection))
    })
  }

  async RPC<T>(queue: string, message: string): Promise<T> {
    if (!this.channel) {
      throw new Error('Channel not initialized')
    }

    const replyTo = `${queue}.reply`
    await this.channel.assertQueue(replyTo)
    await this.channel.assertQueue(queue)

    return new Promise<T>((resolve) => {
      const correlationId = Math.random().toString(36).slice(2)
      console.log('generated correlationId: ', correlationId)
      const onMessage = (message: ConsumeMessage | null) => {
        console.log(
          correlationId,
          message?.properties.correlationId,
          correlationId === message?.properties.correlationId
        )
        if (message && message.properties.correlationId === correlationId) {
          resolve(JSON.parse(message.content.toString()))
          this.channel?.removeListener('message', onMessage)
        }
      }

      this.channel?.consume(replyTo, (msg) => onMessage(msg), { noAck: true })
      this.channel?.sendToQueue(queue, Buffer.from(message), {
        correlationId,
        replyTo
      })
    })
  }

  close(connection: Connection) {
    connection.close()
    process.exit(0)
  }
}

const amqpClient = new AMQPClient(process.env.AMQP_URL || 'amqp://localhost')
export { amqpClient, AMQPClient }

Call:

this.amqpClient.RPC<MerchantStatus>(
  'getMerchantStatus',
  JSON.stringify({ merchantId: 'test' })
)
  • 2
    Promises resolve once and only once and they "latch" the first resolved value. They are one-shot devices. – jfriend00 May 26 '23 at 20:53
  • 2
    @jfriend00 Yes, but I'm calling the function twice, isn't expected to create a new Promise? – Lincon Dias May 26 '23 at 20:57
  • 1
    Are you 100% sure that `.removeListener()` works to undo `.consume()`? Your diagnostics suggest that the original `onMessage()` is still installed and getting called. – jfriend00 May 26 '23 at 21:26
  • @jfriend00 Ok, so you pointed me to the right direction. The problem was that removeListener was no working, so the callback was "freezed" to the first correlationId. The solution was to create a hashmap of callback functions with correlationId as key. I'll post the solution as answer. – Lincon Dias May 26 '23 at 22:03

1 Answers1

0

As pointed by @jfriend00, the onMessage callback function was not being removed from the consume function, so it was keeping the value of the first correlationId.

The solution was to create a HashMap using the correlationId as key and a callback function as the value. So when the queue is consumed, it checks the callback HashMap with the correlationId sent on the message; if it finds a registered callback, the callback its called resolving the promise with the message value.

Working code:

class AMQPClient {
  private callbacks: Record<string, (message: ConsumeMessage) => void> = {}

  async RPC<T>(queue: string, message: string): Promise<T> {
    if (!this.channel) {
      throw new Error('Channel not initialized')
    }

    const replyTo = `${queue}.reply`
    await this.channel.assertQueue(replyTo)
    await this.channel.assertQueue(queue)
    const correlationId = Math.random().toString(36).slice(2)

    return new Promise((resolve) => {
      this.callbacks[correlationId] = (message) => {
        resolve(JSON.parse(message.content.toString()))
        delete this.callbacks[correlationId]
      }

      this.channel?.consume(replyTo, (message) => {
        if (message) {
          const correlationId = message.properties.correlationId
          const callback = this.callbacks[correlationId]
          if (callback) {
            callback(message)
          }
        }
      })
      this.channel?.sendToQueue(queue, Buffer.from(message), {
        correlationId,
        replyTo
      })
    })
  }
}

More details about RPC over RabbitMQ.

  • It seems like the architectural mistake here is calling `.consume()` more than once as these will build up forever. You only need one call to `.consume()`. – jfriend00 May 27 '23 at 02:58
  • Or, looking briefly at this [API](https://amqp-node.github.io/amqplib/channel_api.html#channel_consume), you can use `.cancel(consumerTag)` to cancel a `.consume()` operation where `consumerTag` is part of the `.consume()` API. – jfriend00 May 27 '23 at 03:30