
I am trying to write a Lambda function that can stream a huge CSV file and convert it into multiple small JSON files (say, one JSON file per 2000 rows), reading from and writing to an S3 bucket. I do, however, have some restrictions, such as running within a limited memory of 256 MB.

I am able to do this by downloading the file as a whole instead of streaming it, as shown below.

But due to the memory constraint, I need to handle this with streams. Is there a way to do the same using streams?

// transformationClass.js

const csv = require('csvtojson');

const extension = '.json';
class S3CsvToJson {
    static async perform(input, output, headers) {
        // `s3` is our S3 client wrapper (set up elsewhere); this buffers the whole object in memory
        const s3Object = await s3.getObject(); // getting the s3 object
        const csvString = s3Object.Body.toString('utf8');
        const jsonArray = await csv({
            noheader: false,
        }).fromString(csvString);
        const fileNames = await S3CsvToJson.writeToFile(jsonArray, output);
        return { files: fileNames };
    }

    static async writeToFile(jsonArray, output) {
        const rowsPerFile = 2000; // one JSON file per 2000 rows
        const fileNames = [];
        let outFile;
        if (jsonArray && Array.isArray(jsonArray)) {
            let fileIterator = 1;
            while (jsonArray.length) {
                outFile = `${output.key}-${fileIterator}${extension}`;
                // write the next chunk to S3 via the wrapper's putObject(key, bucket, body)
                await s3.putObject(
                    outFile,
                    output.bucketName,
                    JSON.stringify(jsonArray.splice(0, rowsPerFile)),
                );
                console.log('rows left :', jsonArray.length);
                fileNames.push(outFile);
                fileIterator += 1;
            }
        }
        return fileNames;
    }
}

module.exports = S3CsvToJson;

Here is the handler function:

// handler.js
const s3CsvToJson = require('./transformationClass'); // the S3CsvToJson class above

module.exports.perform = async (event, context, callback) => {
    context.callbackWaitsForEmptyEventLoop = false;
    await s3CsvToJson.perform(event.input, event.output, event.headerMapping)
        .then((result) => callback(null, result));
    console.log('leaving - ', Date.now());
};

Thanks in advance!!

devil-0-per
  • Can you show where you integrate handler.js? Is it after a file upload/download? – Silvan Bregy Jul 09 '21 at 08:47
  • Check this out: https://github.com/Keyang/node-csvtojson – Amaarockz Jul 09 '21 at 08:58
  • @SilvanBregy The handler file is the main caller of the transformation class. You can see that I have made the call `await s3CsvToJson.perform` in the handler. – devil-0-per Jul 09 '21 at 09:29
  • @devil-0-per Yes, but where is the handler called/registered? If the `event` passed to `s3CsvToJson.perform` has already cached the data, you may have to set up streaming earlier. What type of object is the `event` variable that is passed? – Silvan Bregy Jul 09 '21 at 09:44

1 Answer


After going through a lot of material, I finally worked out a way to get this done.

What I had to do was wrap the whole process into a promise and return it. I created a read stream from S3, piped it to the parser, and then to the write stream. I wanted to share it here so it could be useful for others. I'm also open to any better-optimized solutions.

// transformationClass.js

const csv = require('fast-csv');
const { Transform, pipeline } = require('stream');

const extension = '.json';
class S3CsvToJson {
  static async perform(input, output, headers) {
    console.log(input, output, headers);
    const threshold = 2000;
    try {
      const promiseTransformData = () => new Promise((resolve, reject) => {
        try {
          let jsonOutput = [];
          let fileCounter = 0;
          const fileNames = [];
          const writableStream = new Transform({
            objectMode: true,
            autoDestroy: true,
            async transform(data, _, next) {
              // once a batch reaches the threshold, flush it to S3 and start a new one
              if (jsonOutput.length === threshold) {
                fileCounter += 1;
                const outFile = `${output.key}-${fileCounter}${extension}`;
                await s3.putObject(outFile, output.bucketName, JSON.stringify(jsonOutput));
                fileNames.push(outFile);
                jsonOutput = [];
              }
              jsonOutput.push(data);
              next();
            },
          });
          const readFileStream = s3.getReadStream(input.key, input.bucketName);
          pipeline(
            readFileStream,
            csv.parse({ headers: true }),
            writableStream,
            (error) => {
              if (error) {
                console.error(`Error occurred in pipeline - ${error}`);
                resolve({ errorMessage: error.message });
              }
            },
          );
          writableStream.on('finish', async () => {
            // flush any remaining rows that did not fill a complete batch
            if (jsonOutput.length) {
              fileCounter += 1;
              const outFile = `${output.key}-${fileCounter}${extension}`;
              await s3.putObject(outFile, output.bucketName, JSON.stringify(jsonOutput));
              fileNames.push(outFile);
              jsonOutput = [];
            }
            console.log({ status: 'Success', files: fileNames });
            resolve({ status: 'Success', files: fileNames });
          });
        } catch (error) {
          console.error(`Error occurred while transformation - ${error}`);
          resolve({ errorMessage: error ? error.message : null });
        }
      });
      return await promiseTransformData();
    } catch (error) {
      return error.message || error;
    }
  }
}

module.exports = S3CsvToJson;
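
A note on the `s3` object used in these snippets: it is a thin client wrapper that is set up outside the code shown here. For reference, here is a minimal sketch of what such a wrapper could look like on the v2 `aws-sdk`, assuming the `getReadStream(key, bucketName)` and `putObject(key, bucketName, body)` signatures used above (the file name is only illustrative):

// s3Wrapper.js (hypothetical sketch of the wrapper assumed above)
const AWS = require('aws-sdk');

const client = new AWS.S3();

module.exports = {
  // return a readable stream for the object instead of buffering it,
  // which is what keeps memory usage flat
  getReadStream: (key, bucketName) => client
    .getObject({ Bucket: bucketName, Key: key })
    .createReadStream(),

  // upload a string body; returns a promise so callers can await it
  putObject: (key, bucketName, body) => client
    .putObject({ Bucket: bucketName, Key: key, Body: body })
    .promise(),
};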

And in the handler, I call S3CsvToJson like this:

// handler.js
const s3CsvToJson = require('./transformationClass'); // the S3CsvToJson class above

module.exports.perform = async (event, context, callback) => {
    context.callbackWaitsForEmptyEventLoop = false;
    await s3CsvToJson.perform(event.input, event.output, event.headerMapping)
        .then((result) => callback(null, result))
        .catch((error) => callback(error));
};
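
For completeness, this is the kind of event payload the handler expects. The shape is inferred from how `input` and `output` are used above, and the bucket/key names are just placeholders:

// example invocation (placeholder names only)
const handler = require('./handler');

const event = {
  input: { key: 'uploads/huge-file.csv', bucketName: 'my-source-bucket' },
  output: { key: 'converted/huge-file', bucketName: 'my-target-bucket' },
  headerMapping: {}, // passed through; the streaming version does not use it yet
};

handler.perform(event, {}, (error, result) => console.log(error || result));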

Hope it was helpful. Thanks!

devil-0-per