As others have said, don't use readline. I also gave up on writeStream. Below is the entire Lambda, using the AWS JavaScript SDK v3.
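The `@api/clients/s3Client` and `@api/config/app-config` imports are project-local and aren't shown here; a rough sketch of what they might contain, assuming the client is just a shared `S3Client` instance and the config only needs the name of the bucket the parts are written to:

// s3Client.ts - assumed: one shared client for the whole lambda
import {S3Client} from "@aws-sdk/client-s3";
export const s3Client = new S3Client({});

// app-config.ts - assumed: the splitter only needs the target bucket name
export const winnerAppConfig = {
    processingBucket: process.env.PROCESSING_BUCKET
};

The Lambda itself: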
import 'source-map-support/register';
import {s3Client} from "@api/clients/s3Client";
import {GetObjectCommand, GetObjectCommandInput, PutObjectCommand, PutObjectCommandOutput} from "@aws-sdk/client-s3";
import * as fs from "fs";
import path from "path";
import {winnerAppConfig} from "@api/config/app-config";
import mktemp from "mktemp";
import * as os from "os";
import {sdkStreamMixin} from "@aws-sdk/util-stream-node";
const limit = 2000;
const s3Put = async (temporary: string, sourceBucket: GetObjectCommandInput): Promise<PutObjectCommandOutput> => {
    const fileName = path.parse(temporary).name;
    console.log(`Filename is ${fileName}`);
    console.log(JSON.stringify(path.parse(temporary)));
    try {
        const stats = fs.statSync(temporary);
        console.log(`Stats are ${JSON.stringify(stats)}`);
    } catch (err) {
        console.error(err);
    }
    const fileStream = fs.createReadStream(temporary);
    // Strip the suffix (if any) from the source key, then append the temp file name
    let targetS3Name = sourceBucket.Key;
    if (targetS3Name.includes('.')) {
        const fileKeyNoSuffixParts = sourceBucket.Key.split('.');
        fileKeyNoSuffixParts.pop();
        targetS3Name = fileKeyNoSuffixParts.join('.');
    }
    const fileKeyNoSuffix = `${targetS3Name}-${fileName}.csv`;
    const putObject: PutObjectCommand = new PutObjectCommand({
        Bucket: winnerAppConfig.processingBucket,
        Key: fileKeyNoSuffix,
        Body: fileStream
    });
    try {
        return await s3Client.send(putObject);
    } catch (err) {
        console.error(`Error putting object ${putObject.input.Key}`, err);
        throw err;
    }
}
const createTemporaryFile = () => {
    try {
        return mktemp.createFileSync('/tmp/XXXX.csv');
    } catch (err) {
        console.error(`Could not create the temporary file ${err}`);
        throw err;
    }
}
const fileSplitter = async (sourceBucket: GetObjectCommandInput) => {
    // data.Body is a readable stream
    const data = await s3Client.send(new GetObjectCommand(sourceBucket));
    let count = 0;
    let header = "";
    let remaining = '';
    const s3Promises: Promise<unknown>[] = [];
    let buffer = "";
    const processRows = () => {
        const sdkStream = sdkStreamMixin(data.Body);
        sdkStream.on('data', (chunk) => {
            const lines = (remaining + chunk).split(/\r?\n/g);
            console.log(`Bytes long ${Buffer.byteLength(remaining + chunk)} overlap ${Buffer.byteLength(remaining + chunk) % 4096}`);
            // Keep the last, possibly partial, line for the next chunk
            remaining = lines.pop();
            if ((header === "" || header === os.EOL) && lines.length > 0) {
                header = lines[0] + os.EOL;
                console.log(`Header is ${header}`);
            }
            // Count is not perfect, but near enough for performance splitting.
            count += lines.length;
            buffer += lines.filter(line => line.length > 0).join(os.EOL);
            if (buffer.length > 0) {
                buffer += os.EOL;
            }
        });
    }
    processRows();
    // Read that S3 needs 'close' rather than 'finish'
    // https://stackoverflow.com/questions/37837132/how-to-wait-for-a-stream-to-finish-piping-nodejs
    const lastBlockSdkStream = sdkStreamMixin(data.Body);
    const lastBlockPromise = new Promise((resolve, reject) => lastBlockSdkStream.on('close', async () => {
        console.log(`Close - writing file to the bucket`);
        buffer += remaining;
        const temporaryFile = createTemporaryFile();
        fs.writeFileSync(temporaryFile, buffer);
        await s3Put(temporaryFile, sourceBucket);
        resolve('ok');
    }));
    s3Promises.push(lastBlockPromise);
    return Promise.allSettled(s3Promises).then(() => {
        console.log('S3 done');
    });
}
export const splitter = async (event, context): Promise<any> => {
    return new Promise((resolve, reject) => {
        const s3EventRecords = event.Records;
        console.log('Context is ' + JSON.stringify(context));
        console.log('s3EventRecords ' + JSON.stringify(s3EventRecords));
        const triggerBucket = s3EventRecords[0].s3.bucket.name;
        const newKey = s3EventRecords[0].s3.object.key;
        // S3 event keys arrive URL-encoded, with '+' for spaces
        const keyDecoded = decodeURIComponent(newKey.replace(/\+/g, ' '));
        console.log(`Processing ${triggerBucket} : ${keyDecoded}`);
        const bucket: GetObjectCommandInput = {
            Bucket: triggerBucket,
            Key: keyDecoded
        };
        try {
            resolve(fileSplitter(bucket));
        } catch (err) {
            console.log(`Error ${err}`);
            reject(new Error(`Error getting object ${newKey} from bucket ${triggerBucket}`));
        }
    });
}
export const main = splitter;
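To exercise the handler locally you can call it with a hand-built S3 notification event. The record layout below is the standard S3 event shape; the module path, bucket and key values are placeholders I've made up, not part of the original:

// local-test.ts - invoke the handler with a minimal fake S3 event
import {main} from "./splitter"; // assumed module path

const fakeEvent = {
    Records: [{
        s3: {
            bucket: {name: "my-source-bucket"},
            object: {key: "incoming/big+file.csv"} // keys arrive URL-encoded, '+' for spaces
        }
    }]
};

main(fakeEvent, {functionName: "splitter-local"})
    .then(() => console.log("splitter finished"))
    .catch(err => console.error(err));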