
I am trying to stream data from a large CSV file into readline. I tried simply piping the read stream from S3 into the readline input, but I ran into an error because S3 only allows a connection to stay open for a certain amount of time.

I am creating the stream from s3 like so:

import * as AWS from 'aws-sdk';
import {Readable} from 'stream';
import {s3Env} from '../config';

export default async function createAWSStream(): Promise<Readable> {
    return new Promise((resolve, reject) => {
        const params = {
            Bucket: s3Env.bucket,
            Key: s3Env.key
        };

        try {
            const s3 = new AWS.S3({
                accessKeyId: s3Env.accessKey,
                secretAccessKey: s3Env.secret
            });

            s3.headObject(params, (error, data) => {
                if (error) {
                    // Rejecting here instead of throwing, since a throw inside
                    // the callback would not be caught by the surrounding try/catch
                    reject(error);
                    return;
                }

                const stream = s3.getObject(params).createReadStream();

                resolve(stream);
            });
        } catch (error) {
            reject(error);
        }
    });
}

Then I am piping it into readline:

import * as readline from 'readline';
import createAWSStream from './createAWSStream';

export const readCSVFile = async function(): Promise<void> {
  const rStream = await createAWSStream();

  const lineReader = readline.createInterface({
    input: rStream
  });

  for await (const line of lineReader) {
    // process line
  }
}

I found that the timeout for S3 connections was set to 120000ms (2 min). I tried simply raising the timeout, but then I ran into more timeout issues from the HTTPS connection.
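For reference, that 120000ms value matches the default httpOptions.timeout in aws-sdk v2, and raising it looks roughly like the sketch below; the numbers are placeholder examples, not a recommendation.

import * as AWS from 'aws-sdk';
import {s3Env} from '../config';

// Raising the socket timeouts on the client (aws-sdk v2 httpOptions);
// the values below are arbitrary examples.
const s3 = new AWS.S3({
    accessKeyId: s3Env.accessKey,
    secretAccessKey: s3Env.secret,
    httpOptions: {
        connectTimeout: 5000, // ms allowed to establish the connection
        timeout: 900000       // ms of socket inactivity before timing out (default 120000)
    }
});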

How can I stream data from AWS S3 the right way without setting a bunch of timeouts to some extremely large timeframe?


1 Answer


I was able to work out a solution for this using the AWS S3 Range property and a custom readable stream built with the NodeJS Stream API.

By using this "smart stream" I was able to grab the data in chunks, with a separate request to S3 for each chunk. Grabbing the data in chunks avoided the timeout errors and produced a more efficient stream. The NodeJS Readable superclass handles the internal buffer so the readline input is never overloaded, and it automatically handles pausing and resuming the stream.

This class made it possible to stream large files from AWS S3 very easily:

import {Readable, ReadableOptions} from "stream";
import type {S3} from "aws-sdk";

export class SmartStream extends Readable {
    _currentCursorPosition = 0; // Holds the current starting position for our range queries
    _s3DataRange = 2048 * 1024; // Amount of bytes to grab per request (I have jacked this up for HD video files)
    _maxContentLength: number; // Total number of bytes in the file
    _s3: S3; // AWS.S3 instance
    _s3StreamParams: S3.GetObjectRequest; // Parameters passed into s3.getObject method

    constructor(
        parameters: S3.GetObjectRequest,
        s3: S3,
        maxLength: number,
        // You can pass any ReadableStream options to the NodeJS Readable super class here
        // For this example we won't use this, however I left it in to be more robust
        nodeReadableStreamOptions?: ReadableOptions
    ) {
        super(nodeReadableStreamOptions);
        this._maxContentLength = maxLength;
        this._s3 = s3;
        this._s3StreamParams = parameters;
    }

    _read() {
        if (this._currentCursorPosition > this._maxContentLength) {
            // If the current position is greater than the amount of bytes in the file
            // We push null into the buffer, NodeJS ReadableStream will see this as the end of file (EOF) and emit the 'end' event
            this.push(null);
        } else {
            // Calculate the range of bytes we want to grab
            const range = this._currentCursorPosition + this._s3DataRange;
            // If the range is greater than the total number of bytes in the file
            // We adjust the range to grab the remaining bytes of data
            const adjustedRange =
                range < this._maxContentLength ? range : this._maxContentLength;
            // Set the Range property on our s3 stream parameters
            this._s3StreamParams.Range = `bytes=${this._currentCursorPosition}-${adjustedRange}`;
            // Update the current range beginning for the next go
            this._currentCursorPosition = adjustedRange + 1;
            // Grab the range of bytes from the file
            this._s3.getObject(this._s3StreamParams, (error, data) => {
                if (error) {
                    // If we encounter an error grabbing the bytes
                    // We destroy the stream, NodeJS ReadableStream will emit the 'error' event
                    this.destroy(error);
                } else {
                    // We push the data into the stream buffer
                    this.push(data.Body);
                }
            });
        }
    }
}

To work it into the createAWSStream function, I simply replaced the line where I created the read stream:

const stream = s3.getObject(params).createReadStream();

with a line that instead creates an instance of my SmartStream class, passing in the s3 params object, the s3 instance, and the content length of the data:

const stream = new SmartStream(params, s3, data.ContentLength);
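For completeness, the full createAWSStream with that change in place looks something like the sketch below (assuming the class is exported from a local ./SmartStream module; everything else mirrors the question's function). headObject supplies the ContentLength that SmartStream needs, without downloading the object.

import * as AWS from 'aws-sdk';
import {SmartStream} from './SmartStream';
import {s3Env} from '../config';

export default async function createAWSStream(): Promise<SmartStream> {
    return new Promise((resolve, reject) => {
        const params = {
            Bucket: s3Env.bucket,
            Key: s3Env.key
        };

        try {
            const s3 = new AWS.S3({
                accessKeyId: s3Env.accessKey,
                secretAccessKey: s3Env.secret
            });

            // headObject returns the object's metadata, including its total size
            s3.headObject(params, (error, data) => {
                if (error) {
                    reject(error);
                    return;
                }

                resolve(new SmartStream(params, s3, data.ContentLength));
            });
        } catch (error) {
            reject(error);
        }
    });
}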
  • Hi, do you have a working example of this? I'm having issues with 20GB of zipped CSV. – Morlo Mbakop Feb 18 '22 at 18:44
  • @MorloMbakop This solution was actually made for CSV files, so it should work pretty well. I also have this [repo on github](https://github.com/about14sheep/awsstreaming), however it was made to stream HD video; it's exactly the same under the hood though! – about14sheep Feb 18 '22 at 19:55
  • @about14sheep This is a nice approach. In Safari we need to send an updated "Content-Range" header. How can we do that? I tried adding a transform stream and an event listener on the content stream before doing `.pipe(res)`, but it didn't work. Please suggest a way to achieve this. – Manish Mahajan Oct 12 '22 at 10:11
  • @ManishMahajan Since Safari does not allow the entire file to be piped into the response (in case you need to grab a previous chunk), this will take a little more work. I have an [npm package for this readstream](https://www.npmjs.com/package/s3-readstream) and updated it to include methods to move the cursor. I think these two methods would be of help to you. If you don't want to download the package you can just [copy paste these new methods](https://github.com/about14sheep/s3-readstream/blob/master/src/S3Readstream.ts#L38-L66). – about14sheep Oct 12 '22 at 17:33