I'm using the pipeline below to stream data from Aurora, transform it to CSV, and send it to S3.

Readable knex stream:

const getQueryStream = (organizationId) => 
  db.select('*')
    .from('users')
    .where('organization_id', organizationId)
    .stream();

Transforming the data:

const toCSVTransform = (fields) => new stream.Transform({
  objectMode: true,
  transform: (row, encoding, callback) => {
    let rowAsArr = [];
    for(let i = 0; i < fields.length; i++) {
      rowAsArr.push(row[fields[i]]);
    }
    callback(null, `${rowAsArr.join(',')}\n`);
  }
});

Pipeline:

stream.pipeline(
    dbStream,
    toCSVTransform(['first_name', 'last_name', 'email']),
    s3WritableStream,
    (err) => {
        if (err) {
            console.error('Pipeline failed.', err)
        } else {
            console.log('Pipeline succeeded.')
        }
    }
)

This works as it is, but we've been given an additional requirement to encrypt the file using PGP encryption. My thought was to have an additional step in the pipeline after toCSVTransform to do the encryption. The npm package openpgp supports streams but I'm not sure how to work it into the pipeline.

From the openpgp documentation, here is an example of how to pass a readable stream to the openpgp.encrypt function:

const readableStream = new ReadableStream({
    start(controller) {
        controller.enqueue('Hello, world!');
        controller.close();
    }
});

const encrypted = await openpgp.encrypt({
    message: await openpgp.createMessage({ text: readableStream }), // input as Message object
    encryptionKeys: publicKey,
    signingKeys: privateKey // optional
});

All examples I've seen simply pass the readable stream to the encrypt function. But I'm required to transform the data prior to sending it to S3.

Is there a way for me to pass the toCSVTransform stream to the openpgp.encrypt method?

It seems like I want to compose the readable dbStream and the transform stream, toCSVTransform, into a single stream and pass that to the openpgp.encrypt function.

I notice Node.js has a stream.compose method, but it is currently only experimental, so it's not really an option.

Edit (possible solution): It looks like I can use pipe() to transform the stream before passing it to the openpgp.encrypt method:

const encrypted = await openpgp.encrypt({
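    // toCSVTransform is a factory, so it must be called to get the Transform stream
    message: await openpgp.createMessage({ text: dbStream.pipe(toCSVTransform(['first_name', 'last_name', 'email'])) }), // input as Message object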
    encryptionKeys: publicKey,
    signingKeys: privateKey // optional
});
navig8tr

1 Answer

What you have is roughly correct, but encrypted will be a Stream.

This will work:

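// armoredKey and passphrase hold your armored private key and its passphrase.
const encryptedPrivateKey = await openpgp.readPrivateKey({armoredKey});
const signingKey = await openpgp.decryptKey({
  privateKey: encryptedPrivateKey,
  passphrase,
});
// encryptionKey is the recipient's public key, e.g. from openpgp.readKey().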

const encrypt = async (encryptionKeys, signingKeys, readableStream) => await openpgp.encrypt({
  message: await openpgp.createMessage({text: readableStream}),
  encryptionKeys,
  signingKeys,
});

stream.pipeline(
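    await encrypt(encryptionKey, signingKey, stream.pipeline(
      dbStream,
      toCSVTransform(['first_name', 'last_name', 'email']),
      // pipeline() requires a callback as its last argument; it returns the last stream
      (err) => { if (err) console.error('CSV stage failed.', err); },
    )),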
    s3WritableStream,
    (err) => {
        if (err) {
            console.error('Pipeline failed.', err)
        } else {
            console.log('Pipeline succeeded.')
        }
    }
)
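
The inner `stream.pipeline(...)` call can serve as the readable input because `pipeline` returns the last stream it was given. Here is a minimal sketch of that stage on its own, assuming an in-memory `Readable.from` source in place of `dbStream` (the sample rows and the `collect` helper are made up for illustration) and leaving openpgp out entirely:

```javascript
const stream = require('stream');

// Same idea as toCSVTransform above: one row object in, one CSV line out.
const toCSVTransform = (fields) => new stream.Transform({
  objectMode: true,
  transform: (row, encoding, callback) => {
    callback(null, `${fields.map((field) => row[field]).join(',')}\n`);
  },
});

// Hypothetical rows standing in for the knex query stream.
const rows = [
  { first_name: 'Ada', last_name: 'Lovelace', email: 'ada@example.com' },
  { first_name: 'Alan', last_name: 'Turing', email: 'alan@example.com' },
];

// pipeline() returns the last stream (the transform), which is readable.
const csvStream = stream.pipeline(
  stream.Readable.from(rows),
  toCSVTransform(['first_name', 'last_name', 'email']),
  (err) => {
    if (err) console.error('CSV stage failed.', err);
  },
);

// Illustrative helper: drains any readable into a single string.
async function collect(readable) {
  let out = '';
  for await (const chunk of readable) out += chunk;
  return out;
}
```

Calling `collect(csvStream)` resolves with the CSV text. In the real pipeline, `csvStream` is what gets handed to `openpgp.createMessage({ text: ... })`.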

Unfortunately, there's no (simple) way to wrap openpgp such that it can be inserted directly in a pipeline.

If you're OK with symmetric encryption, then a cleaner solution would be to use Node's built-in crypto module, since a cipher is itself a Transform stream:

const encrypter = crypto.createCipheriv(algo, key, iv)

stream.pipeline(
    dbStream,
    toCSVTransform(['first_name', 'last_name', 'email']),
    encrypter,
    s3WritableStream,
    (err) => {
        if (err) {
            console.error('Pipeline failed.', err)
        } else {
            console.log('Pipeline succeeded.')
        }
    }
)
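
The snippet above leaves `algo`, `key`, and `iv` undefined. Below is a self-contained sketch of the same idea, assuming AES-256-CBC with a randomly generated key and IV, and an in-memory sink in place of `s3WritableStream`. The helper names `encryptToBuffer` and `decryptToString` are made up for illustration:

```javascript
const crypto = require('crypto');
const stream = require('stream');

// Assumed parameters. In practice the key comes from a key store, and the IV
// must be stored with the ciphertext so it can be decrypted later.
const algo = 'aes-256-cbc';
const key = crypto.randomBytes(32); // 256-bit key
const iv = crypto.randomBytes(16);  // AES block size

// Hypothetical helper: runs `text` through the cipher inside a pipeline and
// resolves with the ciphertext collected by an in-memory Writable.
const encryptToBuffer = (text) => new Promise((resolve, reject) => {
  const chunks = [];
  const sink = new stream.Writable({
    write(chunk, encoding, callback) {
      chunks.push(chunk);
      callback();
    },
  });
  stream.pipeline(
    stream.Readable.from([text]),
    crypto.createCipheriv(algo, key, iv),
    sink,
    (err) => (err ? reject(err) : resolve(Buffer.concat(chunks))),
  );
});

// Reverses the step above to show the round trip.
const decryptToString = (ciphertext) => {
  const decipher = crypto.createDecipheriv(algo, key, iv);
  return Buffer.concat([decipher.update(ciphertext), decipher.final()]).toString('utf8');
};
```

For example, `encryptToBuffer('a,b,c\n').then(decryptToString)` resolves with the original line. Note that AES-CBC on its own neither authenticates nor signs the data, so it is not a substitute when PGP signing is required.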
Codebling
  • Unfortunately, pgp is required. Also, these files will need to be signed and there does not seem to be a simple way to do that either. The `await encrypt` pipeline step would need to be nested inside an `await sign` step. Maybe this should be another question, but is there a simpler way to do both encrypt and sign? – navig8tr Dec 25 '21 at 04:20
  • You can do both at the same time! Just provide `signingKeys` as an option to `encrypt()`. Example updated. – Codebling Dec 25 '21 at 07:24
  • This seems to work, but the IntelliJ inspection reports a signature mismatch on the `await encrypt` line: `Argument type WebStream | NodeStream | string is not assignable to parameter type NodeJS.ReadableStream   Type string is not assignable to type NodeJS.ReadableStream`. The warning doesn't make sense to me, though, because openpgp.encrypt accepts a NodeStream – navig8tr Dec 26 '21 at 01:09
  • @navig8tr I think we're on the same page, but to avoid ambiguity, `openpgp.encrypt()` does not accept a `Stream`; `openpgp.createMessage()` does, [in the `text` value of `options`](https://docs.openpgpjs.org/global.html#createMessage). But it also accepts `string`, so this message seems incorrect. When in doubt, check with TypeScript. – Codebling Dec 26 '21 at 09:26
  • You removed the line in your original code that declares a variable named `readableStream`, right? Could the message be concerning a different line of code? – Codebling Dec 26 '21 at 09:27