I have an application to handle bounce messages received from AWS SQS through SES through SNS.
The logic is as follow
- Initialize Ms SQL Server connection pool
- Poll data from SQS with long polling enabled and Wait Time is set to 20 seconds and max number of messages received to set to 10
- If there are messages meaning the callback from "sqs.receiveMessage" method has been called
- using "async.each" to process message(s) in parallel
- for each message :
- log it to Sql Table
- disable Account if hard bounce
- delete from queue
- for each message :
- Repeat step 2
Because SQS doesn't work exactly like a pub/sub model like how Kafka Consumer/Producer works. I have to keep polling for data. Initially, I tried using while(true) {} loop but it doesn't work. I am thinking maybe the reason is because the main call stack always has something to do and it is never empty?
Index.ts
async.series([
function start(step) {
LOG.info('Initializing Ms SQL Server Connection Pool');
sql = MssqlDao.getInstance();
sql.init().then(() => {
if (sql.isInitialised) {
LOG.info('SQL Server Initialized');
}
step();
});
}, function listen() {
const bounces = new Bounces(sql.connectionPool);
**// THIS WON'T WORK. Why?**
while(true) {
LOG.info('Initializing Bounces listener continuously');
bounces.listen();
}
**// It works if I use CronTab or remove while loop and use recursion**
}
]);
Bounces.ts
listen() {
const queueURL: string | undefined = process.env.QUEUE_URL;
if (!queueURL) {
throw new Error(`Queue url is required`);
}
const params: any = {
AttributeNames: [
"SentTimestamp"
],
MaxNumberOfMessages: 10,
MessageAttributeNames: [
"All"
],
QueueUrl: queueURL,
//VisibilityTimeout: 20,
WaitTimeSeconds: 20
};
let that = this as any;
sqs.receiveMessage(params, function (err, data: ReceiveMessageResult) {
// THIS CALLBACK NEVER GETS CALLED INSIDE WHILE LOOP
if (err) {
LOG.error("Receive Error", err);
} else if (data.Messages) {
let messages = data.Messages;
async.each(messages, async (_message: any) => {
let message: any = JSON.parse(_message.Body);
// Insert
let parameters: Parameter = {...};
const parameterKeys: string[] = Object.keys(parameters);
try {
let request = that.sql.request() as Request;
await that.insert(parameterKeys, request, parameters);
let eventStatusCode = parameters.eventStatusCode;
let email = parameters.eventRecipient;
await that.disableAccount(eventStatusCode, email, that.sql);
// Delete message after logging
let deleteParams: DeleteMessageRequest = {
QueueUrl: queueURL as string,
ReceiptHandle: _message.ReceiptHandle as string
};
that.delete(deleteParams);
} catch (e) {
LOG.error(e);
}
});
}
**// IT WORKS IF listen() is called recursively here
// Will this cause memory issues?
// Will it eventually fill the call stack and stack overflow?
// that.listen();**
});
}
Thanks