101

I need to run two commands in series that both need to read data from the same stream. After piping a stream into another, the buffer is emptied so I can't read data from that stream again, so this doesn't work:

var spawn = require('child_process').spawn;
var fs = require('fs');
var request = require('request');

var inputStream = request('http://placehold.it/640x360');
var identify = spawn('identify',['-']);

inputStream.pipe(identify.stdin);

var chunks = [];
identify.stdout.on('data',function(chunk) {
  chunks.push(chunk);
});

identify.stdout.on('end',function() {
  var size = getSize(Buffer.concat(chunks)); //width
  var convert = spawn('convert',['-','-scale',size * 0.5,'png:-']);
  inputStream.pipe(convert.stdin);
  convert.stdout.pipe(fs.createWriteStream('half.png'));
});

function getSize(buffer){
  return parseInt(buffer.toString().split(' ')[2].split('x')[0]);
}

Request complains about this:

Error: You cannot pipe after data has been emitted from the response.

and changing the inputStream to an fs.createReadStream yields the same issue, of course. I don't want to write to a file; I want to reuse, in some way, the stream that request produces (or any other readable stream, for that matter).

Is there a way to reuse a readable stream once it finishes piping? What would be the best way to accomplish something like the above example?

Alexander Mills
  • 90,741
  • 139
  • 482
  • 817
Maroshii
  • 3,937
  • 4
  • 23
  • 29
  • Seems like you are using ImageMagick. You can pass a value like 50% to -scale for scaling. You can also use https://npmjs.org/package/gm – user568109 Oct 24 '13 at 06:45
  • 3
    @user568109 Yeah. That's not the issue here though. It's a more general question: it happens to be ImageMagick, but it could be any other command/stream – Maroshii Oct 24 '13 at 08:00

7 Answers

99

You have to create a duplicate of the stream by piping it to two streams. You can create a simple copy with a PassThrough stream; it simply passes its input through to its output.

const spawn = require('child_process').spawn;
const PassThrough = require('stream').PassThrough;

const a = spawn('echo', ['hi user']);
const b = new PassThrough();
const c = new PassThrough();

// Pipe the same readable into two PassThrough streams:
// each one receives every chunk that a.stdout emits.
a.stdout.pipe(b);
a.stdout.pipe(c);

let count = 0;
b.on('data', function (chunk) {
  count += chunk.length;
});
b.on('end', function () {
  console.log(count);
  c.pipe(process.stdout);
});

Output:

8
hi user
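
Applied to the question's identify/convert pipeline, the same idea looks roughly like this. This is only a sketch: it assumes ImageMagick's identify and convert are on the PATH, and the (arbitrary) 10 MB highWaterMark is there so the second copy can buffer the whole image while identify runs; see the backpressure caveat in the comments below.

var spawn = require('child_process').spawn;
var fs = require('fs');
var request = require('request');
var PassThrough = require('stream').PassThrough;

var inputStream = request('http://placehold.it/640x360');

// Two copies of the same response; the large highWaterMark lets the second
// copy sit in memory until convert is ready to consume it.
var toIdentify = new PassThrough({ highWaterMark: 10 * 1024 * 1024 });
var toConvert = new PassThrough({ highWaterMark: 10 * 1024 * 1024 });

inputStream.pipe(toIdentify);
inputStream.pipe(toConvert);

var identify = spawn('identify', ['-']);
toIdentify.pipe(identify.stdin);

var chunks = [];
identify.stdout.on('data', function (chunk) { chunks.push(chunk); });
identify.stdout.on('end', function () {
  var width = getSize(Buffer.concat(chunks));
  var convert = spawn('convert', ['-', '-scale', String(width * 0.5), 'png:-']);
  toConvert.pipe(convert.stdin);
  convert.stdout.pipe(fs.createWriteStream('half.png'));
});

function getSize(buffer) {
  return parseInt(buffer.toString().split(' ')[2].split('x')[0], 10);
}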
garajo
  • 736
  • 4
  • 19
user568109
  • 47,225
  • 17
  • 99
  • 123
  • 5
    Used this technique with the Haraka mailserver attachment hooks to pipe the incoming stream into multiple mail account databases. This answer works. –  Dec 05 '13 at 14:53
  • 24
    Note that this technique only works if the spawned command outputs a number of bytes that does not fill the backpressure buffers. You can make it fail with `a = spawn('head', ['-c', '200K', '/dev/urandom']);`. If `c` is not piped out, at some point `a.stdout` will pause piping out, `b` will drain, and it will never end. – Jerome WAGNER Jul 18 '14 at 13:27
  • Is `a.stdout.pipe(c);` supposed to be `b.stdout.pipe(c);` ? – Daniel Beardsley Aug 15 '14 at 00:32
  • @DanielBeardsley No, it is copying streams. That would never work as you want to listen on b's data events. – user568109 Aug 15 '14 at 13:14
  • 56
    I'm confused, you say that you can't process the same stream twice, but your solution is to... process the same stream twice (with the PassThrough transform). This seems contradictory. Is this something special about the stdout streams? – B T Sep 08 '14 at 21:37
  • 13
    I tested this and it certainly works. I think it's not correct to say "you cannot process the same stream twice", since that's what you're doing. Your first statement about not being able to pipe a stream after its 'end' is the appropriate reason. – B T Sep 08 '14 at 21:47
  • @BT If by stream you mean flow of data. In the answer's context stream refers to an object. Also the processing as asked in question concerns causal or sequential processing. Specifically OP wanted to calculate size of the stream and **then** use the stream from the beginning. – user568109 Sep 09 '14 at 05:04
  • 3
    Tested this answer with images, and like @Jerome WAGNER said, it works if files are small enough. It did not work for me as request never ended and I got "socket hang up" errors. – Tola Jun 24 '15 at 07:35
  • 2
    I think this is no longer a valid statement: https://github.com/nodejs/readable-stream/blob/master/lib/_stream_readable.js#L508 readable streams can pipe data to multiple writeable streams. – inf3rno Sep 29 '15 at 02:27
  • @inf3rno The same was valid when I answered. The OP was using the stream incorrectly. He was piping after consuming the data. – user568109 Sep 29 '15 at 03:29
  • 2
    @inf3rno You mean https://github.com/nodejs/readable-stream/blob/7bda7cc5c63fbd5cd63a3b08b424b3890771a7fb/lib/_stream_readable.js#L508 . :-) – clacke Apr 06 '16 at 13:23
  • The solution seems fine, **but** watch out when operating on buffers: in-place modifications of a buffer in b will be visible in c (if the data chunk is a string, there's no worry :) ). – fider Feb 28 '18 at 11:38
  • 10
    Don't use this method because it creates issues if the streams are read at different rates. Try https://www.npmjs.com/package/readable-stream-clone instead; it worked well for me. – kiwicomb123 Aug 19 '18 at 22:24
  • 1
    @kiwicomb123 nice, this happily worked out in my situation. Sadly I am not quite able to understand what the problem is, and you are not the first person to mention it. Are you able to elaborate? How is the stream reading rate measured? Is it about the actual time in seconds it takes to process stream data? The first stream is already finished by the time the final pipe starts reading, which is what confuses me. I guess I don't know something fundamental about how streams work. I've read the NodeJS Stream doc before but it's of little help. Do you know any other more useful resource? – Avius Sep 20 '18 at 20:26
  • 1
    @Avius I think it's a race condition. Under the assumption that you can't process the same stream twice, if one fork of the stream has finished processing before the second one even starts, then the second one will not work. However, I don't have formal documentation to back this up; I arrived at this conclusion by testing. – kiwicomb123 Sep 21 '18 at 02:21
  • 1
    I can confirm that @kiwicomb123's suggestion also works in my situation. I've tried all kinds of things before (e.g. https://www.npmjs.com/package/cloneable-readable) to no avail. Here's my version of the class, updated to modern JS: https://github.com/ditojs/dito/blob/master/packages/server/src/storage/CloneableReadable.js – Jürg Lehni Mar 25 '19 at 20:32
  • For anybody doubting this, this is the suggested method of piping to two streams by the actual NodeJS team since 2011. https://nodejs.org/en/knowledge/advanced/streams/how-to-use-stream-pipe/ They have a sample of streaming a process output to a file as well as `stdout`. `.pipe()` supports multiple entries. https://github.com/nodejs/node/blob/236237829b26c1756104b5a37797a591e8a5aba6/lib/_stream_readable.js#L663 Also, `.pipe()` has backpressuring built-in for convenience. https://nodejs.org/en/docs/guides/backpressuring-in-streams/ If in doubt, never call `.write()` and only use `.pipe()`. – ShortFuse Jun 05 '20 at 22:32
  • 1
    Piping to both a file and stdout may well be a special case, since stdout is handled a bit differently than other streams in that it can't be closed. – Rich Remer Jan 15 '21 at 17:47
12

The first answer only works if the streams take roughly the same amount of time to process the data. If one takes significantly longer, the faster one will request new data, overwriting the data still being used by the slower one (I ran into this problem after trying to solve it with a duplicate stream).

The following pattern worked very well for me. It uses Streamz, a library based on Streams2, together with promises to synchronize async streams via a callback. Using the familiar example from the first answer:

var spawn = require('child_process').spawn;
var pass = require('stream').PassThrough;
var streamz = require('streamz').PassThrough;
var Promise = require('bluebird');

var a = spawn('echo', ['hi user']);
var b = new pass();
var c = new pass();

a.stdout.pipe(streamz(combineStreamOperations));

function combineStreamOperations(data, next){
  Promise.join(b, c, function(b, c){ // perform n operations on the same data
    next(); // request more
  });
}

var count = 0;
b.on('data', function(chunk) { count += chunk.length; });
b.on('end', function() { console.log(count); c.pipe(process.stdout); });
artikas
  • 417
  • 3
  • 7
4

For the general problem, the following code works fine:

var PassThrough = require('stream').PassThrough;

var a = new PassThrough();
var b1 = new PassThrough();
var b2 = new PassThrough();

a.pipe(b1);
a.pipe(b2);

b1.on('data', function(data) {
  console.log('b1:', data.toString());
});
b2.on('data', function(data) {
  console.log('b2:', data.toString());
});

a.write('text');
Yin
  • 612
  • 7
  • 10
4

You can use this small npm package I created:

readable-stream-clone

With it you can reuse a readable stream as many times as you need.
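
Usage is roughly like this (a minimal sketch; the file names are placeholders, and one clone should be created per consumer before any piping starts):

const fs = require('fs');
const ReadableStreamClone = require('readable-stream-clone');

const source = fs.createReadStream('input.png');

// Each clone is an independent readable fed from the same source stream.
const copy1 = new ReadableStreamClone(source);
const copy2 = new ReadableStreamClone(source);

copy1.pipe(fs.createWriteStream('output1.png'));
copy2.pipe(fs.createWriteStream('output2.png'));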

levansuper
  • 741
  • 1
  • 7
  • 13
  • 3
    does it suffer from the backpressure problem described [above](https://stackoverflow.com/questions/19553837/node-js-piping-the-same-readable-stream-into-multiple-writable-targets#comment38541924_19561718)? What about producing an [empty file](https://stackoverflow.com/questions/19553837/node-js-piping-the-same-readable-stream-into-multiple-writable-targets#comment86192248_40874999) from the second pipe? If you can elaborate a little that'd be awesome (to me and to your package reputation :-) ). Thanks in advance! – maganap May 08 '21 at 11:34
  • This lib does the correct thing. It is so simple that the entire source code could be copied here as an answer. It won't suffer from the "backpressure problem" (see @maganap's comment above) because it completely ignores the backpressure mechanism. – SleepWalker Sep 29 '21 at 09:47
  • There is also more smart alternative implementation: https://github.com/mcollina/cloneable-readable – SleepWalker Sep 29 '21 at 13:50
1

I have a different solution for writing to two streams simultaneously. Naturally, the time to write will be the sum of the two write times, but I use it to respond to a download request where I want to keep a copy of the downloaded file on my server (actually I use an S3 backup, so I cache the most-used files locally to avoid multiple file transfers).

/**
 * A utility class made to write to a file while answering a file download request
 */
class TwoOutputStreams {
  constructor(streamOne, streamTwo) {
    this.streamOne = streamOne
    this.streamTwo = streamTwo
  }

  setHeader(header, value) {
    if (this.streamOne.setHeader)
      this.streamOne.setHeader(header, value)
    if (this.streamTwo.setHeader)
      this.streamTwo.setHeader(header, value)
  }

  write(chunk) {
    this.streamOne.write(chunk)
    this.streamTwo.write(chunk)
  }

  end() {
    this.streamOne.end()
    this.streamTwo.end()
  }
}

You can then use this as a regular output stream:

const twoStreamsOut = new TwoOutputStreams(fileOut, responseStream)

and pass it to your method as if it were a response or a file output stream.
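
Note that TwoOutputStreams is not a real Writable, so you can't .pipe() a readable into it; if your source is a stream, forward its events by hand (a minimal sketch, where sourceStream is whatever readable you are copying):

sourceStream.on('data', chunk => twoStreamsOut.write(chunk))
sourceStream.on('end', () => twoStreamsOut.end())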

Zied Hamdi
  • 2,400
  • 1
  • 25
  • 40
1

If you have async operations on the PassThrough streams, the answers posted here won't work. A solution that works for async operations involves buffering the stream content and then creating new streams from the buffered result (a combined usage sketch follows the two steps below).

  1. To buffer the result you can use concat-stream:

    const Promise = require('bluebird');
    const concat = require('concat-stream');
    const getBuffer = function(stream){
        return new Promise(function(resolve, reject){
            var gotBuffer = function(buffer){
                resolve(buffer);
            }
            var concatStream = concat(gotBuffer);
            stream.on('error', reject);
            stream.pipe(concatStream);
        });
    }
    
  2. To create streams from the buffer you can use:

    const { Readable } = require('stream');
    const getBufferStream = function(buffer){
        const stream = new Readable();
        stream.push(buffer);
        stream.push(null);
        return Promise.resolve(stream);
    }
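
Putting the two together (a minimal sketch, assuming the two helpers above are in scope; the file names are placeholders):

const fs = require('fs');

getBuffer(fs.createReadStream('input.png'))
  .then(function (buffer) {
    // Create as many independent streams from the buffered data as needed.
    return Promise.all([getBufferStream(buffer), getBufferStream(buffer)]);
  })
  .then(function (streams) {
    streams[0].pipe(fs.createWriteStream('copy1.png'));
    streams[1].pipe(fs.createWriteStream('copy2.png'));
  });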
    
Juan
  • 806
  • 1
  • 8
  • 15
-1

What about piping into two or more streams, but not at the same time?

For example:

var fs = require('fs');
var PassThrough = require('stream').PassThrough;

var mybinaryStream = stream.start(); // never-ending audio stream
var file1 = fs.createWriteStream('file1.wav', {encoding: 'binary'});
var file2 = fs.createWriteStream('file2.wav', {encoding: 'binary'});
var mypass = new PassThrough();

mybinaryStream.pipe(mypass);
mypass.pipe(file1);

setTimeout(function(){
  mypass.pipe(file2);
}, 2000);

The above code does not produce any errors, but file2 is empty.