
I'm writing a service that downloads images, joins them into a zip archive, and uploads the archive back to AWS. The service should be time-efficient. My first version was dead simple:

  1. Download all files in parallel and save them to disk.
  2. Read all files from disk, combine them into a zip archive, and save it back to disk.
  3. Read the archive from disk and send it to S3.

But I suspect all those disk writes and reads are less performant than in-memory communication.

I joined downloading and archiving together (all download readers go directly to the archiver), but I can't figure out how to join this with the uploader.

The S3 uploader needs a ReadSeeker to put an object. The current implementation of the archiver is:

func Archive(inputQueue <-chan Input) io.ReadSeeker {
    zipFile, err := os.Create("test_arch.zip")
    if log.Error(err) {
        os.Exit(1)
    }

    arch := zip.NewWriter(zipFile)
    go func() {
        defer arch.Close()
        for input := range inputQueue {
            header := &zip.FileHeader{
                Name:   filepath.Join(baseDir, input.Path()),
                Method: zip.Store,
            }
            writer, err := arch.CreateHeader(header)
            if log.Error(err){
                os.Exit(1)
            }
            _, err = io.Copy(writer, input.Reader())
            if log.Error(err) {
                os.Exit(1)
            }
        }

    }()

    return zipFile
}

It saves the archive to disk. How can I write the archive to an intermediate in-memory structure and pass that structure to the S3 uploader, which requires a ReadSeeker?

abonec
    See this answer how to obtain an in-memory `io.ReadSeeker`: [How to serve http partial content with Go?](https://stackoverflow.com/questions/36540610/how-to-serve-http-partial-content-with-go/36543480#36543480) – icza Aug 18 '18 at 12:33

2 Answers


If possible, you should use s3manager.Uploader.Upload from the "github.com/aws/aws-sdk-go/service/s3/s3manager" package, which accepts an io.Reader for input and handles all the multipart upload logic for you (that's what the io.Seeker interface is needed for).
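A minimal sketch of that approach, assuming aws-sdk-go v1; the bucket, key, and body are placeholders, and any `io.Reader` (such as the read half of a pipe) works as `Body`:

```go
package main

import (
	"log"
	"strings"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"
)

func main() {
	sess := session.Must(session.NewSession())
	uploader := s3manager.NewUploader(sess)

	// Body only needs to be an io.Reader; the uploader buffers it
	// into parts internally, so no io.Seeker is required.
	_, err := uploader.Upload(&s3manager.UploadInput{
		Bucket: aws.String("my-bucket"),   // placeholder
		Key:    aws.String("archive.zip"), // placeholder
		Body:   strings.NewReader("..."),  // any io.Reader
	})
	if err != nil {
		log.Fatal(err)
	}
}
```

This needs AWS credentials and a real bucket to actually run, so treat it as a shape of the call rather than a ready-made program.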

If that's not an option, you can use a bytes.Buffer as an io.Writer instead of the file and then a bytes.Reader as an io.ReadSeeker.

E.g.

func Archive(inputQueue <-chan Input) *bytes.Buffer {
    buf := bytes.NewBuffer(nil)

    arch := zip.NewWriter(buf)
    go func() {
        defer arch.Close()
        for input := range inputQueue {
            header := &zip.FileHeader{
                Name:   filepath.Join(baseDir, input.Path()),
                Method: zip.Store,
            }
            writer, err := arch.CreateHeader(header)
            if log.Error(err) {
                os.Exit(1)
            }
            _, err = io.Copy(writer, input.Reader())
            if log.Error(err) {
                os.Exit(1)
            }
        }

    }()

    return buf
}

Then wrap the buffer's bytes in a bytes.Reader:

readSeeker := bytes.NewReader(buf.Bytes())
Matt D.
  • Before the first read there is no data in buf (because it is first passed to the uploader and only then starts filling with bytes in the goroutine). Because of that, the uploader gets io.EOF and exits without uploading the data. – abonec Aug 20 '18 at 10:19

Since s3manager can upload from a regular io.Reader, instead of using a file I used a pipe, as in the following code:

const baseDir = "export"

func Archive(inputQueue <-chan Input) io.Reader {
    pr, pw := io.Pipe()

    arch := zip.NewWriter(pw)
    go func() {
        defer pw.Close()
        defer arch.Close()
        for input := range inputQueue {
            header := &zip.FileHeader{
                Name:   filepath.Join(baseDir, input.Path()),
                Method: zip.Store,
            }
            writer, err := arch.CreateHeader(header)
            if log.Error(err) {
                os.Exit(1)
            }
            _, err = io.Copy(writer, input.Reader())
            if log.Error(err) {
                os.Exit(1)
            }
        }
    }()

    return pr
}

The read half of the pipe goes to the next stage (the uploading part of the service), and the write half stays at the current stage (the archiving part). By the nature of the pipe, it can handle huge archives using only a small amount of memory, and this solution is thread-safe.

abonec