I'm using the pipeline below to stream data from Aurora, transform it to CSV, and send it to S3.
Readable knex stream:
const getQueryStream = (organizationId) =>
  db.select('*')
    .from('users')
    .where('organization_id', organizationId)
    .stream();
Transforming the data:
const toCSVTransform = (fields) => new stream.Transform({
  objectMode: true,
  transform: (row, encoding, callback) => {
    let rowAsArr = [];
    for (let i = 0; i < fields.length; i++) {
      rowAsArr.push(row[fields[i]]);
    }
    callback(null, `${rowAsArr.join(',')}\n`);
  }
});
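One thing to watch in the transform above: joining raw values with ',' produces a broken row as soon as a value contains a comma, a double quote, or a line break. Below is a minimal sketch of RFC 4180-style quoting that could be called from inside the transform callback. The names escapeCSVValue and toCSVLine are hypothetical helpers, not part of the original code.

```javascript
// Hypothetical helper: quote one value using RFC 4180 conventions.
// null/undefined become an empty field, which matches what Array.prototype.join does.
const escapeCSVValue = (value) => {
  if (value === null || value === undefined) return '';
  const text = String(value);
  // Quote only when the value contains a delimiter, a quote, or a line break;
  // embedded double quotes are doubled.
  return /[",\r\n]/.test(text) ? `"${text.replace(/"/g, '""')}"` : text;
};

// Hypothetical helper: build one CSV line from a row object and an ordered field list.
const toCSVLine = (row, fields) =>
  `${fields.map((field) => escapeCSVValue(row[field])).join(',')}\n`;
```

Inside the transform, the callback would then become callback(null, toCSVLine(row, fields)).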
Pipeline:
stream.pipeline(
  dbStream,
  toCSVTransform(['first_name', 'last_name', 'email']),
  s3WritableStream,
  (err) => {
    if (err) {
      console.error('Pipeline failed.', err)
    } else {
      console.log('Pipeline succeeded.')
    }
  }
)
This works as it is, but we've been given an additional requirement to encrypt the file using PGP encryption. My thought was to have an additional step in the pipeline after toCSVTransform to do the encryption. The npm package openpgp supports streams but I'm not sure how to work it into the pipeline.
From the openpgp documentation, here is an example of how to pass a readable stream to the openpgp.encrypt function:
const readableStream = new ReadableStream({
  start(controller) {
    controller.enqueue('Hello, world!');
    controller.close();
  }
});
const encrypted = await openpgp.encrypt({
  message: await openpgp.createMessage({ text: readableStream }), // input as Message object
  encryptionKeys: publicKey,
  signingKeys: privateKey // optional
});
All examples I've seen simply pass the readable stream to the encrypt function. But I'm required to transform the data prior to sending it to S3.
Is there a way for me to pass the toCSVTransform stream to the openpgp.encrypt method?
It seems like I want to compose the readable dbStream and the transform stream, toCSVTransform, into a single stream and pass that to the openpgp.encrypt function.
I notice Node.js has a stream.compose method, but it is currently only experimental, so it's not really an option.
Edit: possible solution
It looks like I can use pipe() to transform the stream before passing it to the openpgp.encrypt method:
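const encrypted = await openpgp.encrypt({
  // toCSVTransform is a factory, so it has to be called to get the actual Transform stream
  message: await openpgp.createMessage({
    text: dbStream.pipe(toCSVTransform(['first_name', 'last_name', 'email']))
  }), // input as Message object
  encryptionKeys: publicKey,
  signingKeys: privateKey // optional
});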
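A possible refinement of the pipe() idea: pipe() does not forward errors from the source to the destination, whereas stream.pipeline() returns the last stream it was given and destroys every stream in the chain if one of them fails. The sketch below uses that to get a single readable CSV stream without stream.compose. buildCsvStream and the demo rows are hypothetical stand-ins (Readable.from replaces the knex stream here), and the openpgp call is only shown as a comment because how it handles Node streams depends on the installed version, which is an assumption on my part.

```javascript
const { Readable, Transform, pipeline } = require('node:stream');

// Same behavior as the transform in the question: one row object in, one CSV line out.
const toCSVTransform = (fields) => new Transform({
  objectMode: true,
  transform(row, _encoding, callback) {
    callback(null, `${fields.map((field) => row[field]).join(',')}\n`);
  },
});

// Hypothetical helper: join a row source and the CSV transform into one readable stream.
// pipeline() returns the last stream passed to it (the transform), so the result
// can still be read from, and errors from rowStream reach onDone.
const buildCsvStream = (rowStream, fields, onDone) =>
  pipeline(rowStream, toCSVTransform(fields), onDone);

// Stand-in for the knex stream (assumption: it emits plain row objects).
const demoRows = Readable.from([
  { first_name: 'Ada', last_name: 'Lovelace', email: 'ada@example.com' },
]);

const csvStream = buildCsvStream(
  demoRows,
  ['first_name', 'last_name', 'email'],
  (err) => {
    if (err) console.error('CSV stage failed.', err);
  }
);

// csvStream would take the place of dbStream.pipe(...) in the snippet above, e.g.
//   openpgp.createMessage({ text: csvStream })
// Here it is just printed so the sketch runs on its own.
csvStream.on('data', (line) => process.stdout.write(line));
```

The encrypted output would still need its own path to s3WritableStream; that part is left out because it depends on what openpgp.encrypt returns for a streamed message in the version in use.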