6

Given a function that parses incoming streams:

async onData(stream, callback) {
    const parsed = await simpleParser(stream)

    // Code handling parsed stream here
    // ...

    return callback()
}

I'm looking for a simple and safe way to 'clone' that stream, so I can save it to a file for debugging purposes, without affecting the code. Is this possible?

The same question in fake code: I'm trying to do something like this. Obviously, this is a made-up example and doesn't work.

const fs = require('fs')
const wstream = fs.createWriteStream('debug.log')

async onData(stream, callback) {
    const debugStream = stream.clone(stream) // Fake code
    wstream.write(debugStream)

    const parsed = await simpleParser(stream)

    // Code handling parsed stream here
    // ...

    wstream.end()

    return callback()
}
Redsandro
  • Why do you want to clone the stream, since you can still read from it again? – 0.sh Jan 24 '19 at 14:47
  • @0.sh efficiency. – Redsandro Jan 24 '19 at 14:53
  • If you are not calling `stream.close()`, then there is no need to clone the stream. – 0.sh Jan 24 '19 at 15:01
  • @0.sh Is it really that simple? I thought I would need something like [`cloneable-readable`](https://github.com/mcollina/cloneable-readable) (which I did not include in my answer to prevent tainting the answers I'll get) – Redsandro Jan 24 '19 at 15:09
  • @0.sh Actually, you can't read from the same stream multiple times: as soon as the first read finishes, the stream closes and all other reads will be incomplete. – Ivan Cherviakov Dec 02 '19 at 17:00
  • https://stackoverflow.com/questions/19553837/node-js-piping-the-same-readable-stream-into-multiple-writable-targets - I tried this answer and it worked for me. – Ivan Cherviakov Dec 02 '19 at 17:05

2 Answers

11

No, you can't clone a readable stream without consuming it. However, you can pipe it twice: once to create the file and once to serve as the 'clone'.

Code is below:

const { Readable, PassThrough } = require('stream')

// Dummy source stream for demonstration; in practice, use the stream you already have
const s = new Readable({ read() {} })
s.push('beep')
s.push(null)

// Pipe the same source into two PassThrough streams; each one receives every chunk
const stream1 = s.pipe(new PassThrough())
const stream2 = s.pipe(new PassThrough())

// Use stream1 to create the file, and use stream2 just like a clone of s.
// Both are simply printed here for a quick demonstration.
stream1.pipe(process.stdout)
stream2.pipe(process.stdout)
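
Applied to the `onData` function from the question, the same idea might look like the sketch below. It is only an illustration under stated assumptions: `teeToFile` is a hypothetical helper name, and `collect` is a stand-in for `simpleParser` so that the snippet runs without extra packages.

```javascript
const fs = require('fs')
const { PassThrough, Readable } = require('stream')

// Hypothetical helper: pipes `source` into two branches. `main` carries the
// data for normal processing; the other branch is written to `debugPath`.
function teeToFile(source, debugPath) {
    const main = new PassThrough()
    const debugFile = fs.createWriteStream(debugPath)

    // Attach both pipes synchronously, before any data flows,
    // so that each branch receives every chunk.
    source.pipe(main)
    source.pipe(new PassThrough()).pipe(debugFile)

    // pipe() does not forward errors, so pass source errors on manually.
    source.on('error', (err) => main.destroy(err))

    return { main, debugFile }
}

// Stand-in for simpleParser (assumption): collects a stream into a string.
async function collect(stream) {
    const chunks = []
    for await (const chunk of stream) chunks.push(Buffer.from(chunk))
    return Buffer.concat(chunks).toString()
}

async function onData(stream, callback) {
    const { main } = teeToFile(stream, 'debug.log')
    const parsed = await collect(main) // the question would call simpleParser(main) here

    // Code handling parsed stream here
    // ...

    return callback()
}
```

Note that the source is throttled to the slower of the two branches, and that the debug file may still be flushing when the parser has already finished.
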
jiajianrong
-1

I tried to implement the solution provided by @jiajianrong but struggled to get it to work with the stream returned by createReadStream, because the Readable throws an error when I try to push that stream into it directly, like this:

s.push(createReadStream())

To solve this issue, I used a helper function that reads the stream into a buffer.

function streamToBuffer (stream: any) {
  const chunks: Buffer[] = []
  return new Promise((resolve, reject) => {
    stream.on('data', (chunk: any) => chunks.push(Buffer.from(chunk)))
    stream.on('error', (err: any) => reject(err))
    stream.on('end', () => resolve(Buffer.concat(chunks)))
  })
}

Below is the solution I found, which uses one pipe to generate a hash of the stream and the other pipe to upload the stream to cloud storage.

import stream, { Readable } from 'stream'

const s = new Readable()
s.push(await streamToBuffer(createReadStream()))
s.push(null)

const fileStreamForHash = s.pipe(new stream.PassThrough())
const fileStreamForUpload = s.pipe(new stream.PassThrough())

// Generating file hash
const fileHash = await getHashFromStream(fileStreamForHash)

// Uploading stream to cloud storage
await BlobStorage.upload(fileName, fileStreamForUpload)

My answer is mostly based on the answer of jiajianrong.

DanielHefti
  • It makes no sense to buffer the stream and then push it to another stream; you could have just piped the stream from `createReadStream` directly into the two downstreams. The reason it's throwing an error is that you don't push streams into other streams; that is what pipe is for. The `Readable` @jiajianrong put in his example is just that, an example: it's a dummy stream with something in it to demonstrate. You already have the stream (whatever you're getting back from `createReadStream`); your `s` is that created read stream. – gabriel.hayes Aug 09 '23 at 16:21
  • @gabriel.hayes - Hi Gabriel, I appreciate your feedback and will give it a try. I will post my results as soon as possible. – DanielHefti Aug 10 '23 at 13:32
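
The direct piping suggested in the comment above could be sketched roughly as follows, without buffering the whole stream first. This is a minimal sketch under assumptions: `hashStream` and `fakeUpload` are hypothetical stand-ins for the answer's `getHashFromStream` and `BlobStorage.upload`, whose real implementations are not shown in the answer.

```javascript
const crypto = require('crypto')
const { PassThrough, Readable } = require('stream')

// Hypothetical stand-in for getHashFromStream: SHA-256 hex digest of a stream.
async function hashStream(stream) {
    const hash = crypto.createHash('sha256')
    for await (const chunk of stream) hash.update(chunk)
    return hash.digest('hex')
}

// Hypothetical stand-in for BlobStorage.upload: just collects the bytes.
async function fakeUpload(stream) {
    const chunks = []
    for await (const chunk of stream) chunks.push(Buffer.from(chunk))
    return Buffer.concat(chunks)
}

// Pipes `source` into two branches and consumes both at the same time.
async function hashAndUpload(source) {
    const forHash = source.pipe(new PassThrough())
    const forUpload = source.pipe(new PassThrough())

    // Start both consumers together: awaiting one before starting the other
    // can stall on large inputs, because an unread branch applies
    // backpressure to the shared source.
    const [fileHash, uploaded] = await Promise.all([
        hashStream(forHash),
        fakeUpload(forUpload),
    ])
    return { fileHash, uploaded }
}
```

With the answer's code, `source` would be whatever `createReadStream()` returns, for example `await hashAndUpload(createReadStream())`.
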