I am trying to download some PDFs from the internet into my Amazon S3 bucket. So far I download the files to my server and then upload them from the server to the S3 bucket, but I was curious whether I can upload them while downloading them, as a stream.

private async download(url: string, path: string): Promise<void> {
    const response = await fetch(url);
    const fileStream = createWriteStream(path);
    await new Promise<void>((resolve, reject) => {
      response.body.pipe(fileStream);
      response.body.on('error', reject);
      fileStream.on('error', reject); // also fail if the disk write errors
      fileStream.on('finish', resolve);
    });
  }

and this is my upload function, which I call after the download finishes:

public async upload(path: string, name: string): Promise<string> {
    // backticks are required here; single quotes would not interpolate ${name}
    const url = `documents/${name}.pdf`;
    const params = {
      Body: createReadStream(path),
      Bucket: AWS_S3_BUCKET_NAME,
      Key: url
    };

    // await directly so any S3 error is thrown instead of being returned
    const data = await s3.putObject(params).promise();
    console.log(data);

    return url;
  }

I am looking for a way to merge these two functions into one that returns the S3 response after it finishes, or throws an error if either the download or the upload fails.

I also wanted to ask whether it is possible to call this function multiple times in parallel and, if so, how many concurrent calls are safe without breaking the server.

Thank you in advance, Daniel!

Pacuraru Daniel
  • Related: https://stackoverflow.com/questions/37336050/pipe-a-stream-to-s3-upload – jarmod May 09 '22 at 13:18
  • I need a similar thing... If you now know how to pipe directly, I would appreciate learning how to do it. – mlev Jun 16 '22 at 06:27

1 Answer

Yes, you can. Here is a working example that downloads and uploads at the same time, using a multipart upload in a Node.js environment:

import {
  AbortMultipartUploadCommandOutput,
  CompleteMultipartUploadCommandOutput,
  S3Client,
} from '@aws-sdk/client-s3';
import { Upload } from '@aws-sdk/lib-storage';
import Axios, { AxiosResponse } from 'axios';
import mime from 'mime-types';
import { Logger } from 'pino';
import { PassThrough } from 'stream';

export class S3HandlerClient {
  private readonly PART_SIZE = 1024 * 1024 * 5; // 5 MB
  private readonly CONCURRENCY = 4;

  private readonly logger: Logger;
  private readonly client: S3Client;

  constructor(props: { logger: Logger; sdkClient: S3Client }) {
    this.logger = props.logger;
    this.client = props.sdkClient;
  }

  async uploadVideo(props: {
    input: {
      videoUrl: string;
    };
    output: {
      bucketName: string;
      fileNameWithoutExtension: string;
    };
  }): Promise<string> {
    try {
      const inputStream = await this.getInputStream({ videoUrl: props.input.videoUrl });
      const outputFileRelativePath = this.getFileNameWithExtension(
        props.output.fileNameWithoutExtension,
        inputStream,
      );
      await this.getOutputStream({
        inputStream,
        output: {
          ...props.output,
          key: outputFileRelativePath,
        },
      });
      return `s3://${props.output.bucketName}/${outputFileRelativePath}`;
    } catch (error) {
      this.logger.error({ error }, 'Error occurred while uploading/downloading file.');
      throw error;
    }
  }

  private getFileNameWithExtension(fileName: string, inputStream: AxiosResponse) {
    this.logger.info({ headers: inputStream.headers });
    return `${fileName}.${this.getFileExtensionFromContentType(
      inputStream.headers['content-type'],
    )}`;
  }

  private getFileExtensionFromContentType(contentType: string): string {
    const extension = mime.extension(contentType);
    if (extension) {
      return extension;
    } else {
      throw new Error(`Failed to get extension from 'Content-Type' header': ${contentType}.`);
    }
  }

  private async getInputStream(props: { videoUrl: string }): Promise<AxiosResponse> {
    this.logger.info({ videoUrl: props.videoUrl }, 'Initiating download');
    const response = await Axios({
      method: 'get',
      url: props.videoUrl,
      responseType: 'stream',
    });
    this.logger.info({ headers: response.headers }, 'Input stream HTTP headers');

    return response;
  }

  private async getOutputStream(props: {
    inputStream: AxiosResponse;
    output: {
      bucketName: string;
      key: string;
    };
  }): Promise<CompleteMultipartUploadCommandOutput | AbortMultipartUploadCommandOutput> {
    this.logger.info({ output: props.output }, 'Initiating upload');

    const output = props.output;
    const passThrough = new PassThrough();
    const upload = new Upload({
      client: this.client,
      params: { Bucket: output.bucketName, Key: output.key, Body: passThrough },
      queueSize: this.CONCURRENCY,
      partSize: this.PART_SIZE,
      leavePartsOnError: false,
    });

    // pipe() does not forward errors, so destroy the PassThrough on a
    // download error to make upload.done() reject instead of hanging
    props.inputStream.data.on('error', (err: Error) => passThrough.destroy(err));
    props.inputStream.data.pipe(passThrough);

    if (this.logger.isLevelEnabled('debug')) {
      upload.on('httpUploadProgress', (progress) => {
        this.logger.debug({ progress }, 'Upload progress');
      });
    }

    return await upload.done();
  }
}

This is how you can initialize it:

import { S3Client } from '@aws-sdk/client-s3';
import pino from 'pino';


const sdkClient = new S3Client({ region: 'us-west-2' });
const client = new S3HandlerClient({ logger: pino(), sdkClient });
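On the parallelism part of the question: since each transfer buffers at most queueSize × partSize bytes (20 MB with the settings above), a handful of concurrent transfers is usually fine, but it is best to cap them explicitly rather than firing off an unbounded number of promises. Below is a minimal sketch of a worker-pool helper; `mapWithConcurrency` is a hypothetical name, not part of the AWS SDK, and the limit of 4 is just a conservative starting point to tune against your server's memory and bandwidth.

```typescript
// Hypothetical helper: run an async task over many inputs with a bounded
// number of in-flight executions, so memory and open sockets stay capped.
async function mapWithConcurrency<T, R>(
  items: T[],
  limit: number,
  task: (item: T) => Promise<R>,
): Promise<R[]> {
  const results: R[] = new Array(items.length);
  let next = 0;

  async function worker(): Promise<void> {
    while (next < items.length) {
      const i = next++; // claim the next index synchronously before awaiting
      results[i] = await task(items[i]);
    }
  }

  // start `limit` workers; each pulls items until the list is drained
  await Promise.all(Array.from({ length: Math.min(limit, items.length) }, worker));
  return results;
}
```

With the class above you could then call something like `mapWithConcurrency(videoUrls, 4, (videoUrl) => client.uploadVideo({ input: { videoUrl }, output: { ... } }))`, where the output naming is up to you.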

Example dependencies:

{
  ...
  "dependencies": {
    "@aws-sdk/client-s3": "^3.100.0",
    "@aws-sdk/lib-storage": "^3.100.0",
    "axios": "^0.27.2",
    "mime-types": "^2.1.35",
    "pino": "^7.11.0",
    "pino-lambda": "^4.0.0",
    "streaming-s3": "^0.4.5"
  },
  "devDependencies": {
    "@types/aws-lambda": "^8.10.97",
    "@types/mime-types": "^2.1.1",
    "@types/pino": "^7.0.5"
  }
}
Žilvinas Rudžionis