1

I have the following code that works. I started promisifying it using Bluebird, however, I am not sure how I can promisify the processing of the array of messages.

var s3 = new AWS.S3();
var sqs = new AWS.SQS();
// This notification call is triggered by the latest message, but there may
// be earlier unprocessed messages. So we request the maximum number of
// messages (10) from the queue, process each of them and then remove it
// from the queue.
sqs.receiveMessage({
  QueueUrl: settings.sqsQueueUrl[prdOrDev],
  /* required */
  WaitTimeSeconds: 20, // to enable long polling, which polls all servers for any unprocessed SQS messages
  VisibilityTimeout: 120, // without this longpolling didn't work.
  MaxNumberOfMessages: 10
}, function(err, data) {
  if (err) {
    console.error('SQS receiveMessage failed: ', err, err.stack);
    return res.status(400).json({
      success: false
    });
  }
  // `Messages` is absent from the response when the poll returns nothing,
  // so fall back to an empty list instead of calling forEach on undefined.
  var messages = data.Messages || [];
  messages.forEach(function(message) {
    // The SQS body is a JSON envelope whose `Message` field is itself a
    // JSON string holding the SES notification.
    var body = JSON.parse(message.Body);
    var sesMsg = JSON.parse(body.Message);
    s3.getObject({
      Bucket: sesMsg.receipt.action.bucketName,
      Key: sesMsg.receipt.action.objectKey
    }, function(s3Err) {
      if (s3Err) {
        // Leave the message on the queue; it is not deleted on failure.
        console.error('S3 getObject failed: ', s3Err, s3Err.stack);
        return;
      }
      // Only delete the message once its S3 object was fetched successfully.
      sqs.deleteMessage({
        QueueUrl: settings.sqsQueueUrl[prdOrDev],
        /* required */
        ReceiptHandle: message.ReceiptHandle
      }, function(deleteErr) {
        if (deleteErr) {
          console.error('SQS deleteMessage failed: ', deleteErr, deleteErr.stack);
        }
      });
    });
  });
});

Here is my attempt at promisifying the code above:

// First promisified attempt: receive up to 10 SQS messages, fetch the S3
// object each one refers to, then delete the message from the queue.
var Promise = require('bluebird');
var s3 = new AWS.S3();
var sqs = new AWS.SQS();
// Promisify the client prototypes; the `*Async` methods called below
// (receiveMessageAsync, getObjectAsync, deleteMessageAsync) rely on this.
Promise.promisifyAll(Object.getPrototypeOf(s3));
Promise.promisifyAll(Object.getPrototypeOf(sqs));

sqs.receiveMessageAsync({
  QueueUrl: settings.sqsQueueUrl[prdOrDev],
  /* required */
  WaitTimeSeconds: 20, // to enable long polling, which polls all servers for any unprocessed SQS messages
  VisibilityTimeout: 120, // without this longpolling didn't work.
  MaxNumberOfMessages: 10
}).then(function(data) {
  var messages = data.Messages;
  // NOTE: the promise chains started inside this forEach are neither returned
  // nor collected, so the outer chain does not wait for them to settle.
  messages.forEach(function(message) {
    // The SQS body is parsed as JSON, and its `Message` field is parsed again.
    var body = JSON.parse(message.Body);
    var sesMsg = JSON.parse(body.Message);
    s3.getObjectAsync({
      Bucket: sesMsg.receipt.action.bucketName,
      Key: sesMsg.receipt.action.objectKey
    }).then(function(data2) {
      // Delete the message only after the S3 object was fetched; `data2` is unused.
      return sqs.deleteMessageAsync({
        QueueUrl: settings.sqsQueueUrl[prdOrDev],
        /* required */
        ReceiptHandle: message.ReceiptHandle
      }).catch(function(err) {
        // A delete failure is logged here and does not propagate further.
        console.log('SQS deleteMessage failed: ', err, err.stack);
      });
    }).catch(function(err) {
      // Handles rejections from getObjectAsync and anything thrown in the handler above.
      console.log('S3 getObject failed: ', err, err.stack);
    });
  });
}).catch(function(err) {
  // Also reached when the handler above throws synchronously (e.g. a
  // JSON.parse failure), not only when receiveMessageAsync rejects.
  notifyAdmin('SQS receiveMessage failed: ', err, err.stack);
});

I am guessing that this is not the best way to use Promises. I am especially curious if there is a better way to deal with the forEach loop, similar to the following example from Bluebird's main page:

// Flat promise chain: each step returns the next promise so the following
// `then` receives its result.
mongoClient.connectAsync('mongodb://localhost:27017/mydb')
    .then(function(db) {
        // Query the `content` collection with an empty filter.
        return db.collection('content').findAsync({})
    })
    .then(function(cursor) {
        // Turn the cursor into an array of documents.
        return cursor.toArrayAsync();
    })
    .then(function(content) {
        // Send the documents back as the JSON response.
        res.status(200).json(content);
    })
    .catch(function(err) {
        // Rethrows the error unchanged, so the returned promise stays rejected.
        throw err;
    });

So, how do I best promisify the code snippet at the top using Bluebird?

woz
  • 197
  • 1
  • 3
  • 10

1 Answer

2

In the forEach loop, your then() call breaks the chain: you create promises, but you never return them, so nothing "waits" for them. The usual way is to store all the promises in an array and pass that array to Promise.all(). So, with your code:

// Receive up to 10 messages, process them in parallel and continue only
// once every per-message chain has settled.
sqs.receiveMessageAsync({
  QueueUrl: settings.sqsQueueUrl[prdOrDev],
  /* required */
  WaitTimeSeconds: 20, // to enable long polling, which polls all servers for any unprocessed SQS messages
  VisibilityTimeout: 120, // without this longpolling didn't work.
  MaxNumberOfMessages: 10
}).then(function(data) {
  // `Messages` is absent from the response when the poll returns nothing;
  // without the fallback that case would throw and be reported below as a
  // receiveMessage failure.
  var messages = data.Messages || [];

  // Build one promise per message: fetch the S3 object, then delete the
  // message from the queue.
  var promises = messages.map(function(message) {
    var body = JSON.parse(message.Body);
    var sesMsg = JSON.parse(body.Message);

    return s3.getObjectAsync({
      Bucket: sesMsg.receipt.action.bucketName,
      Key: sesMsg.receipt.action.objectKey
    }).then(function() {
      return sqs.deleteMessageAsync({
        QueueUrl: settings.sqsQueueUrl[prdOrDev],
        /* required */
        ReceiptHandle: message.ReceiptHandle
      }).catch(function(err) {
        console.log('SQS deleteMessage failed: ', err, err.stack);
      });
    }).catch(function(err) {
      // Logged and swallowed on purpose: one failed message must not
      // reject the whole batch.
      console.log('S3 getObject failed: ', err, err.stack);
    });
  });

  // Returning the combined promise is what makes the next `then` wait.
  return Promise.all(promises);
}).then(function() {
  console.log('all done');
}).catch(function(err) {
  // Reached when receiveMessageAsync rejects or when a message body cannot
  // be parsed in the handler above.
  notifyAdmin('SQS receiveMessage failed: ', err, err.stack);
});

You can also simplify the promisify code to:

// Promisify each client instance directly and keep the value promisifyAll returns.
// NOTE(review): presumably that value is the same instance with `*Async`
// methods added — confirm against Bluebird's promisifyAll documentation.
var s3 = Promise.promisifyAll(new AWS.S3());
var sqs = Promise.promisifyAll(new AWS.SQS());
Shanoor
  • 13,344
  • 2
  • 29
  • 40
  • I will test this shortly, thank you. I took the idea for the `Promise.promisifyAll(Object.getPrototypeOf(s3));` piece from [this thread](http://stackoverflow.com/a/26475487/2234029). I haven't checked it out yet, but it seems to be working for others. – woz Jan 07 '16 at 01:14
  • @woz, in the same thread, someone else says it didn't work for him: http://stackoverflow.com/a/28973401/5388620 – Shanoor Jan 07 '16 at 06:02
  • I just tested `Promise.promisifyAll(Object.getPrototypeOf(s3));` and `Promise.promisifyAll(Object.getPrototypeOf(sqs));`. I can confirm that these calls work fine and their return values do not need to be assigned to vars s3 and sqs. Also, the callback `function(err, data2)` doesn't work the same in a 'then'. It needs to be `function(data2)`. Continuing my investigation... – woz Jan 10 '16 at 07:11
  • About the `err`: yes, it gets thrown and ends up in the `.catch()`; I copy-pasted too quickly. About `promisifyAll()`: if it works that way, that's fine. I think the main issue was that the message promises were not being returned. – Shanoor Jan 10 '16 at 07:16
  • Yes, if you can make a second edit and remove the top part about promisify code having an issue, I can set your solution as the answer. – woz Jan 10 '16 at 07:57