125

In https://stackoverflow.com/a/18658613/779159 there is an example of how to calculate the md5 of a file using the built-in crypto library and streams.

var fs = require('fs');
var crypto = require('crypto');

// the file you want to get the hash of
var fd = fs.createReadStream('/some/file/name.txt');
var hash = crypto.createHash('sha1');
hash.setEncoding('hex');

fd.on('end', function() {
    hash.end();
    console.log(hash.read()); // the desired sha1sum
});

// read all file and pipe it (write it) to the hash object
fd.pipe(hash);

But is it possible to convert this to use ES8 async/await instead of the callback seen above, while still keeping the efficiency of streams?

user779159
  • `async/await` is nothing but syntax-level support for promises. If you can put this code inside a promise, then you are done. – Felix Kling Nov 08 '15 at 22:27
  • Node.js 10.x supports using `for-await-of` to read streams (https://nodejs.org/docs/latest-v10.x/api/stream.html#stream_streams_compatibility_with_async_generators_and_async_iterators), but I think it's not the right concept for your question here. Leaving it as a note for others who might come here facing a situation where it would help. – SimonSimCity Jul 01 '20 at 11:03

7 Answers

154

The await keyword only works on promises, not on streams. There are ideas to make an extra stream-like data type that would get its own syntax, but those are highly experimental, if they exist at all, and I won't go into details.

Anyway, your callback is only waiting for the end of the stream, which is a perfect fit for a promise. You'd just have to wrap the stream:

var fd = fs.createReadStream('/some/file/name.txt');
var hash = crypto.createHash('sha1');
hash.setEncoding('hex');
// read all file and pipe it (write it) to the hash object
fd.pipe(hash);

var end = new Promise(function(resolve, reject) {
    hash.on('end', () => resolve(hash.read()));
    fd.on('error', reject); // or something like that. might need to close `hash`
});

There also exists a helper function to do just that in more recent versions of Node.js: `pipeline` from the `stream/promises` module:

import fs from 'node:fs';
import crypto from 'node:crypto';
import { pipeline } from 'node:stream/promises';

const fd = fs.createReadStream('/some/file/name.txt');
const hash = crypto.createHash('sha1');
hash.setEncoding('hex');

// read all file and pipe it (write it) to the hash object;
// the pipeline promise itself resolves with undefined, so read the digest once it settles
const end = pipeline(fd, hash).then(() => hash.read());

Now you can await that promise:

(async function() {
    let sha1sum = await end;
    console.log(sha1sum);
}());
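
For reference, all of the above can be wrapped into one reusable function; a minimal sketch (the name `checksumFile` and the update-per-chunk approach instead of piping are mine, not part of the answer above):

const fs = require('fs');
const crypto = require('crypto');

function checksumFile(path, algorithm = 'sha1') {
    return new Promise((resolve, reject) => {
        const hash = crypto.createHash(algorithm);
        fs.createReadStream(path)
            .on('error', reject) // reject instead of hanging on read errors
            .on('data', chunk => hash.update(chunk)) // feed each chunk to the hash
            .on('end', () => resolve(hash.digest('hex')));
    });
}

// usage inside an async function:
// const sha1sum = await checksumFile('/some/file/name.txt');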
Bergi
  • Congratulations! Now it is no longer the right answer; now you can use an async loop with streams: http://2ality.com/2018/04/async-iter-nodejs.html. Unfortunately, it is an experimental feature for now. – MiF Aug 03 '18 at 10:05
  • @MiF So you mean the answer is wrong, because it's not "highly experimental" but just "experimental"? :-D – Bergi Aug 03 '18 at 15:53
  • JS is moving to promises and async/await in all async cases. It is strange that this feature wasn't released in production. I think it is a good syntax now and will not change. – MiF Aug 04 '18 at 19:52
  • Why are you waiting for the ReadStream to `end`? Wouldn't the Promise resolve before the hash function has a chance to perform on the data? Or, at the very least, wouldn't it create a race condition? I would assume you would want to store the value of `fd.pipe(hash)` in a variable and listen for the `finish` event as that would signal that the WritableStream (hashing) has completed. – adam-beck Dec 18 '18 at 20:01
  • @adam-beck Sounds like you're right. Why did I do it this way? Because the OP did that in their question as well. Feel free to suggest an edit. – Bergi Dec 18 '18 at 20:32
  • Well I did some digging. And now I'm not so sure: nodejs.org/api/stream.html#stream_events_finish_and_end – adam-beck Dec 19 '18 at 03:33
  • this answer seems a bit old school now. for example, line 1 "async/await only works with promises, not with streams." not true any more. use pipeline from `node:stream/promises`, or for-await while consuming. – rosmcmahon Jan 27 '23 at 07:29
  • @rosmcmahon I've clarified - I would say it is still true that you cannot `await` a stream itself. – Bergi Feb 12 '23 at 01:18
88

If you are using Node version >= v10.0.0, then you can use `stream.pipeline` and `util.promisify`.

const fs = require('fs');
const crypto = require('crypto');
const util = require('util');
const stream = require('stream');

const pipeline = util.promisify(stream.pipeline);

const hash = crypto.createHash('sha1');
hash.setEncoding('hex');

async function run() {
  await pipeline(
    fs.createReadStream('/some/file/name.txt'),
    hash
  );
  console.log('Pipeline succeeded');
  console.log(hash.read()); // the sha1 digest as a hex string
}

run().catch(console.error);
shusson
  • Should unpipe be called somewhere also? – Fluous Oct 03 '19 at 15:05
  • I had to dig for hours until I found this answer. Why is this not included in the Node Streams docs as an example...? – empz Apr 20 '20 at 12:28
  • But it is in the docs. https://nodejs.org/api/stream.html#stream_stream_pipeline_source_transforms_destination_callback – Costa.S Sep 09 '20 at 20:28
  • note that there's an issue with pipeline stream flushing in node < 15.5.1: https://github.com/nodejs/node/issues/34274 – krukid Jan 04 '22 at 16:32
71

Node v15 now has a promisified `pipeline` in `stream/promises`. This is the cleanest and most official way.

const fs = require('fs');
const zlib = require('zlib');
const { pipeline } = require('stream/promises');

async function run() {
  await pipeline(
    fs.createReadStream('archive.tar'),
    zlib.createGzip(),
    fs.createWriteStream('archive.tar.gz')
  );
  console.log('Pipeline succeeded.');
}

run().catch(console.error);

We should all appreciate how much work is done here:

  • It captures errors in all the streams.
  • It destroys unfinished streams when an error is raised.
  • It only returns when the last writable stream is finished.

Piping is one of the most powerful features Node.js has. Making it fully async is not easy. Now we have it.
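
To see that error handling in action, here's a small sketch (the file names and the `compress` wrapper are mine, for illustration): if any stage fails, the awaited pipeline rejects and the remaining streams are destroyed, so a plain try/catch is enough.

const fs = require('fs');
const zlib = require('zlib');
const { pipeline } = require('stream/promises');

async function compress(src, dest) {
  try {
    await pipeline(
      fs.createReadStream(src),    // an error here (e.g. missing file)...
      zlib.createGzip(),           // ...or here...
      fs.createWriteStream(dest)   // ...or here rejects the same promise
    );
  } catch (err) {
    // every stream in the pipeline has already been destroyed at this point
    console.error('Pipeline failed:', err.message);
  }
}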

Jason Ching
18

Something like this works (note that it must run inside an async function, since it uses `await`):

// node-fetch package responses
for (const res of fetchResponses) {
    const dest = fs.createWriteStream(filePath, { flags: 'a' }); // append each download to the same file
    totalBytes += Number(res.headers.get('content-length'));
    await new Promise((resolve, reject) => {
        res.body.pipe(dest);
        res.body.on("error", reject);
        dest.on("finish", resolve); // the writable side has flushed everything
    });
}
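
For context, the surrounding setup might look something like this (the URL list, `filePath`, the `downloadAll` wrapper, and the node-fetch require are my own assumptions, not part of the original answer):

const fs = require('fs');
const fetch = require('node-fetch'); // v2, where res.body is a Node readable stream

async function downloadAll(urls, filePath) {
  let totalBytes = 0;
  const fetchResponses = await Promise.all(urls.map(url => fetch(url)));
  for (const res of fetchResponses) {
    const dest = fs.createWriteStream(filePath, { flags: 'a' });
    totalBytes += Number(res.headers.get('content-length'));
    await new Promise((resolve, reject) => {
      res.body.pipe(dest);
      res.body.on('error', reject);
      dest.on('finish', resolve);
    });
  }
  return totalBytes;
}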
Ronnie Royston
  • Great explanation, I used this promise to wrap an https request I am making using the Node.js built-in https module. Thank you! – Alex Rindone Apr 08 '20 at 21:15
18

2021 Update:

A new example from the Node documentation:

async function print(readable) {
  readable.setEncoding('utf8');
  let data = '';
  for await (const chunk of readable) {
    data += chunk;
  }
  console.log(data);
}

see https://nodejs.org/api/stream.html#stream_readable_symbol_asynciterator
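
Applied to the question here, this pattern makes the hashing loop straightforward; a sketch, assuming Node >= 10 where readable streams are async-iterable (the function name `sha1OfFile` is mine):

const fs = require('fs');
const crypto = require('crypto');

async function sha1OfFile(path) {
  const hash = crypto.createHash('sha1');
  for await (const chunk of fs.createReadStream(path)) {
    hash.update(chunk); // chunks arrive as Buffers
  }
  return hash.digest('hex');
}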

Dmitri R117
4

I believe this will be helpful for someone:

const fs = require('fs');

function readFile(filename) {
    return new Promise((resolve, reject) => {
        const records = [];
        fs.createReadStream(filename)
            .on("data", (data) => {
                records.push(data); // each `data` is a Buffer chunk
            })
            .on("error", reject) // reject on read errors instead of hanging forever
            .on("end", () => {
                resolve(records);
            });
    });
}
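
Usage could look like this (joining the chunks back into a string is my own addition). Note that this collects the whole file in memory, so it trades away the streaming efficiency the question asked about:

(async function() {
    const chunks = await readFile('/some/file/name.txt');
    console.log(Buffer.concat(chunks).toString('utf8'));
}());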
2

I would comment, but don't have enough reputation.

A WORD OF CAUTION: If you have an application that is passing streams around AND doing async/await, be VERY CAREFUL to connect ALL pipes before you await. You can end up with streams not containing what you thought they did. Here's a minimal example:

const { PassThrough } = require('stream');

async function main() {
    const initialStream = new PassThrough();

    const otherStream = new PassThrough();
    const data = [];
    otherStream.on('data', dat => data.push(dat));
    const resultOtherStreamPromise = new Promise(resolve => otherStream.on('end', () => { resolve(Buffer.concat(data)) }));

    const yetAnotherStream = new PassThrough();
    const data2 = [];
    yetAnotherStream.on('data', dat => data2.push(dat));
    const resultYetAnotherStreamPromise = new Promise(resolve => yetAnotherStream.on('end', () => { resolve(Buffer.concat(data2)) }));

    initialStream.pipe(otherStream);
    initialStream.write('some ');

    await Promise.resolve(); // Completely unrelated await

    // Too late: 'some ' was already delivered to otherStream only, because the
    // first pipe() put initialStream into flowing mode before this pipe existed
    initialStream.pipe(yetAnotherStream);
    initialStream.end('data');
    const [resultOtherStream, resultYetAnotherStream] = await Promise.all([
        resultOtherStreamPromise,
        resultYetAnotherStreamPromise,
    ]);

    console.log('other stream:', resultOtherStream.toString()); // other stream: some data
    console.log('yet another stream:', resultYetAnotherStream.toString()); // yet another stream: data
}
main();
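
Following the caution above, the fix is to attach every pipe before the first await; a sketch of the reordered body of main():

    // connect ALL pipes first...
    initialStream.pipe(otherStream);
    initialStream.pipe(yetAnotherStream);

    // ...then writing and awaiting are safe: both streams now see 'some data'
    initialStream.write('some ');
    await Promise.resolve(); // Completely unrelated await
    initialStream.end('data');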