I am using https://github.com/klauspost/reedsolomon to erasure-code a large file into smaller chunks, and I want to use io.Pipe() to build a chain of stages that feed into each other. For example: one stage that chunks the file, and another that calls an Upload function to upload each chunk.

func (rd *ReedSolomon) Chunking() {
    fname := rd.File

    // Create encoding matrix.
    enc, err := reedsolomon.NewStream(rd.Data, rd.Par)
    checkErr(err)

    fmt.Println("Opening", fname)
    f, err := os.Open(fname)
    checkErr(err)

    instat, err := f.Stat()
    checkErr(err)

    shards := rd.Data + rd.Par
    out := make([]*os.File, shards)

    // Create the resulting files.
    dir, file := filepath.Split(fname)
    if fi.OutDir != "" {
        dir = fi.OutDir
    }

    for i := range out {
        outfn := fmt.Sprintf("%s.%d", file, i)
        out[i], err = os.Create(filepath.Join(dir, outfn))
        checkErr(err)
    }

    // Split into files.
    data := make([]io.Writer, rd.Data)
    for i := range data {
        data[i] = out[i]
    }

    // Do the split
    err = enc.Split(f, data, instat.Size())
    checkErr(err)

    // Close and re-open the files.
    input := make([]io.Reader, rd.Data)
    target_url := "http://localhost:8080"

    for i := range data {
        out[i].Close()
        f, err := os.Open(out[i].Name())
        checkErr(err)
        input[i] = f
        defer f.Close()
    }

    // Create parity output writers
    parity := make([]io.Writer, rd.Par)
    for i := range parity {
        parity[i] = out[rd.Data+i]
        defer out[rd.Data+i].Close()
    }

    err = enc.Encode(input, parity)
    checkErr(err)

    for _, sd := range out {
        postFile(sd.Name(), target_url)
    }

}

I am fairly new to io.Pipe().

Jonathan Hall
N_b26
  • Note that `enc.Split` takes a reader as its source and a set of writers as destinations. Both are streamable, so they can handle large files easily. –  Aug 30 '18 at 18:56
  • One way I suspect you could use io.Pipe in this code would be to encode and split simultaneously. That would avoid an I/O round trip to create the parity shards. –  Aug 30 '18 at 19:06
  • @mh-cbon, I am not able to encode and split simultaneously. I might be doing something wrong; could you provide an example? – N_b26 Aug 30 '18 at 19:20
  • I gave it a try, but it fails with a deadlock. I'm not sure why the library does not allow this, sorry. See [this](https://play.golang.org/p/RXJ6JrxmWrB). –  Aug 30 '18 at 19:53
  • @mh-cbon, thank you for explaining the pipe logic. I would also like your opinion: do you think using a pipe here makes sense, given that it ends in a deadlock? – N_b26 Aug 30 '18 at 21:44
  • No. I'd rather use a multiwriter to split the file. The multiwriter would send the file to the remote endpoint and also write the files locally to a temp directory. With the local files, it would be possible to generate the parity shards without a network round trip to read the input. I would also write the parity shards directly to the remote endpoint instead of using temporary files. Finally, I would take care to check all errors, implement retry strategies, and add some logging. –  Aug 31 '18 at 10:50
  • Please ask a new question describing the issue and including the code you are stuck on. –  Sep 04 '18 at 17:19