I have a stream that I am trying to submit to two different destinations. The first destination is AWS S3; the second is another backend via an HTTP request.

const document = fs.createReadStream(process.cwd() + "/test/resources/" + "id/document.jpg");

const s3Response = await submitToS3(document);

const backendResponse = await submitToBackend(document);

From what I understand, a stream can only be read once. How can I send the same stream to two different destinations?

I thought about cloning the stream, but simply creating a new variable and assigning the stream to that variable does not work.
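Roughly, what I tried looks like this (documentCopy is just a name for illustration); both variables end up pointing at the same underlying stream object, so the second consumer gets nothing:

const documentCopy = document; // same underlying stream, not a copy

const s3Response = await submitToS3(document); // this consumes the stream
const backendResponse = await submitToBackend(documentCopy); // nothing left to read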

Kay

3 Answers


You can check out this npm module: https://www.npmjs.com/package/readable-stream-clone

npm install readable-stream-clone

const fs = require("fs");
const ReadableStreamClone = require("readable-stream-clone");

const readStream = fs.createReadStream('text.txt');

// Each clone is an independent readable backed by the same source
const readStream1 = new ReadableStreamClone(readStream);
const readStream2 = new ReadableStreamClone(readStream);

const writeStream1 = fs.createWriteStream('sample1.txt');
const writeStream2 = fs.createWriteStream('sample2.txt');

readStream1.pipe(writeStream1);
readStream2.pipe(writeStream2);
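Applied to your case, it would look something like the sketch below, assuming your submitToS3 and submitToBackend functions accept a readable stream and this runs inside an async function:

const fs = require("fs");
const ReadableStreamClone = require("readable-stream-clone");

const document = fs.createReadStream(process.cwd() + "/test/resources/" + "id/document.jpg");

// Two independent readables over the same underlying data
const forS3 = new ReadableStreamClone(document);
const forBackend = new ReadableStreamClone(document);

// Run both consumers concurrently so neither clone stalls the source
const [s3Response, backendResponse] = await Promise.all([
  submitToS3(forS3),
  submitToBackend(forBackend),
]);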
levansuper

Raghavendra's answer suggests a good potential direction. You can combine multiple pipes from this answer with the S3 pipe implementation from this answer.

For the submitToBackend part, I'm not sure exactly what your implementation looks like, but assuming you can pipe to an HTTP request of some sort...

Example:

var fs = require("fs");
var stream = require("stream"); // needed for stream.PassThrough below
const request = require("request");
const AWS = require("aws-sdk");
const s3 = new AWS.S3();

const rs = fs.createReadStream(process.cwd() + "/test/resources/" + "id/document.jpg");

function uploadFromStream(s3) {
  var pass = new stream.PassThrough();

  // BUCKET and KEY are placeholders for your bucket name and object key
  var params = {Bucket: BUCKET, Key: KEY, Body: pass};
  s3.upload(params, function(err, data) {
    console.log(err, data);
  });

  return pass;
}

rs.pipe(uploadFromStream(s3));

// Just guessing at your submitToBackend implementation:
const backendWs = request.post("http://example.com/docs");

// However it works, if you can get to a stream.Writable, you can now pipe the same stream.Readable:
rs.pipe(backendWs);
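If you need to await both responses (per the comments below), both legs can be promisified. Here's a rough sketch reusing the names above (uploadFromStreamAsync is a hypothetical variant of the function; BUCKET, KEY, and the URL are still placeholders). In aws-sdk v2, s3.upload() returns a managed upload with a .promise() method, and request accepts a completion callback. Inside an async function:

function uploadFromStreamAsync(s3) {
  const pass = new stream.PassThrough();
  const params = {Bucket: BUCKET, Key: KEY, Body: pass};
  // .promise() resolves when the managed upload completes
  const done = s3.upload(params).promise();
  return {writable: pass, done};
}

const s3Upload = uploadFromStreamAsync(s3);
rs.pipe(s3Upload.writable);

const backendDone = new Promise((resolve, reject) => {
  const req = request.post("http://example.com/docs", (err, res) => {
    if (err) reject(err);
    else resolve(res);
  });
  // pipe() supports multiple destinations, so rs can feed both legs
  rs.pipe(req);
});

const [s3Data, backendResponse] = await Promise.all([s3Upload.done, backendDone]);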
Will
  • submitToBackend is an async function; I need to wait for a response. – Kay Feb 25 '20 at 09:45
  • I have updated my question to reflect that the functions are promises. – Kay Feb 25 '20 at 10:11
  • Ah, thanks, I see, but [createReadStream](https://nodejs.org/api/fs.html#fs_fs_createreadstream_path_options) itself does not return a promise but rather an instance of `fs.ReadStream`. Maybe that's the only problem? – Will Feb 25 '20 at 14:25
  • No, I have a stream coming; createReadStream is correct. The stream then gets passed to S3 and the HTTP request. – Kay Feb 25 '20 at 15:59
  • I'm confused. Your code snippet has `await fs.createReadStream`, but you and I are both agreeing that `createReadStream` returns a stream, not a promise. But `await` is for promises (async functions). I think that could be the source of your problem. But perhaps I'm missing something. – Will Feb 25 '20 at 16:18
  • Yeah, sorry, I shouldn't need to await fs.createReadStream; I've updated that. – Kay Feb 25 '20 at 16:35
var fs = require("fs");
var ReadableStreamClone = require("readable-stream-clone");

var readStream = fs.createReadStream('text1.txt');

// Each clone can be consumed independently of the other
var readStream1 = new ReadableStreamClone(readStream);
var readStream2 = new ReadableStreamClone(readStream);

var writeStream1 = fs.createWriteStream('testsample1.txt');
var writeStream2 = fs.createWriteStream('testsample2.txt');

readStream1.pipe(writeStream1);
readStream2.pipe(writeStream2);
GRVPrasad