
I am creating a Lambda function that gets data from an S3 bucket and streams it to fast-csv for parsing. After that, I need to connect to a DocumentDB database and send the parsed data to it.

The problem is that sometimes the database connection function runs before the parse function and ends up with an empty array, and sometimes the parse function doesn't run at all, or vice versa.

So how can I make the parse function (parserFcn) always run before the database connect-and-insert function (connectToDb), so that connectToDb receives the parsed data?

Here is the code:

const AWS = require("aws-sdk");
const fs = require("fs");
const csv = require("@fast-csv/parse");
const MongoClient = require("mongodb").MongoClient;

const s3 = new AWS.S3();

exports.handler = async (event, context, callback) => {
  const bucketName = event.Records[0].s3.bucket.name;
  const keyName = event.Records[0].s3.object.key;

  console.log("Bucket Name->", JSON.stringify(bucketName));
  console.log("Bucket key->", JSON.stringify(keyName));

  var params = {
    Bucket: bucketName,
    Key: keyName,
  };
  var parsedData = [];
  const s3Contents = s3.getObject(params).createReadStream();

  let parserFcn = new Promise((resolve, reject) => {
    const parser = csv
      .parseStream(s3Contents, { headers: true })
      .on("data", function (data) {
        parsedData.push(data);
      })
      .on("end", (rowCount) => {
        console.log(`Parsed ${rowCount} rows`);
        resolve(parsedData);
      })
      .on("error", function () {
        reject("csv parse process failed");
      });
    return parser;
  });

  let connectToDb = new Promise((resolve, reject) => {
    var client = MongoClient.connect(
      "mongodb://user:pass@host/?ssl=true&retryWrites=false",
      {
        tlsCAFile: `/opt/rds-combined-ca-bundle.pem`, //Specify the DocDB; cert
      },
      function (err, client) {
        if (err) {
          throw err;
        } else {
          console.log("connected ");
        }
        console.log("parsedData inside conn ", parsedData);

        // Specify the database to be used
        db = client.db("database-name");

        // Specify the collection to be used
        col = db.collection("collection-name");

        // Insert Multiple document
        col.insertMany(parsedData, function (err, result) {
          if (err) {
            console.log("error->", err);
          }
          console.log("Result from db->", result);

          //Close the connection
          client.close();
        });
      }
    );
    return client;
  });

  const parserdata = await parserFcn;
  const conn = await connectToDb;

  let promiseFactories = [parserdata, conn];

  Promise.all(promiseFactories).then((data) => {
    console.log("completed all promises", data);
  });
};
Asked by Fury; edited by traktor
  • Make `parserFcn` and `connectToDb` functions that return a promise. Then call them with `await parserFcn()` and `await connectToDb()`. – Gamma032 Jul 06 '22 at 06:11
  • @Gamma032 But I did the await part here: `const parserdata = await parserFcn; const conn = await connectToDb; let promiseFactories = [parserdata, conn]; Promise.all(promiseFactories).then((data) => { console.log("completed all promises", data); });` Should I remove the Promise.all()? – Fury Jul 06 '22 at 06:12
  • Are `db` and `col` supposed to be global variables or should they have been declared somewhere? Also `new Promise` calls discard any values returned from promise executor functions, so `return parser` and `return client` have no effect besides returning from the executor. – traktor Jul 06 '22 at 06:58
  • @traktor db and col are not declared anywhere, so they are global. – Fury Jul 06 '22 at 07:18
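A short sketch of traktor's point about executor return values, runnable in plain Node.js with no AWS or MongoDB involved: the Promise constructor ignores whatever the executor returns, and only the value passed to resolve becomes the fulfillment value. This is why return parser and return client in the question have no effect on what await produces.

```javascript
// The executor's return value is ignored by the Promise constructor;
// only the value passed to resolve() becomes the fulfillment value.
const p = new Promise((resolve) => {
  resolve("value passed to resolve");
  return "value returned from executor"; // discarded
});

p.then((value) => {
  console.log(value); // prints "value passed to resolve"
});
```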

2 Answers


Here's an attempt at replacing the promise definitions in the post with functions that return a promise, as suggested by @Gamma032. You may find it useful as a guide when comparing it with the code you are writing and with what the handler is supposed to do.

The replacement functions are not declared as async functions because they use callbacks to resolve or reject the new promises they create and return. The handler awaits them one after the other inside a standard try/catch, so that if either promise is rejected, await re-throws the rejection reason and the catch block handles it.
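The pattern described above (a plain, non-async function that wraps a callback API in a new promise, then gets awaited inside try/catch) can be sketched without any database. fakeConnect is a hypothetical stand-in for MongoClient.connect that calls back with an error on demand; it is not part of any library.

```javascript
// Hypothetical callback-style function standing in for MongoClient.connect:
// it calls back with an error when shouldFail is true.
function fakeConnect(shouldFail, callback) {
  setTimeout(() => {
    if (shouldFail) callback(new Error("connection failure"));
    else callback(null, { connected: true });
  }, 10);
}

// Not declared async: it wraps the callback API in a new promise and
// settles that promise from inside the callback.
function connect(shouldFail) {
  return new Promise((resolve, reject) => {
    fakeConnect(shouldFail, (err, client) => {
      if (err) {
        reject(err);
        return;
      }
      resolve(client);
    });
  });
}

async function run(shouldFail) {
  try {
    const client = await connect(shouldFail);
    return `connected: ${client.connected}`;
  } catch (err) {
    // await re-throws the rejection reason, so it lands here.
    return `caught: ${err.message}`;
  }
}

run(false).then(console.log); // connected: true
run(true).then(console.log); // caught: connection failure
```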

I left the global variables mentioned in the comments as they were, but moved the initial definition of parsedData inside the parseData function and renamed the "connection" function to updateDB, because it both connects to and updates the database.

const AWS = require("aws-sdk");
const csv = require("@fast-csv/parse");
const MongoClient = require("mongodb").MongoClient;

const s3 = new AWS.S3();

exports.handler = async (event, context, callback) => {
  const bucketName = event.Records[0].s3.bucket.name;
  const keyName = event.Records[0].s3.object.key;

  console.log("Bucket Name->", JSON.stringify(bucketName));
  console.log("Bucket key->", JSON.stringify(keyName));

  var params = {
    Bucket: bucketName,
    Key: keyName,
  };
  const s3Contents = s3.getObject(params).createReadStream();

  // Returns a promise that resolves with every parsed row once the stream ends
  function parseData() {
    return new Promise((resolve, reject) => {
      const parsedData = [];
      csv
        .parseStream(s3Contents, { headers: true })
        .on("data", function (data) {
          parsedData.push(data);
        })
        .on("end", (rowCount) => {
          console.log(`Parsed ${rowCount} rows`);
          resolve(parsedData);
        })
        .on("error", function () {
          reject("csv parse process failed");
        });
    });
  }

  // Returns a promise that resolves with the insertMany result
  function updateDB(parsedData) {
    console.log("parsedData inside updateDB ", parsedData);
    return new Promise((resolve, reject) => {
      // The client is delivered to the callback below,
      // so the return value of connect() is not used
      MongoClient.connect(
        "mongodb://user:pass@host/?ssl=true&retryWrites=false",
        {
          tlsCAFile: `/opt/rds-combined-ca-bundle.pem`, // Specify the DocumentDB CA certificate
        },
        function (err, client) {
          if (err) {
            console.error("connection failure");
            reject(err);
            return; // error return
          }
          console.log("connected ");
          // Specify the database to be used
          db = client.db("database-name");

          // Specify the collection to be used
          col = db.collection("collection-name");

          // Insert multiple documents
          col.insertMany(parsedData, function (err, result) {
            // Close the connection whether the insert succeeded or failed
            client.close();
            if (err) {
              console.error("insertion failure");
              reject(err);
            } else {
              resolve(result);
            }
          });
        }
      );
    });
  }

  // Call parseData and updateDB in order
  try {
    const parsedData = await parseData();
    const result = await updateDB(parsedData);
    console.log("DB updated with result", result);
    // see note: no Promise.all is needed after awaiting both values
    console.log("completed all promises");
  } catch (err) {
    console.error(err); // there may already be a line on the console about the error.
  }
};
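A sketch of the stream-to-promise pattern that parseData() relies on, runnable without S3 or fast-csv. As an assumption for illustration, Readable.from() from Node's stream module stands in for csv.parseStream(...), which likewise emits one "data" event per parsed row; collectRows is a hypothetical helper name.

```javascript
const { Readable } = require("stream");

// Wraps a stream's events in a promise, the same way parseData() wraps the
// fast-csv parser: push each "data" row, resolve on "end", reject on "error".
function collectRows(stream) {
  return new Promise((resolve, reject) => {
    const rows = [];
    stream
      .on("data", (row) => rows.push(row))
      .on("end", () => resolve(rows))
      .on("error", reject);
  });
}

// Readable.from() stands in for csv.parseStream(...) here (an assumption
// for illustration): it emits already-parsed row objects.
const rowsStream = Readable.from([{ id: "1" }, { id: "2" }, { id: "3" }]);

collectRows(rowsStream).then((rows) => {
  console.log(rows.length); // 3
});

// A stream that fails makes the promise reject instead of hanging.
const failing = new Readable({
  objectMode: true,
  read() {
    this.destroy(new Error("csv parse process failed"));
  },
});

collectRows(failing).catch((err) => console.error(err.message));
```

Because the promise only settles on "end" or "error", nothing that awaits it can run before the whole stream has been consumed.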

Note

An edit from the OP added

    const promiseFactories = [parsedData, result];
    Promise.all(promiseFactories).then((data) => {
     console.log("completed all promises", data);
    });

to the try clause after awaiting the values of parsedData and result. However, neither of these values is a promise (you can't fulfill a promise with a promise, and the await operator never returns a promise as the result of the await operation), so passing them through a call to Promise.all simply puts a job in the promise job queue to perform the console.log from the then handler. Logging the message after awaiting both values is enough.
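A small sketch of both claims in the note, using made-up values in place of the real parsed rows and insert result: a promise resolved with another promise adopts that promise's state, so the awaited value is never a promise, and Promise.all over plain values just hands the same values back a little later.

```javascript
async function demo() {
  // A promise resolved with another promise adopts that promise's state,
  // so the awaited value is the inner array, not a promise.
  const parsedData = await new Promise((resolve) =>
    resolve(Promise.resolve(["row1", "row2"]))
  );
  const result = await Promise.resolve({ ok: 1 });
  console.log(parsedData instanceof Promise, result instanceof Promise); // false false

  // Promise.all treats non-promise values as already fulfilled: it waits for
  // nothing extra and returns the same values in a new array.
  const data = await Promise.all([parsedData, result]);
  console.log(data[0] === parsedData, data[1] === result); // true true
  return data;
}

demo();
```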

Answered by traktor
  • I have tried this way too, and the Lambda does not run past these console.log() calls: `console.log("Bucket Name->", JSON.stringify(bucketName)); console.log("Bucket key->", JSON.stringify(keyName)); ` – Fury Jul 09 '22 at 04:41

You should call functions that return promises and await their results, instead of awaiting variables that hold promises created in advance.

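Declaring let parserFcn = new Promise(...) and let connectToDb = new Promise(...) starts the parsing and the database connection immediately, because a promise's executor runs as soon as the promise is constructed. Both then run concurrently, with no guarantee on which finishes first.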
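To make the point about execution order concrete, here is a small sketch in plain Node.js; the timer delays are arbitrary stand-ins for parsing and connecting, not measurements. Each executor runs as soon as its promise is constructed, and whichever task is faster settles first.

```javascript
const order = [];

// The executor passed to new Promise runs synchronously, right away,
// so creating a promise is enough to start its work.
const first = new Promise((resolve) => {
  order.push("first executor started");
  setTimeout(() => resolve("first"), 30); // slower task
});

const second = new Promise((resolve) => {
  order.push("second executor started");
  setTimeout(() => resolve("second"), 10); // faster task
});

order.push("both promises created");

// Both tasks are already running before anything awaits them,
// and the faster one settles first.
first.then(() => order.push("first settled"));
second.then(() => order.push("second settled"));
```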

So declare two functions:

  1. parserFcn, which returns a promise to the parsed data array.
  2. connectToDb, which takes the parsed data, pushes it to the database, and returns a promise that settles when the insert finishes.

Then just call them in order:

const parsedData = await parserFcn();
await connectToDb(parsedData);
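The two-step sequence above can be sketched with hypothetical stand-ins for parserFcn and connectToDb; timers replace the real S3 parsing and DocumentDB insert, so this runs with plain Node.js. Even though the "parse" step is slower, the DB step only starts after parsing has finished, because connectToDb is not called until the first await completes.

```javascript
// Hypothetical stand-ins for parserFcn and connectToDb: each is a function,
// so no work starts until it is called.
function parserFcn() {
  return new Promise((resolve) => {
    setTimeout(() => resolve([{ id: 1 }, { id: 2 }]), 30); // slow "parse"
  });
}

function connectToDb(parsedData) {
  return new Promise((resolve) => {
    // By the time this runs, parsedData is fully populated.
    setTimeout(() => resolve({ insertedCount: parsedData.length }), 10);
  });
}

async function handler() {
  const parsedData = await parserFcn(); // wait for parsing to finish
  const result = await connectToDb(parsedData); // then start the DB step
  return result;
}

handler().then((result) => console.log(result)); // { insertedCount: 2 }
```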
Answered by Gamma032
  • I have tried wrapping those into 2 functions like you said, but now it doesn't run the second function; only the parse function runs. – Fury Jul 06 '22 at 07:12
  • Sometimes it doesn't run anything at all. – Fury Jul 06 '22 at 07:23
  • I would encourage you to reduce the problem until you find some code that works. You might even like to look up some async and promise examples. Then add back in code as you go, and see where things are breaking. You're working with quite a bit of complexity and this isn't code I can easily run and debug for you. – Gamma032 Jul 06 '22 at 07:26