
I have been trying to convert my existing Node.js code from callbacks to async/await, because a new SDK came out and callbacks are deprecated. The relevant code is below; it communicates with an MQTT broker and publishes messages to it. First, I call the controlToggleX method:

super.controlToggleX(channel, value, (err, res) => {
  if (err) {
    this.log(`Toggle Response: err: ${err}`);
    return; // do not also log an undefined response after an error
  }
  this.log(`Toggle Response: res: ${JSON.stringify(res)}`);
});

After that, in the controlToggleX method, I set the payload for the message and publish it to the broker.

controlToggleX(channel, onoff, callback) {
  const payload = { togglex: { channel, onoff: onoff ? 1 : 0 } };
  return this.publishMessage('SET', 'Appliance.Control.ToggleX', payload, callback);
}
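Since controlToggleX takes a Node-style (err, res) callback as its last argument, one option raised in the comments is to wrap it in a Promise and leave the rest untouched. The following is only a minimal sketch: `controlToggleXAsync`, `fakeDevice`, and `demo` are hypothetical names that are not part of the SDK, and the fake device merely stands in for the real one so that the example runs without a broker.

```javascript
// Hypothetical helper (not part of the SDK): wraps the callback-based
// controlToggleX in a Promise so that callers can use await.
function controlToggleXAsync(device, channel, onoff) {
  return new Promise((resolve, reject) => {
    device.controlToggleX(channel, onoff, (err, res) => {
      if (err) reject(err);
      else resolve(res);
    });
  });
}

// Stand-in device for demonstration only. The real method publishes to
// the broker; this one just echoes the payload back asynchronously.
const fakeDevice = {
  controlToggleX(channel, onoff, callback) {
    const payload = { togglex: { channel, onoff: onoff ? 1 : 0 } };
    setTimeout(() => callback(null, payload), 10);
  },
};

// The call site then uses try/catch instead of an (err, res) callback.
async function demo() {
  try {
    const res = await controlToggleXAsync(fakeDevice, 0, true);
    console.log(`Toggle Response: res: ${JSON.stringify(res)}`);
    return res;
  } catch (err) {
    console.log(`Toggle Response: err: ${err}`);
    throw err;
  }
}
```

This keeps publishMessage and the waitingMessageIds bookkeeping callback-based internally, so it only helps if the underlying callback is still invoked reliably for every request.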

In publishMessage I build the message and publish it to the broker. After publishing I wait for the response, and to keep track of the in-flight messages I store an entry for each one in waitingMessageIds, keyed by messageId.

publishMessage(method, namespace, payload, callback) {
      this.clientResponseTopic = `/app/${this.userId}-${appId}/subscribe`;
      const messageId = crypto.createHash('md5').update(generateRandomString(16)).digest('hex');
      const timestamp = Math.round(new Date().getTime() / 1000); // int(round(time.time()))
 
      const signature = crypto.createHash('md5').update(messageId + this.key + timestamp).digest('hex');
 
      const data = {
        header: {
          from: this.clientResponseTopic,
          messageId, 
          method, 
          namespace, 
          payloadVersion: 1,
          sign: signature, 
          timestamp,
        },
        payload,
      };
      this.client.publish(`/appliance/${this.uuid}/subscribe`, JSON.stringify(data));
      if (callback) {
        this.waitingMessageIds[messageId] = {};
        this.waitingMessageIds[messageId].callback = callback;
        this.waitingMessageIds[messageId].timeout = setTimeout(() => {
          // this.log('TIMEOUT');
          if (this.waitingMessageIds[messageId].callback) {
            this.waitingMessageIds[messageId].callback(new Error('Timeout'));
          }
          delete this.waitingMessageIds[messageId];
        }, 20000);
      }
      this.emit('rawSendData', data);
      return messageId;
    }

When a new message arrives from the broker, I check whether its messageId is in waitingMessageIds. If it is, I clear the timer and process the message with the callback that was passed in when publishing.

this.client.on('message', (topic, message) => {
        if (!message) return;
        // message is Buffer
        try {
          message = JSON.parse(message.toString());
        } catch (err) {
          this.emit('error', `JSON parse error: ${err}`);
          return;
        }
        if (message.header.from && !message.header.from.includes(this.uuid)) return;
        if (this.waitingMessageIds[message.header.messageId]) {
          if (this.waitingMessageIds[message.header.messageId].timeout) {
            clearTimeout(this.waitingMessageIds[message.header.messageId].timeout);
          }
          this.waitingMessageIds[message.header.messageId].callback(null, message.payload || message);
          delete this.waitingMessageIds[message.header.messageId];
        } else if (message.header.method === 'PUSH') {
          const namespace = message.header ? message.header.namespace : '';
          this.log('Found message');
          this.emit('data', namespace, message.payload || message);
        }
        this.emit('rawData', message);
      });

The mqtt package works with callbacks, but async-mqtt returns Promises, so it should suit me well. I was able to publish with it, and after that I put the messageId into waitingMessageIds and started a timer, but when the reply came I was not able to process the waitingMessageIds entry and return to the original call site (super.controlToggleX).

Could somebody please help me? Thanks.

Edit: I tried to rewrite publishMessage with async-mqtt, and it looks like this:

    async publishMessage(method, namespace, payload) {
      // ...
      // A failed publish now rejects this async function. Returning an
      // Error object would look like a normal result to the caller.
      await this.client.publish(`/appliance/${this.uuid}/subscribe`, JSON.stringify(data));
      this.waitingMessageIds[messageId] = {};
      // this.waitingMessageIds[messageId].callback = callback;
      this.waitingMessageIds[messageId].timeout = setTimeout(() => {
        // this.log('TIMEOUT');
        if (this.waitingMessageIds[messageId].callback) {
          this.waitingMessageIds[messageId].callback(new Error('Timeout'));
        }
        delete this.waitingMessageIds[messageId];
      }, 20000);
      this.emit('rawSendData', data);
      return messageId;
    }

Because with await the publish waits for the response, I do not need to check whether there is a callback; I just put the messageId into waitingMessageIds.

When I process the incoming message in this.client.on('message', ...), I do not know how to change this line: this.waitingMessageIds[message.header.messageId].callback(null, message.payload || message);
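One way to replace that callback line, along the lines suggested in the comments, is to store the Promise's resolve and reject functions in waitingMessageIds instead of a callback, and let the message handler settle them. What follows is a sketch under assumptions, not the SDK's implementation: `PendingRequests`, `waitFor`, `settle`, `cancel`, `publishAndWait`, and `fakeClient` are hypothetical names, and the only assumption about the client is that `publish` returns a Promise that settles once the publish itself is done, not once a reply arrives.

```javascript
// Sketch only: tracks in-flight requests by messageId and settles a
// Promise when the matching reply arrives or the timeout fires.
class PendingRequests {
  constructor(timeoutMs = 20000) {
    this.timeoutMs = timeoutMs;
    this.waitingMessageIds = {};
  }

  // Returns a Promise that resolves with the reply for messageId.
  waitFor(messageId) {
    return new Promise((resolve, reject) => {
      const timeout = setTimeout(() => {
        delete this.waitingMessageIds[messageId];
        reject(new Error('Timeout'));
      }, this.timeoutMs);
      this.waitingMessageIds[messageId] = { resolve, reject, timeout };
    });
  }

  // Stops waiting without settling, e.g. when the publish itself failed.
  cancel(messageId) {
    const pending = this.waitingMessageIds[messageId];
    if (!pending) return;
    clearTimeout(pending.timeout);
    delete this.waitingMessageIds[messageId];
  }

  // Call from the 'message' handler. Returns true if a waiter was settled.
  settle(message) {
    const pending = this.waitingMessageIds[message.header.messageId];
    if (!pending) return false;
    clearTimeout(pending.timeout);
    delete this.waitingMessageIds[message.header.messageId];
    pending.resolve(message.payload || message);
    return true;
  }
}

// Registers the waiter before publishing so an early reply is not missed.
async function publishAndWait(client, pending, topic, data) {
  const reply = pending.waitFor(data.header.messageId);
  try {
    await client.publish(topic, JSON.stringify(data));
  } catch (err) {
    pending.cancel(data.header.messageId); // no reply will come
    throw err;
  }
  return reply;
}

// Stand-in for the MQTT client, for demonstration only: it "answers"
// every publish shortly afterwards with a fixed payload.
const pending = new PendingRequests(1000);
const fakeClient = {
  publish(topic, body) {
    const { header } = JSON.parse(body);
    setTimeout(() => pending.settle({ header, payload: { ok: true } }), 10);
    return Promise.resolve();
  },
};
```

With something like this in place, the handler line would become a call such as `pending.settle(message)`, falling through to the PUSH branch when it returns false, and the original call site could await the result of controlToggleX inside try/catch instead of passing a callback.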

palatinb
  • Just [promisify](https://stackoverflow.com/q/22519784/1048572) your `controlToggleX` method? – Bergi Nov 29 '21 at 19:29
  • Not sure I understand the scenario here. Does `this.client.on('message' ...)` listen for your own message (the one sent with `this.asyncClient.publish()`) or is it a response of very similar composition from some other process? – Roamer-1888 Nov 30 '21 at 07:17
  • Can `this.client.on('message', ...)` be rewritten as `this.client.listen(...)`? If so, then things become much simpler. – Roamer-1888 Dec 01 '21 at 00:16
  • @Bergi I tried your idea but it is not working properly. Some methods time out. – palatinb Dec 02 '21 at 23:29
  • @Roamer-1888 `this.client.on('message' ...)` listens for messages on the topics I'm subscribed to. It listens for MQTT messages, and based on the mqtt package documentation I cannot rewrite it as `this.client.listen(...)`. – palatinb Dec 02 '21 at 23:31
  • I tried to rewrite the `publishMessage` method, but when the message arrived I was not able to return with the callback here: `this.waitingMessageIds[message.header.messageId].callback(null, message.payload || message);` With this line I would return here: `super.controlToggleX(channel, value, (err, res) => { if (err) { this.log('Toggle Response: err: ${err}'); } this.log('Toggle Response: res: ${JSON.stringify(res)}'); });` But because I removed the callback, I don't know how to get back there. – palatinb Dec 02 '21 at 23:42
  • @palatinb Please [edit] your question to show your attempt at writing it with promises. What do you mean by "*Some methods are going to timeout.*"? `publishMessage` really should return a promise instead of accepting a `callback`. It should install the `resolve`/`reject` functions as the callback in `waitingMessageIds`. – Bergi Dec 03 '21 at 00:28
  • "*because with the await publish waits for the response*" - if it really does that, you don't need to listen for incoming messages yourself (and keep the `waitingMessageIds` structure) at all? – Bergi Dec 05 '21 at 15:48

0 Answers