
Goal

Download a file and upload it to Google Drive purely in memory, using the Google Drive API's resumable upload URL.

Challenge / Problem

I want to buffer the file in memory (not on the filesystem) as it is being downloaded and subsequently upload it to Google Drive. The Google Drive API requires each chunk (except the last) to be a multiple of 256 * 1024 bytes (262,144 bytes).

The process should pass a chunk from the buffer to be uploaded. If the chunk errors, that buffer chunk is retried up to 3 times. If the chunk succeeds, that chunk is cleared from the buffer, and the process continues until complete.
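To make that constraint concrete, here is a small illustration (the constant names and the contentRange helper are mine, purely for this post): every non-final chunk must be a whole multiple of 256 KiB, and each PUT declares its position in the file with a Content-Range header.

// Illustration only; these names are not from any library.
const MIN_CHUNK = 256 * 1024       // 262,144 bytes
const CHUNK_SIZE = MIN_CHUNK * 40  // ~10 MB; any whole multiple of 256 KiB works

// Each PUT to the resumable session URL carries a header of the form:
//   Content-Range: bytes <start>-<end>/<total>
// e.g. the first 10 MB chunk of a 23,558,108-byte file:
//   Content-Range: bytes 0-10485759/23558108
function contentRange(start, end, total) {
  return `bytes ${start}-${end}/${total}`
}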

Background Efforts / Research (references below)

Most of the articles, examples and packages I've researched and tested have given some insight into streaming, piping and chunking, but they use the filesystem as the starting point for the readable stream.

I've tried different approaches with streams, like a PassThrough with highWaterMark, and third-party libraries such as request, gaxios, and got, which have built-in stream/piping support, but to no avail on the upload end of the process.

In other words, I am not sure how to structure the piping or chunking mechanism (whether with a buffer or with pipeline) so that it flows properly into the upload process until completion, nor how to handle the progress and finalize events efficiently.

Questions

  1. With the code below, how do I appropriately buffer the file and PUT it to the Google-provided URL with the correct Content-Length and Content-Range headers, while keeping enough buffer space to handle 3 retries?

  2. In terms of handling back-pressure or buffering, is leveraging .cork() and .uncork() an efficient way to manage the buffer flow?

  3. Is there a way to use a Transform stream with highWaterMark and pipeline to manage the buffer efficiently? e.g...

pipeline(
  downloadStream,
  transformStream,
  uploadStream,
  (err) => {
    if (err) {
      reject(err)
    } else {
      resolve(true)
    }
  }
)
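To make questions 2 and 3 concrete, this is roughly the shape I have in mind (an untested sketch; sessionUrl, fileLength and CHUNK_SIZE are placeholders, and the per-chunk retries are omitted). It folds the transform and upload steps into a single Writable so that pipeline's built-in back-pressure does the pausing instead of manual .cork()/.uncork():

const { pipeline, Writable } = require("stream")
const axios = require("axios")

const CHUNK_SIZE = 256 * 1024 * 40 // any whole multiple of 256 KiB

// Sketch: a Writable that accumulates bytes and PUTs one chunk at a time.
function makeUploadSink(sessionUrl, fileLength) {
  let buffered = Buffer.alloc(0)
  let offset = 0

  const putChunk = (chunk) =>
    axios({
      method: "PUT",
      url: sessionUrl,
      headers: {
        "Content-Range": `bytes ${offset}-${offset + chunk.length - 1}/${fileLength}`
      },
      data: chunk,
      // Google answers 308 for accepted intermediate chunks; treat it as success here.
      validateStatus: (s) => (s >= 200 && s < 300) || s === 308
    }).then(() => {
      offset += chunk.length
    })

  return new Writable({
    highWaterMark: 256 * 1024,
    write(chunk, _enc, callback) {
      buffered = Buffer.concat([buffered, chunk])
      if (buffered.length < CHUNK_SIZE) return callback()
      const toSend = buffered.slice(0, CHUNK_SIZE)
      buffered = buffered.slice(CHUNK_SIZE)
      // Back-pressure: the callback fires only after the PUT settles,
      // so pipeline() pauses the download while a chunk is in flight.
      putChunk(toSend).then(() => callback(), callback)
    },
    final(callback) {
      // Flush whatever is left as the last (possibly short) chunk.
      if (buffered.length === 0) return callback()
      putChunk(buffered).then(() => callback(), callback)
    }
  })
}

// Usage sketch:
// pipeline(downloadStream, makeUploadSink(sessionUrl, fileLength), (err) =>
//   err ? reject(err) : resolve(true)
// )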

Below is a visual model and the code of what I'm trying to accomplish:

Visual Example

[====================]
File Length (20 MB)

[==========          ]
Download (10 MB)
       
      [======      ]
      Buffer (e.g. 6 MB filled, 12 MB capacity)

      [===]
      Upload Chunk (3 MB) => Error? Retry from Buffer (max 3 times)
                          => Success? Empty Buffer => Continue =>
      [===]
      Upload next Chunk (3 MB)

Code

/* 
   Assume resumable_drive_url was already obtained from Google API
   with the proper access token, which already contains the 
   Content-Type and Content-Length in the session. 
*/

const axios = require("axios")
const stream = require("stream")

function transfer(download_url, resumable_drive_url, file_type, file_length) {

    return new Promise((resolve, reject) => {

        let timeout = setTimeout(() => {
            reject(new Error("Transfer timed out."))
        }, 80000)


       // Question #1: Should the passthrough stream 
       // and .on events be declared here?

       const passthrough = new stream.PassThrough({
            highWaterMark: 256 * 1024
       })

       passthrough.on("error", (error) => {
            console.error(`Upload failed: ${error.message}`)
            reject(error.message)
       })

       passthrough.on("end", () => {
            clearTimeout(timeout)
            resolve(true)
       })

        
        // Download file
        axios({
            method: 'get',
            url: download_url,
            responseType: 'stream',
            maxRedirects: 1
        }).then(result => {
            
            // QUESTION #2: How do we buffer the file from here 
            // via axios.put to the resumable_url with the correct 
            // header information Content-Range and Content-Length?

            // CURIOSITY #1: Do we pipe from here 
            // to a passthrough stream that maintains a minimum buffer size?

            result.data.pipe(passthrough)
        }
        ).catch(error => {
            reject(error)
        })


    })
}

References

  1. Chunked Upload Class - (Decent chunking mechanism but bloated; seems there is a more efficient approach with stream piping)
  2. Google Drive API v3 - Upload via Resumable URL with Multiple Requests
  3. resumableUpload.js - (Conceptually right but uses File System)
  4. Google-Drive-Uploader - (Conceptually right but uses File System and custom StreamFactory)
  5. Resumable upload in Drive Rest API V3 - (Decent but seems bloated and antiquated)
pixelbobby
  • First, I have to apologize for my poor English. May I ask about your question? 1. In your situation, can I assume that you already know the size of the file you want to download? 2. Do you already have the access token for uploading the data to Google Drive? 3. For your goal, does the downloaded data need to be uploaded in multiple chunks? – Tanaike Jan 05 '21 at 01:54
  • @Tanaike, Yes, I know the file size. I give Google the `Content-Length` and `Content-Type` via an access token, and Google gives back a "Resumable URL" that serves as the session to upload to. Yes, I added a visual example to the post. The idea is to have enough room for the chunk operation to handle errors and retry while the buffer is loaded, then on success, clear the buffer and continue until complete. – pixelbobby Jan 05 '21 at 02:00
  • Thank you for replying. Based on your reply and the updated question, I proposed an answer. Could you please confirm it? If that is not the direction you expect, I apologize. – Tanaike Jan 06 '21 at 01:51
  • @Tanaike I am honored to receive such detail and insight in the approach you've provided here. Thank you for the energy. I am going to implement this and report back. – pixelbobby Jan 06 '21 at 04:08

1 Answer


I believe your goal and current situation are as follows.

  • You want to download data and upload the downloaded data to Google Drive using axios with Node.js.
  • For uploading the data, you want to use the resumable upload with multiple chunks, retrieving the data from the stream.
  • Your access token can be used for uploading the data to Google Drive.
  • You already know the data size and mimeType of the data you want to upload.

Modification points:

  • In this case, in order to achieve the resumable upload with multiple chunks, I would like to propose the following flow.

    1. Download the data from the URL.
    2. Create the session for the resumable upload.
    3. Retrieve the downloaded data from the stream and convert it to a buffer.
      • For this, I used stream.Transform.
      • In this case, I pause the stream while uploading the data to Google Drive. I couldn't think of a way to achieve this without pausing the stream.
      • I thought that this section might be the answer to your questions 2 and 3.
    4. When the buffer size reaches the declared chunk size, upload the buffer to Google Drive.
      • I thought that this section might be the answer to your question 3.
    5. When an upload results in an error, the same buffer is uploaded again. In this sample script, up to 3 retries are run. When 3 retries have failed, an error occurs.
      • I thought that this section might be the answer to your question 1.

When the above flow is reflected in your script, it becomes as follows.

Modified script:

Please set the variables in the function main().

const axios = require("axios");
const stream = require("stream");

function transfer(
  download_url,
  resumable_drive_url,
  file_type,
  file_length,
  accessToken,
  filename,
  chunkSize
) {
  return new Promise((resolve, reject) => {
    axios({
      method: "get",
      url: download_url,
      responseType: "stream",
      maxRedirects: 1,
    })
      .then((result) => {
        const streamTrans = new stream.Transform({
          transform: function (chunk, _, callback) {
            callback(null, chunk);
          },
        });

        // 1. Retrieve session for resumable upload.
        axios({
          method: "POST",
          url: resumable_drive_url,
          headers: {
            Authorization: `Bearer ${accessToken}`,
            "Content-Type": "application/json",
          },
          data: JSON.stringify({
            name: filename,
            mimeType: file_type,
          }),
        })
          .then(({ headers: { location } }) => {
            // 2. Upload the file.
            let startByte = 0;
            result.data.pipe(streamTrans);
            let bufs = [];
            streamTrans.on("data", async (chunk) => {
              bufs.push(chunk);
              const temp = Buffer.concat(bufs);
              if (temp.length >= chunkSize) {
                const dataChunk = temp.slice(0, chunkSize);
                const left = temp.slice(chunkSize);
                streamTrans.pause();
                let upcount = 0;
                const upload = function () {
                  console.log(
                    `Progress: from ${startByte} to ${
                      startByte + dataChunk.length - 1
                    } for ${file_length}`
                  );
                  axios({
                    method: "PUT",
                    url: location,
                    headers: {
                      "Content-Range": `bytes ${startByte}-${
                        startByte + dataChunk.length - 1
                      }/${file_length}`,
                    },
                    data: dataChunk,
                  })
                    .then(({ data }) => resolve(data))
                    .catch((err) => {
                      if (err.response && err.response.status == 308) {
                        startByte += dataChunk.length;
                        streamTrans.resume();
                        return;
                      }
                      if (upcount == 3) {
                        reject(err);
                        return;
                      }
                      upcount++;
                      console.log("Retry");
                      upload();
                      return;
                    });
                };
                upload();
                bufs = [left];
              }
            });
            streamTrans.on("end", () => {
              const dataChunk = Buffer.concat(bufs);
              if (dataChunk.length > 0) {
                // 3. Upload last chunk.
                let upcount = 0;
                const upload = function () {
                  console.log(
                    `Progress(last): from ${startByte} to ${
                      startByte + dataChunk.length - 1
                    } for ${file_length}`
                  );
                  axios({
                    method: "PUT",
                    url: location,
                    headers: {
                      "Content-Range": `bytes ${startByte}-${
                        startByte + dataChunk.length - 1
                      }/${file_length}`,
                    },
                    data: dataChunk,
                  })
                    .then(({ data }) => resolve(data))
                    .catch((err) => {
                      if (upcount == 3) {
                        reject(err);
                        return;
                      }
                      upcount++;
                      upload();
                      return;
                    });
                };
                upload();
              }
            });
            streamTrans.on("error", (err) => reject(err));
          })
          .catch((err) => reject(err));
      })
      .catch((error) => {
        reject(error);
      });
  });
}

function main() {
  const download_url = "###";
  const resumable_drive_url = "https://www.googleapis.com/upload/drive/v3/files?uploadType=resumable";
  const file_type = "###"; // Please set the mimeType of the downloaded data.
  const file_length = 12345; // Please set the data size of the downloaded data.
  const accessToken = "###"; // Please set the access token.
  const filename = "sample filename"; // Please set the filename on Google Drive.
  const chunkSize = 10485760; // This is used as the chunk size for the resumable upload. This is 10 MB as a sample. In this case, please set the multiples of 256 KB (256 x 1024 bytes).

  transfer(
    download_url,
    resumable_drive_url,
    file_type,
    file_length,
    accessToken,
    filename,
    chunkSize
  )
    .then((res) => console.log(res))
    .catch((err) => console.log(err));
}

main();

Result:

When the above script is run for a file of 23558108 bytes (sample data), the following result is obtained in the console.

Progress: from 0 to 10485759 for 23558108
Progress: from 10485760 to 20971519 for 23558108
Progress(last): from 20971520 to 23558107 for 23558108
{
  kind: 'drive#file',
  id: '###',
  name: 'sample filename',
  mimeType: '###'
}

Note:

  • When you want to achieve the resumable upload using a single chunk, the flow becomes much simpler; a rough sketch is shown below.
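As a rough outline of that simpler case (the function name and arguments below are placeholders): once the session URL has been retrieved from the Location header, the whole in-memory buffer can be sent with a single PUT, and no Content-Range header is needed because the request covers the entire file.

const axios = require("axios");

// Sketch only: "location" is the session URL from the Location header of the
// initial POST with uploadType=resumable, and fileBuffer holds the whole file.
async function uploadSingleChunk(location, fileBuffer) {
  const res = await axios({
    method: "PUT",
    url: location,
    data: fileBuffer, // axios sets Content-Length from the buffer length
  });
  return res.data; // file metadata: kind, id, name, mimeType
}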


Tanaike