
This question is the exact reverse of converting a ReadableStream into a ReadStream.

With the advent of non-Node.js runtimes such as Deno or the "Edge runtime" in Next.js, it can be useful to convert a Node.js specific ReadStream into a generic ReadableStream.

This is useful, for instance, to send files from a Next.js route handler; see this discussion on Next.js GitHub.

I've drafted a piece of code like so:

    const downloadStream = fs.createReadStream(zipFilePath);
    const readStream = new ReadableStream({
      start(controller) {
        return pump();
        function pump() {
          return downloadStream.read().then(({ done, value }) => {
            // When no more data needs to be consumed, close the stream
            if (done) {
              controller.close();
              return;
            }
            // Enqueue the next data chunk into our target stream
            controller.enqueue(value);
            return pump();
          });
        }
      },
    });

I am in the process of testing it.

Edit: the problem with this first draft is that the stream.Readable read() method doesn't return a promise, as mentioned by @Mahesh in the comments.

Here is a second try:

    const downloadStream = fs.createReadStream(zipFilePath);
    const readStream = new ReadableStream({
      start(controller) {
        return pump();
        function pump() {
          const buf = downloadStream.read() as Buffer
          if (buf === null) {
            controller.close();
            return;
          }
          controller.enqueue(buf.toString());
          return pump();
        }
      },
    });

It gives me a null buffer immediately, despite the file weighing 344 bytes. When I call isPaused(), the stream doesn't seem to be paused. Calling pause() doesn't fix my issue, nor does passing an explicit size of 1 byte to read().

I also get a weird error from Next.js:

- error Error: aborted
    at connResetException (node:internal/errors:711:14)
    at Socket.socketCloseListener (node:_http_client:454:19)
    at Socket.emit (node:events:525:35)
    at TCP.<anonymous> (node:net:313:12) {
  code: 'ECONNRESET'
}

Are there simpler solutions, syntax-wise?

Eric Burel
    It's been a while, but I think the Node.js `ReadableStream` `read` method does not return a Promise; it returns the chunk of data directly. To get chunks of data asynchronously, you can use the readable event and the read method in combination – Mahesh Jul 25 '23 at 13:13
    You are right, my current code doesn't do anything because of this. I am trying to figure out the right syntax. Some insights: it seems all streams start in "paused mode"; in this mode I can call "read()" and check whether the response is null to see if there is more data. Docs are here: https://nodejs.org/api/stream.html#class-streamreadable – Eric Burel Jul 25 '23 at 13:34

1 Answer


Managed to find a working syntax but still lacking some details.

  1. We want to be able to read a file using an imperative syntax, instead of relying on the traditional "data" event.
/**
 * From https://github.com/MattMorgis/async-stream-generator
 */
async function* nodeStreamToIterator(stream) {
    for await (const chunk of stream) {
        yield chunk;
    }
}

I am not familiar with generators so I am not sure of:

  • why we can apply the "of" operator to a Node.js ReadableStream?
  • what for await means here?

But at least this syntax lets us consume the stream in loops.

  2. Now we want to convert the iterator into a web-platform ReadableStream.
/**
 * Taken from Next.js doc
 * https://nextjs.org/docs/app/building-your-application/routing/router-handlers#streaming
 * Itself taken from mozilla doc
 * https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream#convert_async_iterator_to_stream
 * @param {*} iterator 
 * @returns {ReadableStream}
 */
function iteratorToStream(iterator) {
    return new ReadableStream({
        async pull(controller) {
            const { value, done } = await iterator.next()

            if (done) {
                controller.close()
            } else {

                controller.enqueue(new Uint8Array(value))
            }
        },
    })
}

Notice the "Uint8Array" conversion: it doesn't seem to be needed in all scenarios, but encoding may be required on some platforms; I needed this conversion in Next.js. See this discussion on Next.js GitHub.

Finally we can use this stream in a Response just to see how it works:

// highWaterMark affects the chunk size, here I use a small size to simulate many chunks
const nodeStream = fs.createReadStream("./.gitignore", { highWaterMark: 8 })
const iterator = nodeStreamToIterator(nodeStream)
const webStream = iteratorToStream(iterator)

const res = new Response(webStream)
const blob = await res.blob()
console.log(await blob.text())
Eric Burel
    In short, the `for..of` construction can be applied to any object that has an [iterator](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Symbol/iterator), and `for await..of` respectively can be applied to any [async iterator](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/AsyncIterator). Streams are [compatible](https://nodejs.org/api/stream.html#streams-compatibility-with-async-generators-and-async-iterators) with async iterators, which is why you can use `for await`. – Jaood_xD Jul 27 '23 at 14:06