I have been trying to convert my existing Node JS code from function callback to Async Await, because a new SDK came out and callbacks are deprecated. These are the related codes. The code is communicating and publishing to a mqtt broker. First here I call the ToggleX method
super.controlToggleX(channel, value, (err, res) => {
if (err) {
this.log(`Toggle Response: err: ${err}`);
}
this.log(`Toggle Response: res: ${JSON.stringify(res)}`);
});
After that in the controlToggleX
method I set the payload for the message and make the publlish to the broker.
controlToggleX(channel, onoff, callback) {
const payload = { togglex: { channel, onoff: onoff ? 1 : 0 } };
return this.publishMessage('SET', 'Appliance.Control.ToggleX', payload, callback);
}
In the publishMessage
I compile the message to the broker and publish it. After publishing I waiting for the response and to keep track of the ongoing messages I create the waitingMessageIds
array.
publishMessage(method, namespace, payload, callback) {
this.clientResponseTopic = `/app/${this.userId}-${appId}/subscribe`;
const messageId = crypto.createHash('md5').update(generateRandomString(16)).digest('hex');
const timestamp = Math.round(new Date().getTime() / 1000); // int(round(time.time()))
const signature = crypto.createHash('md5').update(messageId + this.key + timestamp).digest('hex');
const data = {
header: {
from: this.clientResponseTopic,
messageId,
method,
namespace,
payloadVersion: 1,
sign: signature,
timestamp,
},
payload,
};
this.client.publish(`/appliance/${this.uuid}/subscribe`, JSON.stringify(data));
if (callback) {
this.waitingMessageIds[messageId] = {};
this.waitingMessageIds[messageId].callback = callback;
this.waitingMessageIds[messageId].timeout = setTimeout(() => {
// this.log('TIMEOUT');
if (this.waitingMessageIds[messageId].callback) {
this.waitingMessageIds[messageId].callback(new Error('Timeout'));
}
delete this.waitingMessageIds[messageId];
}, 20000);
}
this.emit('rawSendData', data);
return messageId;
}
When a new message comes from the broker I check the waitingMessageIds
array, the messageId is in the array? If yes I delete the Timer and process the message with the callback coming from the publishing.
this.client.on('message', (topic, message) => {
if (!message) return;
// message is Buffer
try {
message = JSON.parse(message.toString());
} catch (err) {
this.emit('error', `JSON parse error: ${err}`);
return;
}
if (message.header.from && !message.header.from.includes(this.uuid)) return;
if (this.waitingMessageIds[message.header.messageId]) {
if (this.waitingMessageIds[message.header.messageId].timeout) {
clearTimeout(this.waitingMessageIds[message.header.messageId].timeout);
}
this.waitingMessageIds[message.header.messageId].callback(null, message.payload || message);
delete this.waitingMessageIds[message.header.messageId];
} else if (message.header.method === 'PUSH') {
const namespace = message.header ? message.header.namespace : '';
this.log('Found message');
this.emit('data', namespace, message.payload || message);
}
this.emit('rawData', message);
});
mqtt package is working with callback, but the async-mqtt is returning Promise so it is going to be good for me.
I was successfull to publish with it, and after that point I put the messageId to the array and start a timer, but when the reply came i was not been able to proocess the waitingMessageIds
and return to the original point (super.controlToggleX)
.
Could somebody please help me. Thank
Edit:
I tried to rewrite PublishMessage
with async-mqtts and it looks like this:
async publishMessage(method, namespace, payload) {
.
.
.
try {
await this.client.publish(`/appliance/${this.uuid}/subscribe`, JSON.stringify(data));
} catch (err) {
return new Error(err);
}
this.waitingMessageIds[messageId] = {};
// this.waitingMessageIds[messageId].callback = callback;
this.waitingMessageIds[messageId].timeout = setTimeout(() => {
// this.log('TIMEOUT');
if (this.waitingMessageIds[messageId].callback) {
this.waitingMessageIds[messageId].callback(new Error('Timeout'));
}
delete this.waitingMessageIds[messageId];
}, 20000);
this.emit('rawSendData', data);
return messageId;
}
Because with the await publish waits for the response I do not need check if it is a callback, i just put the messageId
into the waitingmessageIds
array.
When I process the incoming message in this.client.on('message'
I don not know how to change this -> this.waitingMessageIds[message.header.messageId].callback(null, message.payload || message);