
I loaded around 3K objects (files) to S3. There is an event trigger on each file that is loaded into that S3 bucket.

Lambda receives the event trigger for only around 300 objects. If I retry (move the files out of S3 and put them back), it generates events for another 400 objects; the rest of the events never reach Lambda.

What am I missing here, and how can I scale for any number of objects created?

var async = require('async');                                                                                                                                                                                
var aws = require('aws-sdk');                                                                                                                                                                                
var s3 = new aws.S3();                                                                                                                                                                                       
var kinesis = new aws.Kinesis();                                                                                                                                                                             
var sns = new aws.SNS();                                                                                                                                                                                     
var config = require('./config.js');                                                                                                                                                                         


var logError = function(errormsg) {                                                                                                                                                                          
    sns.publish({                                                                                                                                                                                            
        TopicArn: config.TopicArn,                                                                                                                                                                           
        Message: errormsg                                                                                                                                                                                    
    }, function(err, data) {                                                                                                                                                                                 
        if (err) {                                                                                                                                                                                           
            console.log(errormsg);                                                                                                                                                                           
        }                                                                                                                                                                                                    
    });                                                                                                                                                                                                      
};                                                                                                                                                                                                           


exports.handler = function(event, context, callback) {                                                                                                                                                       

    var readS3andSendtoKinesis = function(record, index, cb) {                                                                                                                                               
        var params = {                                                                                                                                                                                       
            Bucket: record.s3.bucket.name,                                                                                                                                                                   
            Key: record.s3.object.key                                                                                                                                                                        
        }; 
        console.log('Received File: ' +  record.s3.object.key);                                                                                                                                                                                                 
        s3.getObject(params, function(err, data) {                                                                                                                                                           
            if (!err) {                                                                                                                                                                                      
                var kinesisParams = {                                                                                                                                                                        
                    Data: data.Body.toString('utf8'),                                                                                                                                                        
                    PartitionKey: config.PartitionKey,                                                                                                                                                       
                    StreamName: config.StreamName                                                                                                                                                            
                };                                                                                                                                                                                           
                kinesis.putRecord(kinesisParams, function(err, data) {                                                                                                                                       
                    if (err) {                                                                                                                                                                               
                        // Handle Kinesis Failures                                                                                                                                                           
                        logError(JSON.stringify(err, null, 2));                                                                                                                                              
                    }                                                                                                                                                                                        
                    cb(null, 'done');                                                                                                                                                                        
                });                                                                                                                                                                                          
            } else {                                                                                                                                                                                         
                // Handle S3 Failures                                                                                                                                                                        
                logError(JSON.stringify(err, null, 2));                                                                                                                                                      
                cb(null, 'done');                                                                                                                                                                            
            }                                                                                                                                                                                                
        });                                                                                                                                                                                                  
    };                                                                                                                                                                                                       

    async.eachOfLimit(event.Records, 1, readS3andSendtoKinesis, function(err) {                                                                                                                              
        callback(null, 'Done');                                                                                                                                                                              
    });                                                                                                                                                                                                      
}; 

Since everyone recommended looking at CloudWatch, here are the CloudWatch metrics for the associated Lambda:

[Image: CloudWatch metrics chart for the Lambda function]

halfer
Kannaiyan
  • Are you verifying this by looking at Invocation count for your lambda function? – dzm Nov 06 '17 at 22:14
  • Invocation count, and we also log the key that was sent in the event. It does not match the number of objects created. – Kannaiyan Nov 06 '17 at 22:32
  • Did you trigger on the `s3:ObjectCreated:*` event? – Michael - sqlbot Nov 06 '17 at 23:14
  • @Michael-sqlbot It is on the s3:ObjectCreated:* event and I'm creating all the files the same way, nothing different. I wiped out all the files and recreated them with different names. – Kannaiyan Nov 07 '17 at 00:54
  • Rather than using the Invocation Count, can you check CloudWatch Logs to determine whether the events were triggered? – John Rotenstein Nov 07 '17 at 01:48
  • This scenario is unexpected. Does `Records` only ever contain one object per event? I've always believed that it did, but the documentation is a little sparse. – Michael - sqlbot Nov 07 '17 at 02:51
  • @JohnRotenstein I do log the S3 key of each event delivered to Lambda. When I create 3K files, I can see only around 300 or 400, not a fixed number. I cannot see any log entries for the remaining 2.7K files. – Kannaiyan Nov 07 '17 at 05:13
  • Added code for reference, with the business logic removed. It takes the S3 object and drops it into a Kinesis stream for further processing. – Kannaiyan Nov 07 '17 at 05:16
  • @Michael-sqlbot It is coded for multiple records in case it delivers multiple records. – Kannaiyan Nov 07 '17 at 05:17
  • So even if that's happening [(which I doubt)](https://stackoverflow.com/a/28486801/1695906) then it should have worked. Suggestion: make an SNS topic as the S3 notification target, then subscribe both Lambda and an SQS queue to the SNS topic. See if the queue catches more than Lambda does. (My S3 >> Lambda setups always have SNS in between, because it gives me the option of sending events to multiple places non-invasively... which also might explain why I have never seen this). Your Lambda code will need to unwrap the SNS structure from the payload but this could be a worthwhile exercise. – Michael - sqlbot Nov 07 '17 at 12:49
  • @Michael-sqlbot I understand that is a workaround. When there is direct delivery, why should we make a round trip through SNS for a single consumer and add one more technology to monitor in the system? I will check with support and see if they can help. – Kannaiyan Nov 07 '17 at 14:12
  • Can it be a concurrency issue? Did you check the CloudWatch Throttles metric? – Bachman Nov 07 '17 at 14:13
  • @Bachman Shouldn't AWS handle this even if there is a concurrency issue? It is totally under their control to deliver the event reliably to Lambda. – Kannaiyan Nov 07 '17 at 14:18
  • Updated the CloudWatch metrics with additional info. – Kannaiyan Nov 07 '17 at 14:29
  • @Kannaiyan I'm not saying it's necessary, ongoing. I'm suggesting trying another route so that you can document either 1 failing and 1 succeeding case, or 2 failing cases (if that turns out to be what you find) when you contact support. The queue I suggested was also intended as a temporary fallback check. I have done mine with SNS, as I mentioned, for flexibility -- rather than of necessity -- but it would be interesting (and troubling) if one alternative works as expected while the other doesn't. – Michael - sqlbot Nov 07 '17 at 21:31
  • In that CloudWatch metric chart, it looks like the invocation (red) line is flat at 0. Are you sure that is the correct Lambda/chart? It might help to separate the different units on the left and right axes. – Efren Mar 08 '18 at 03:44

2 Answers


We found that the root cause seems to be a failure on the receiving side of the trigger. The S3 triggers were firing, but the receiving side could not scale to the large volume of triggers it received.

To solve it:

Return from the S3 Lambda trigger as quickly as possible; delays will cause issues.

Avoid spending a long time on business logic inside the trigger. In our case we were reading the object from S3 and writing its contents to the stream. Instead, we now write only the S3 location to the stream and read the object from S3 on the receiving end.
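The change described above can be sketched roughly as follows. This is a minimal illustration and not our exact production code: `buildLocationRecords` and `makeHandler` are hypothetical helper names, the JSON shape of the message (`bucket`, `key`) is an assumption, and the Kinesis client is passed in so the handler can be exercised without AWS. It uses the `putRecords` call from the AWS SDK for JavaScript (v2 callback style), which accepts up to 500 records per request.

```javascript
// Sketch only: forward the S3 *location* to Kinesis instead of the object
// body, so the trigger does no S3 reads and returns quickly.
// buildLocationRecords and makeHandler are hypothetical names.

// Convert an S3 event into the params object for kinesis.putRecords.
function buildLocationRecords(event, streamName, partitionKey) {
    var records = (event.Records || []).map(function(record) {
        // Keys in S3 event notifications are URL-encoded, with spaces as '+'.
        var key = decodeURIComponent(record.s3.object.key.replace(/\+/g, ' '));
        return {
            // Assumed message shape: the consumer reads the object itself.
            Data: JSON.stringify({ bucket: record.s3.bucket.name, key: key }),
            PartitionKey: partitionKey
        };
    });
    return { StreamName: streamName, Records: records };
}

// Build a Lambda handler around an injected Kinesis client.
function makeHandler(kinesis, streamName, partitionKey) {
    return function(event, context, callback) {
        var params = buildLocationRecords(event, streamName, partitionKey);
        if (params.Records.length === 0) {
            return callback(null, 'Nothing to send');
        }
        kinesis.putRecords(params, function(err, data) {
            // Report failures instead of swallowing them, so the invocation
            // is marked as failed and shows up in the Errors metric.
            if (err) {
                return callback(err);
            }
            if (data.FailedRecordCount > 0) {
                return callback(new Error(data.FailedRecordCount + ' record(s) failed'));
            }
            callback(null, 'Done');
        });
    };
}

// Possible wiring, assuming the same config.js as in the question:
// var aws = require('aws-sdk');
// var config = require('./config.js');
// exports.handler = makeHandler(new aws.Kinesis(), config.StreamName, config.PartitionKey);
```

The consumer on the other end of the stream then parses each message and calls `s3.getObject` with the bucket and key it contains.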

Hope it helps.

Kannaiyan

AWS Lambda has throttling (concurrency) limits that prevent runaway situations.

For S3, the Lambda invocation also depends on permissions, so you should check those as well.

Since S3 is not a stream-based source, it is possible that you are seeing the synchronous scenario, where throttling hits the limit and S3 does not retry. Check the Throttles metric and look for 429 errors on the Lambda.
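As a rough illustration of checking throttling, the request below asks CloudWatch for the sum of the `Throttles` metric in the `AWS/Lambda` namespace for one function. `buildThrottleQuery` is a hypothetical helper name, and the function name and time window in the commented usage are placeholders; the resulting object is meant to be passed to `getMetricStatistics` in the AWS SDK for JavaScript.

```javascript
// Sketch only: build the parameters for CloudWatch getMetricStatistics to
// see how often a Lambda function was throttled in a time window.
// buildThrottleQuery is a hypothetical helper name.
function buildThrottleQuery(functionName, startTime, endTime) {
    return {
        Namespace: 'AWS/Lambda',
        MetricName: 'Throttles',
        Dimensions: [{ Name: 'FunctionName', Value: functionName }],
        StartTime: startTime,
        EndTime: endTime,
        Period: 300,          // seconds per data point
        Statistics: ['Sum']   // total throttled invocations per period
    };
}

// Possible usage (placeholder function name and window):
// var aws = require('aws-sdk');
// var cloudwatch = new aws.CloudWatch();
// var end = new Date();
// var start = new Date(end.getTime() - 60 * 60 * 1000); // last hour
// cloudwatch.getMetricStatistics(buildThrottleQuery('my-function', start, end),
//     function(err, data) {
//         if (err) { return console.log(err); }
//         console.log(JSON.stringify(data.Datapoints, null, 2));
//     });
```

A non-zero sum in any period would suggest that some invocations were throttled during the upload.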

Efren