112

I wrote a pretty simple function that downloads an image from a given URL, resizes it, and uploads it to S3 (using 'gm' and 'knox'). I have no idea if I'm doing the reading of a stream to a buffer correctly. (Everything is working, but is it the correct way?)

Also, I want to understand something about the event loop: how do I know that one invocation of the function won't leak anything, or change the 'buf' variable of another invocation that is already running? (Or is this scenario impossible because the callbacks are anonymous functions?)

var http = require('http');
var https = require('https');
var s3 = require('./s3');
var gm = require('gm');

module.exports.processImageUrl = function(imageUrl, filename, callback) {
    var client = http;
    if (imageUrl.substr(0, 5) == 'https') { client = https; }

    client.get(imageUrl, function(res) {
        if (res.statusCode != 200) {
            return callback(new Error('HTTP Response code ' + res.statusCode));
        }

        gm(res)
            .geometry(1024, 768, '>')
            .stream('jpg', function(err, stdout, stderr) {
                if (!err) {
                    var buf = new Buffer(0);
                    stdout.on('data', function(d) {
                        buf = Buffer.concat([buf, d]);
                    });

                    stdout.on('end', function() {
                        var headers = {
                            'Content-Length': buf.length
                            , 'Content-Type': 'Image/jpeg'
                            , 'x-amz-acl': 'public-read'
                        };

                        s3.putBuffer(buf, '/img/d/' + filename + '.jpg', headers, function(err, res) {
                            if (err) {
                                return callback(err);
                            } else {
                                return callback(null, res.client._httpMessage.url);
                            }
                        });
                    });
                } else {
                    callback(err);
                }
            });
    }).on('error', function(err) {
        callback(err);
    });
};
Gal Ben-Haim

11 Answers

109

Overall I don't see anything that would break in your code.

Two suggestions:

The way you are combining Buffer objects is suboptimal, because it has to copy all of the pre-existing data on every 'data' event. It would be better to put the chunks in an array and concat them all at the end.

var bufs = [];
stdout.on('data', function(d){ bufs.push(d); });
stdout.on('end', function(){
  var buf = Buffer.concat(bufs);
});

For performance, I would look into whether the S3 library you are using supports streams. Ideally you wouldn't need to create one large buffer at all, and would instead just pass the stdout stream directly to the S3 library.
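
For illustration only, a minimal sketch of that idea, assuming your './s3' wrapper exposes knox's putStream and that you can supply a Content-Length up front (which, as the comments below note, is the catch once the image has been resized):

// Sketch only: assumes s3.putStream proxies knox's client.putStream and that
// `knownLength` (the byte size of the resized image) is somehow available.
var headers = {
    'Content-Length': knownLength,
    'Content-Type': 'image/jpeg',
    'x-amz-acl': 'public-read'
};

s3.putStream(stdout, '/img/d/' + filename + '.jpg', headers, function(err, s3res) {
    if (err) return callback(err);
    callback(null, s3res);
});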

As for the second part of your question, that isn't possible. When a function is called, it is allocated its own private context, and everything defined inside of that will only be accessible from other items defined inside that function.
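
A tiny, self-contained illustration of that (the names here are made up purely for the demo):

function processOne(label) {
    // Each call gets its own private `bufs`; concurrent calls cannot see it.
    var bufs = [];

    setTimeout(function() {
        bufs.push(label);
        console.log(label, bufs); // always logs exactly one item
    }, Math.random() * 100);
}

processOne('first');
processOne('second'); // runs "concurrently", but never touches the other call's `bufs`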

Update

Dumping the file to the filesystem would probably mean less memory usage per request, but file IO can be pretty slow so it might not be worth it. I'd say that you shouldn't optimize too much until you can profile and stress-test this function. If the garbage collector is doing its job you may be overoptimizing.

With all that said, there are better ways anyway, so don't use files. Since all you want is the length, you can calculate that without needing to append all of the buffers together, so then you don't need to allocate a new Buffer at all.

var pause_stream = require('pause-stream');

// Your other code.

var bufs = [];
stdout.on('data', function(d){ bufs.push(d); });
stdout.on('end', function(){
  var contentLength = bufs.reduce(function(sum, buf){
    return sum + buf.length;
  }, 0);

  // Create a stream that will emit your chunks when resumed.
  var stream = pause_stream();
  stream.pause();
  while (bufs.length) stream.write(bufs.shift());
  stream.end();

  var headers = {
      'Content-Length': contentLength,
      // ...
  };

  s3.putStream(stream, ....);
});
loganfsmyth
  • it supports streams, but I need to know the Content-Length for the S3 headers and its impossible with streams – Gal Ben-Haim Jan 11 '13 at 00:14
  • btw - what about the second part of the question ? – Gal Ben-Haim Jan 11 '13 at 00:15
  • is it a better practice to pipe the stream from 'gm' to a file and then open a stream from that file and upload to S3, using the file size as Content-Length? As far as I understand, this eliminates loading the entire file into memory like I'm doing now – Gal Ben-Haim Jan 11 '13 at 07:53
  • just want to mention that the `bufs.pop()` call should be `bufs.unshift()`, or even easier just replace the entire while loop with a simple for loop. – Erhhung Mar 07 '16 at 22:07
  • on the on('data') you can just do bytes += data.length instead of reducing the array at the end. – Bergur Feb 07 '18 at 00:44
  • @Bergur True, but then you have to maintain two separate accumulator variables. I prefer maintaining the single one and calculating the length later. I'm not convinced it would make an appreciable difference in performance or anything. – loganfsmyth Feb 07 '18 at 01:01
  • Great...Its really helped @oganfsmyth – Dibish Mar 08 '19 at 06:22
66

JavaScript snippet

function stream2buffer(stream) {

    return new Promise((resolve, reject) => {
        
        const _buf = [];

        stream.on("data", (chunk) => _buf.push(chunk));
        stream.on("end", () => resolve(Buffer.concat(_buf)));
        stream.on("error", (err) => reject(err));

    });
} 

TypeScript snippet

import { Stream } from "stream";

async function stream2buffer(stream: Stream): Promise<Buffer> {

    return new Promise<Buffer>((resolve, reject) => {

        const _buf: Buffer[] = [];

        stream.on("data", (chunk: Buffer) => _buf.push(chunk));
        stream.on("end", () => resolve(Buffer.concat(_buf)));
        stream.on("error", (err) => reject(`error converting stream - ${err}`));

    });
}
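
Usage is the same for either variant; for example, with a file stream (the path is just a placeholder):

const fs = require("fs");

async function main() {
    const buf = await stream2buffer(fs.createReadStream("./some-image.jpg"));
    console.log(buf.length);
}

main().catch(console.error);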
bsorrentino
  • this works incredibly well... this is the MVP right here – Jan Jun 05 '21 at 19:57
  • Actually for me JS version of `stream2buffer()` doesn't return a proper value. – Mike Jun 30 '21 at 20:10
  • Hi @MikeB, as you can see the code is very simple (also to debug). Could you give us more details concerning "doesn't return a proper value"? – bsorrentino Jul 01 '21 at 15:50
  • @bsorrentino, I think the problem was with the way I return the value. In my case `const pdfBuffered = \`data:application/pdf;base64, ${Buffer.concat(chunks).toString("base64")}\`;` worked OK. So, not just `Buffer.concat(_buf))` but `Buffer.concat(chunks).toString("base64")`. – Mike Jul 01 '21 at 18:09
  • Excellent. Needed to convert a graphql-upload ReadStream to a buffer to process an image. This worked wonderfully! – Nelson Fleig Jun 16 '22 at 06:25
  • Unbelievable that this isn't in the stdlib – jameshfisher Jun 18 '22 at 20:22
22

Note: this solely answers "How to read a stream into a buffer?" and ignores the context of the original question.

ES2018 Answer

Since Node 11.14.0, readable streams support async iterators.

const buffers = [];

// node.js readable streams implement the async iterator protocol
for await (const data of readableStream) {
  buffers.push(data);
}

const finalBuffer = Buffer.concat(buffers);
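
Note that `for await` needs an async context (an async function, or top-level await in an ES module). Wrapped as a reusable helper (the function name is mine), it looks like this:

async function streamToBuffer(readableStream) {
  const buffers = [];
  // collect every chunk the stream emits, then join them once at the end
  for await (const data of readableStream) {
    buffers.push(data);
  }
  return Buffer.concat(buffers);
}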

Bonus: In the future, this could get better with the stage 2 Array.fromAsync proposal.

//  DOES NOT WORK (yet!)
const finalBuffer = Buffer.concat(await Array.fromAsync(readableStream));
Rico Kahler
  • Would using the iterator be preferred over the eventing proposed in the other answers? I had actually gone the iterator route before reviewing other suggestions, and then found this SO question where most of the answers suggest going the eventing route. What do you think? – Metro Smurf Oct 30 '22 at 20:11
  • yeah there's nothing about this approach that would be worse than the events. most of those answers were written prior to this feature existing. i'd say this is very idiomatic to the language, however many JS devs still don't know about it, so the best approach is probably the one your team understands the most – Rico Kahler Nov 01 '22 at 16:31
  • function dies on / after first chunk, no response, no error... what could it be? any help will be appreciated – ISAE Dec 28 '22 at 20:43
10

You can easily do this using node-fetch if you are pulling from http(s) URIs.

From the readme:

fetch('https://assets-cdn.github.com/images/modules/logos_page/Octocat.png')
    .then(res => res.buffer())
    .then(buffer => console.log(buffer))
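
Note that `res.buffer()` is a node-fetch extension; with the standard Fetch API (for example the built-in `fetch` in Node 18+), a sketch using `arrayBuffer()` instead:

fetch('https://assets-cdn.github.com/images/modules/logos_page/Octocat.png')
    .then(res => res.arrayBuffer())
    .then(arrayBuffer => Buffer.from(arrayBuffer)) // wrap the ArrayBuffer in a Buffer
    .then(buffer => console.log(buffer))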
oxygen
6

You can convert your readable stream to a buffer and integrate it into your code in an asynchronous way like this:

async function streamToBuffer (stream) {
    return new Promise((resolve, reject) => {
      const data = [];

      stream.on('data', (chunk) => {
        data.push(chunk);
      });

      stream.on('end', () => {
        resolve(Buffer.concat(data));
      });

      stream.on('error', (err) => {
        reject(err);
      });
    });
}

the usage would be as simple as:

 // usage
  const myStream // your stream
  const buffer = await streamToBuffer(myStream) // this is a buffer
SLIMANI Mohammed
  • Doesn't work with `process.stdin` on Windows - if no input has been piped into the command, neither `end` nor `error` appears to fire. – mindplay.dk Sep 15 '21 at 12:08
5

I suggest loganfsmyth's method, using an array to hold the data.

var bufs = [];
stdout.on('data', function(d){ bufs.push(d); });
stdout.on('end', function(){
  var buf = Buffer.concat(bufs);
});

In my current working example, I am working with GridFS and npm's Jimp.

var data = [];
var bucket = new GridFSBucket(getDBReference(), { bucketName: 'images' });
var dwnldStream = bucket.openDownloadStream(info[0]._id); // original size

dwnldStream.on('data', function(chunk) {
    data.push(chunk);
});

dwnldStream.on('end', function() {
    var buff = Buffer.concat(data);
    console.log("buffer: ", buff);
    jimp.read(buff)
        .then(image => {
            console.log("read the image!");
            IMAGE_SIZES.forEach((size) => {
                resize(image, size);
            });
        });
});

I did some other research with a string method, but that did not work, perhaps because I was reading from an image file; the array method did work, though.

const DISCLAIMER = "DONT DO THIS";
var data = "";
stdout.on('data', function(d){
    data += d; // implicitly converts each Buffer chunk to a UTF-8 string
});
stdout.on('end', function(){
    var buf = Buffer.from(data);
    //// do work with the buffer here
});

When I did the string method, I got this error from npm Jimp:

buffer:  <Buffer 00 00 00 00 00>
{ Error: Could not find MIME for Buffer <null>

Basically, I think the type coercion from binary to string didn't work so well.
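
Indeed, appending Buffer chunks to a string converts them with the default UTF-8 encoding, which is lossy for arbitrary binary data; a quick illustration of why the string method corrupts an image:

// Round-tripping binary data through a UTF-8 string is lossy.
var original = Buffer.from([0xff, 0xd8, 0xff, 0xe0]); // first bytes of a JPEG header
var viaString = Buffer.from(original.toString());     // implicit UTF-8 both ways

console.log(original);                   // <Buffer ff d8 ff e0>
console.log(viaString);                  // replacement characters, not the JPEG bytes
console.log(original.equals(viaString)); // false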

1

I suggest having an array of buffers and concatenating them into the resulting buffer only once, at the end. It's easy to do manually, or you could use node-buffers.

Andrey Sidorov
1

I just want to post my solution. The previous answers were pretty helpful for my research. I use length-stream to get the size of the stream, but the problem here is that the callback is fired near the end of the stream, so I also use stream-cache to cache the stream and pipe it to the res object once I know the content length. In case of an error, the stream's 'error' handler passes the error to the callback.

var StreamCache = require('stream-cache');
var lengthStream = require('length-stream');

var _streamFile = function(res , stream , cb){
    var cache = new StreamCache();

    var lstream = lengthStream(function(length) {
        res.header("Content-Length", length);
        cache.pipe(res);
    });

    stream.on('error', function(err){
        return cb(err);
    });

    stream.on('end', function(){
        return cb(null , true);
    });

    return stream.pipe(lstream).pipe(cache);
}
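
Usage would look something like this (the Express-style `app`, the `res` object, and the file path are placeholders, not part of the original code):

var fs = require('fs');

// Hypothetical route: stream a file to the client with Content-Length set.
app.get('/download', function(req, res) {
    var stream = fs.createReadStream('./some-file.pdf'); // placeholder path
    _streamFile(res, stream, function(err) {
        if (err) { console.error(err); }
    });
});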
1

In TS, `[].push(bufferPart)` is not compatible, so:

getBufferFromStream(stream: Part | null): Promise<Buffer> {
    if (!stream) {
        throw 'FILE_STREAM_EMPTY';
    }
    return new Promise(
        (r, j) => {
            let buffer = Buffer.from([]);
            stream.on('data', buf => {
               buffer = Buffer.concat([buffer, buf]);
            });
            stream.on('end', () => r(buffer));
            stream.on('error', j);
        }
    );
}
afcfzf
1

You can do this by:

async function toBuffer(stream: ReadableStream<Uint8Array>) {
  const list = []
  const reader = stream.getReader()
  while (true) {
    const { value, done } = await reader.read()
    if (value)
      list.push(value)
    if (done)
      break
  }
  return Buffer.concat(list)
}

or using the `buffer` consumer from `node:stream/consumers` (note that it returns a promise):

import { buffer } from 'node:stream/consumers'

const buf = await buffer(stream)
shtse8
  • Are you sure this works? I don't think this will read the entire stream. "The read() method of the ReadableStreamDefaultReader interface returns a Promise providing access to the *next chunk* in the stream's internal queue." https://stackoverflow.com/questions/14269233/node-js-how-to-read-a-stream-into-a-buffer – killdash9 Nov 17 '22 at 00:18
  • @killdash9 I have updated my answer to concat all chunks. Thanks. – shtse8 Nov 17 '22 at 09:28
0

You can check the "content-length" header in res.headers. It will give you the length of the content you will receive (i.e., how many bytes of data the server will send).
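
For example, in the original question's `client.get` callback it would look roughly like this (a sketch; the header is only present if the server sends it, and it describes the original download, not the resized output):

client.get(imageUrl, function(res) {
    var contentLength = parseInt(res.headers['content-length'], 10);
    if (!isNaN(contentLength)) {
        console.log('expecting ' + contentLength + ' bytes');
    }
    // ... read the stream as shown in the other answers
});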

faBri_CI-o