I'm developing a AMQPClient class to abstract RPC calls, works perfectly on the first call, but when calling again the correlationId
has the value of the first call.
async RPC<T>(queue: string, message: string): Promise<T> {
if (!this.channel) {
throw new Error('Channel not initialized')
}
const replyTo = `${queue}.reply`
await this.channel.assertQueue(replyTo)
await this.channel.assertQueue(queue)
return new Promise<T>((resolve) => {
const correlationId = Math.random().toString(36).slice(2)
console.log('generated correlationId: ', correlationId)
const onMessage = (message: ConsumeMessage | null) => {
console.log(
correlationId,
message?.properties.correlationId,
correlationId === message?.properties.correlationId
)
if (message && message.properties.correlationId === correlationId) {
resolve(JSON.parse(message.content.toString()))
this.channel?.removeListener('message', onMessage)
}
}
this.channel?.consume(replyTo, onMessage, { noAck: true })
this.channel?.sendToQueue(queue, Buffer.from(message), {
correlationId,
replyTo
})
})
}
Output:
generated correlationId: lwfvgqym5ya
lwfvgqym5ya lwfvgqym5ya true
generated correlationId: 1m09k9jk2xm
lwfvgqym5ya 1m09k9jk2xm false
The correlationId printed on the second time matches the correlationId from the first call that was already resolved. The second call was made after the first one resolved.
I already tried moving the const correlationId = Math.random().toString(36).slice(2)
outside the new Promise(...)
.
I also tried to pass a anon function to the consume callback calling the onMessage funcion, no success.
this.channel?.consume(replyTo, (msg) => onMessage(msg), { noAck: true })
I also tried to pass the correlationId as parameter, none above works. Always the second call uses the last value of correlationId inside de onMessage function.
Full code:
import client, { Channel, Connection, ConsumeMessage } from 'amqplib'
class AMQPClient {
private channel?: Channel
constructor(private readonly amqpUrl: string) {
client.connect(this.amqpUrl).then((connection) => {
connection.createChannel().then((channel) => {
this.channel = channel
})
process.on('SIGINT', () => this.close(connection))
process.on('SIGTERM', () => this.close(connection))
})
}
async RPC<T>(queue: string, message: string): Promise<T> {
if (!this.channel) {
throw new Error('Channel not initialized')
}
const replyTo = `${queue}.reply`
await this.channel.assertQueue(replyTo)
await this.channel.assertQueue(queue)
return new Promise<T>((resolve) => {
const correlationId = Math.random().toString(36).slice(2)
console.log('generated correlationId: ', correlationId)
const onMessage = (message: ConsumeMessage | null) => {
console.log(
correlationId,
message?.properties.correlationId,
correlationId === message?.properties.correlationId
)
if (message && message.properties.correlationId === correlationId) {
resolve(JSON.parse(message.content.toString()))
this.channel?.removeListener('message', onMessage)
}
}
this.channel?.consume(replyTo, (msg) => onMessage(msg), { noAck: true })
this.channel?.sendToQueue(queue, Buffer.from(message), {
correlationId,
replyTo
})
})
}
close(connection: Connection) {
connection.close()
process.exit(0)
}
}
const amqpClient = new AMQPClient(process.env.AMQP_URL || 'amqp://localhost')
export { amqpClient, AMQPClient }
Call:
this.amqpClient.RPC<MerchantStatus>(
'getMerchantStatus',
JSON.stringify({ merchantId: 'test' })
)