
A simple file splitter. The first line written in the for loop makes it to the write stream, but lines written on subsequent iterations of the 'for await' readline loop do not. How do you write the subsequent lines?

const tempWriteStream = (temporary: string): WriteStream => {
  console.log(`Temporary file is ${temporary}`);
  return fs.createWriteStream(temporary);
}

const fileSplitter = async (sourceBucket: GetObjectCommandInput) => {
  const data = await s3Client.send(new GetObjectCommand(sourceBucket));
  const rl = readline.createInterface({
    input: data.Body,
    crlfDelay: Infinity
  });

  let count = 0;
  let header = "";
  let temporary = createTemporaryFile();
  let writeStream = tempWriteStream(temporary);

  for await (const line of rl) {
    if (count === 0) {
      header = line;
    }
    count++;
    // Handle drain
    const writeCanContinue = writeStream.write(line);
    if (count > limit || !writeCanContinue) {
      console.log('Starting a new file');
      writeStream.end();
      handleCreateNewFile();
    }
  }

  writeStream.end();
  writeStream.on('finish', async () => {
    await s3Put(temporary, sourceBucket);
  });
  // data.Body is a readable stream
}
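
For reference, the usual way to keep writing once write() returns false is to wait for the stream's 'drain' event rather than ending the stream. A minimal standalone sketch of that pattern (writeLines and the appended newline are my own choices, not part of the lambda above):

import * as fs from "fs";
import * as readline from "readline";
import { once } from "events";
import { Readable } from "stream";

// Write every line from a readable stream to a file, pausing on
// backpressure instead of dropping lines or ending the stream.
const writeLines = async (input: Readable, outPath: string): Promise<void> => {
  const writeStream = fs.createWriteStream(outPath);
  const rl = readline.createInterface({ input, crlfDelay: Infinity });

  for await (const line of rl) {
    // write() returns false when the internal buffer is full;
    // wait for 'drain' before writing the next line.
    if (!writeStream.write(line + "\n")) {
      await once(writeStream, "drain");
    }
  }

  writeStream.end();
  await once(writeStream, "finish"); // all buffered data flushed
};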

1 Answer


As others have said, don't use readline. I also gave up on the write stream. This is the entire lambda, using AWS JavaScript SDK v3.

import 'source-map-support/register';
import {s3Client} from "@api/clients/s3Client";
import {GetObjectCommand, GetObjectCommandInput, PutObjectCommand, PutObjectCommandOutput} from "@aws-sdk/client-s3";
import * as fs from "fs";
import path from "path";
import {winnerAppConfig} from "@api/config/app-config";
import mktemp from "mktemp";
import * as os from "os";
import {sdkStreamMixin} from "@aws-sdk/util-stream-node";

const limit = 2000;


const s3Put = async (temporary: string, sourceBucket: GetObjectCommandInput): Promise<PutObjectCommandOutput> => {
    const fileName = path.parse(temporary).name;
    console.log(`Filename is ${fileName}`);
    console.log(JSON.stringify(path.parse(temporary)));
    try {
        const stats = fs.statSync(temporary);
        console.log(`Stats are ${JSON.stringify(stats)}`);
    } catch (err) {
        console.error(err);
    }

    const fileStream = fs.createReadStream(temporary);
    // Strip the suffix from the source key, then append the temp-file
    // name so each part gets a unique key.
    let targetS3Name = sourceBucket.Key;
    if (targetS3Name.includes('.')) {
        const fileKeyNoSuffixParts = sourceBucket.Key.split('.');
        fileKeyNoSuffixParts.pop();
        targetS3Name = fileKeyNoSuffixParts.join('.');
    }

    const targetKey = targetS3Name + `-${fileName}.csv`;

    const putObject: PutObjectCommand = new PutObjectCommand({
        Bucket: winnerAppConfig.processingBucket,
        Key: targetKey,
        Body: fileStream
    });

    try {
        return await s3Client.send(putObject);
    } catch (err) {
        console.log(`Error putting object ${putObject.input.Key}`, err);
        throw err;
    }
};
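
To trace the key derivation above, a hypothetical example (both input values are made up):

import path from "path";

// Hypothetical values to walk through the key derivation in s3Put.
const temporary = "/tmp/aB3x.csv";      // mktemp output
const sourceKey = "uploads/input.csv";  // sourceBucket.Key

const fileName = path.parse(temporary).name;               // "aB3x"
const base = sourceKey.split('.').slice(0, -1).join('.');  // "uploads/input"
console.log(base + `-${fileName}.csv`);                    // "uploads/input-aB3x.csv"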
const createTemporaryFile = () => {
    try {
        return mktemp.createFileSync('/tmp/XXXX.csv');
    } catch (err) {
        console.error(`Could not create the temporary file ${err}`);
        throw err;
    }
}
const fileSplitter = async (sourceBucket: GetObjectCommandInput) => {
    // data.Body is a readable stream
    const data = await s3Client.send(new GetObjectCommand(sourceBucket));

    let count = 0;
    let header = "";
    let remaining = "";
    const s3Promises: Promise<unknown>[] = [];
    let buffer = "";

    const processRows = () => {
        const sdkStream = sdkStreamMixin(data.Body);

        sdkStream.on('data', (data) => {
            // A chunk can end mid-line, so hold back the trailing
            // partial line and prepend it to the next chunk.
            const lines = (remaining + data).split(/\r?\n/g);
            console.log(`Bytes long ${Buffer.byteLength(remaining + data)} overlap ${Buffer.byteLength(remaining + data) % 4096}`);
            remaining = lines.pop();
            if ((header === "" || header === os.EOL) && lines.length > 0) {
                header = lines[0] + os.EOL;
                console.log(`Header is ${header}`);
            }
            // Count is not perfect, but near enough for performance splitting.
            count += lines.length;
            buffer += lines.filter(line => line.length > 0).join(os.EOL);
            if (buffer.length > 0) {
                buffer += os.EOL;
            }
            // Flush a part once the row limit is passed. (The original
            // paste was truncated here; this block is an assumed
            // reconstruction mirroring the close handler below.)
            if (count > limit) {
                count = 0;
                const temporaryFile = createTemporaryFile();
                fs.writeFileSync(temporaryFile, buffer);
                s3Promises.push(s3Put(temporaryFile, sourceBucket));
                buffer = header;
            }
        });
    };

    processRows();

    // Read that s3 needs close rather than finish
    // https://stackoverflow.com/questions/37837132/how-to-wait-for-a-stream-to-finish-piping-nodejs
    const lastBlockSdkStream = sdkStreamMixin(data.Body);
    const lastBlockPromise = new Promise((resolve, reject) => lastBlockSdkStream.on('close', async () => {
        console.log(`Close - writing file to the bucket`);
        buffer += remaining;
        const temporaryFile = createTemporaryFile();
        fs.writeFileSync(temporaryFile, buffer);
        await s3Put(temporaryFile, sourceBucket);
        resolve('ok');
    }));
    s3Promises.push(lastBlockPromise);

    return Promise.allSettled(s3Promises).then(() => {
        console.log('S3 done');
    });
};
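
The heart of this approach is the remaining variable: a 'data' chunk can end mid-line, so the trailing partial line is carried over and prepended to the next chunk. A standalone sketch of just that technique (collectLines is my own name, not part of the lambda):

import { Readable } from "stream";

// Reassemble lines across chunk boundaries by holding back the
// trailing partial line of each chunk.
const collectLines = async (stream: Readable): Promise<string[]> => {
    const lines: string[] = [];
    let remaining = "";
    for await (const chunk of stream) {
        const parts = (remaining + chunk.toString()).split(/\r?\n/);
        remaining = parts.pop() ?? ""; // may be a partial line
        lines.push(...parts);
    }
    if (remaining.length > 0) {
        lines.push(remaining); // flush the tail
    }
    return lines;
};

// Usage: chunks deliberately split mid-line.
collectLines(Readable.from(["a,b\nc,", "d\ne,f"]))
    .then(lines => console.log(lines)); // [ 'a,b', 'c,d', 'e,f' ]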

export const splitter = async (event, context): Promise<any> => {
    const s3EventRecords = event.Records;
    console.log('Context is ' + JSON.stringify(context));
    console.log('s3EventRecords ' + JSON.stringify(s3EventRecords));

    const triggerBucket = s3EventRecords[0].s3.bucket.name;
    const newKey = s3EventRecords[0].s3.object.key;
    // S3 event keys are URL-encoded, with '+' standing in for spaces.
    const keyDecoded = decodeURIComponent(newKey.replace(/\+/g, ' '));

    console.log(`Processing ${triggerBucket} : ${keyDecoded}`);
    const bucket: GetObjectCommandInput = {
        Bucket: triggerBucket,
        Key: keyDecoded
    };
    try {
        return await fileSplitter(bucket);
    } catch (err) {
        console.log(`Error ${err}`);
        throw new Error(`Error getting object ${newKey} from bucket ${triggerBucket}`);
    }
};
export const main = splitter;
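
For a quick local smoke test, the handler can be driven with a hand-built S3 event (the bucket name, key, and context value here are placeholders):

// Hypothetical event; bucket and key are placeholders.
const fakeEvent = {
    Records: [{
        s3: {
            bucket: { name: "my-trigger-bucket" },
            object: { key: "uploads/big+input.csv" }
        }
    }]
};

main(fakeEvent, { functionName: "splitter-local" })
    .then(() => console.log("done"))
    .catch(console.error);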