
I need to elaborate a file (potentially a big file) one block at a time and write the result to a new file. To put it simply, I have the basic function to elaborate a block:

func elaborateBlock(block []byte) []byte { ... }

Every block needs to be elaborated and then written to the output file sequentially (preserving original order).

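The single-threaded implementation is trivial:

for {
        buffer := make([]byte, BlockSize)
        n, err := inputFile.Read(buffer)

        if n > 0 {
            // Only the first n bytes are valid; the last block may be short.
            processedData := elaborateBlock(buffer[:n])
            outputFile.Write(processedData)
        }
        if err != nil {
            break // io.EOF or a read error
        }
}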

But the elaboration can be heavy and every block can be processed separately, so a multi-threaded implementation is the natural evolution.

The solution I came up with is to create an array of channels, compute every block in a different thread and sync the final write by looping over the channel array:

Utility function:

func blockThread(channel chan []byte, block []byte) {
    channel <- elaborateBlock(block)
}

In the main program:

chans := []chan []byte{}

for {
    buffer := make([]byte, BlockSize)
    n, err := inputFile.Read(buffer)

    if n > 0 {
        channel := make(chan []byte)
        chans = append(chans, channel)

        // Only the first n bytes are valid; the last block may be short.
        go blockThread(channel, buffer[:n])
    }
    if err != nil {
        break // io.EOF or a read error
    }
}

for i := range chans {
    data := <- chans[i]
    outputFile.Write(data)
}

This approach works but can be problematic with large files, because it requires loading the whole file into memory before any output is written.

Do you think there is a better solution, ideally with better overall performance too?

gbalduzzi

2 Answers


If blocks do need to be written out in order

If you want to work on multiple blocks concurrently, obviously you need to hold multiple blocks in memory at the same time.

You can decide how many blocks you want to process concurrently, and only that many need to be held in memory at the same time. For example, you may choose to process 5 blocks concurrently. This limits memory usage while still potentially utilizing your CPU resources to the max. It is recommended to pick a number based on your available CPU cores (if processing a block does not already use multiple cores). This can be queried using runtime.GOMAXPROCS(0).
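As a rough sketch of sizing the worker pool from the core count: the helper name workerCount is made up for this illustration, and the "one worker per usable core" rule is only a starting point to experiment from.

```go
package main

import (
	"fmt"
	"runtime"
)

// workerCount is a hypothetical helper: it returns how many worker
// goroutines to start, based on how many OS threads may run Go code
// at the same time.
func workerCount() int {
	// Passing 0 only queries the current setting, it does not change it.
	n := runtime.GOMAXPROCS(0)
	if n < 1 {
		n = 1
	}
	return n
}

func main() {
	fmt.Println("workers to start:", workerCount())
}
```

runtime.NumCPU() reports the number of logical CPUs instead, which can differ if GOMAXPROCS has been lowered.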

You should have a single goroutine that reads the input file sequentially and produces the blocks wrapped in Jobs (which also contain the block index).

You should have multiple worker goroutines, preferably as many as you have cores (but experiment with smaller and larger values too). Each worker goroutine just receives jobs, calls elaborateBlock() on the data, and delivers the result on the results channel.

There should be a single, designated consumer which receives completed jobs and writes them in order to the output file. Since goroutines run concurrently and we have no control over the order in which blocks are completed, the consumer should keep track of the index of the next block to be written to the output. Blocks arriving out of order should just be stored; writing proceeds only once the block with the next expected index has arrived.

This is an (incomplete) example of how to do all this:

const BlockSize = 1 << 20 // 1 MB

func elaborateBlock(in []byte) []byte { return in }

type Job struct {
    Index int
    Block []byte
}

func producer(jobsCh chan<- *Job) {
    // Init input file:
    var inputFile *os.File

    for index := 0; ; index++ {
        job := &Job{
            Index: index,
            Block: make([]byte, BlockSize),
        }

        n, err := inputFile.Read(job.Block)
        if n > 0 {
            job.Block = job.Block[:n] // the last block may be short
            jobsCh <- job
        }
        if err != nil {
            break // io.EOF or a read error
        }
    }
}

func worker(jobsCh <-chan *Job, resultCh chan<- *Job) {
    for job := range jobsCh {
        job.Block = elaborateBlock(job.Block)
        resultCh <- job
    }
}

func consumer(resultCh <-chan *Job) {
    // Init output file:
    var outputFile *os.File

    nextIdx := 0
    jobMap := map[int]*Job{}

    for job := range resultCh {
        jobMap[job.Index] = job

        // Write out all blocks we have in contiguous index range:
        for {
            j := jobMap[nextIdx]
            if j == nil {
                break
            }
            if _, err := outputFile.Write(j.Block); err != nil {
                // handle error, maybe terminate?
            }
            delete(jobMap, nextIdx) // This job is written out
            nextIdx++
        }
    }
}

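func main() {
    jobsCh := make(chan *Job)
    resultCh := make(chan *Job)

    workerWg := sync.WaitGroup{}
    for i := 0; i < 5; i++ {
        workerWg.Add(1)
        go func() {
            defer workerWg.Done()
            worker(jobsCh, resultCh)
        }()
    }

    wg := sync.WaitGroup{}
    wg.Add(1)
    go func() {
        defer wg.Done()
        consumer(resultCh)
    }()

    // Start producing jobs:
    producer(jobsCh)
    // No more jobs:
    close(jobsCh)

    // Once all workers have returned, no more results will arrive.
    // Close resultCh so the consumer's range loop can end:
    workerWg.Wait()
    close(resultCh)

    // Wait for consumer to complete:
    wg.Wait()
}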

One thing to note here: this alone won't guarantee a limit on memory usage. Imagine a case where the first block takes an enormous amount of time to calculate, while subsequent blocks do not. What would happen? The first block would occupy a worker, and the other workers would "quickly" complete the subsequent blocks. The consumer would store all of them in memory, waiting for the first block to complete (as that has to be written out first). This could increase memory usage.

How could we avoid this?

By introducing a job pool. New jobs would not be created arbitrarily, but taken from a pool. If the pool is empty, the producer has to wait. So when the producer needs a new Job, it takes one from the pool. When the consumer has written out a Job, it puts it back into the pool. Simple as that. This would also reduce pressure on the garbage collector, as jobs (and large []byte buffers) are not created and thrown away but re-used.

For a simple Job pool implementation you could use a buffered channel. For details, see How to implement Memory Pooling in Golang.

If blocks can be written in any order

Another option could be to allocate the output file in advance. If the size of the output blocks is also deterministic, you can do so (e.g. outsize := (insize / blocksize) * outblockSize).

To what end?

If you have the output file pre-allocated, the consumer does not need to wait for blocks to arrive in order. Once a block has been processed, you can calculate the position where it goes in the output, seek to that position and just write it. For this you may use File.Seek().

This solution still requires sending the block index from the producer to the consumer, but the consumer no longer needs to store blocks that arrive out of order until the preceding ones have been written, so it can be simpler.

Note that this solution naturally does not pose a memory threat, as completed jobs are never accumulated / cached, they are written out in the order of completion.


See related questions for more details and techniques:

Is this an idiomatic worker thread pool in Go?

How to collect values from N goroutines executed in a specific order?

icza

here is a working example that stays as close as possible to your original code.

the idea is to turn your array into a channel of channels of bytes. then

  • first fire up a consumer that reads from this channel of channels, gets each channel of bytes, reads from it and writes the result.

  • Back on the main thread you create a channel of bytes, send it on the channel of channels (so the consumer, reading from them sequentially, gets the results in order) and then fire up the goroutine that does the work and writes to that channel (the producers).

what happens now is a "race" between the producers and the consumer: as soon as a produced block has been read by the consumer and written out, the resources associated with it can be freed. this could be an improvement over your original design.

here is the code and the playground link:

package main

import (
    "bytes"
    "fmt"
    "io"
    "sync"
)

func elaborateBlock(b []byte) []byte {
    return []byte("werkwerkwerk")
}

func blockThread(channel chan []byte, block []byte, wg *sync.WaitGroup) {
    channel <- elaborateBlock(block)
    wg.Done()
}

func main() {
    chans := make(chan chan []byte)
    BlockSize := 3
    inputBytes := bytes.NewBuffer([]byte("transmutemetowerkwerkwerk"))

    producewg := sync.WaitGroup{}
    consumewg := sync.WaitGroup{}
    consumewg.Add(1)
    go func() {
        chancount := 0
        for ch := range chans {
            data := <-ch
            fmt.Printf("got %d block, result:%s\n", chancount, data)
            chancount++
        }
        fmt.Printf("done receiving\n")
        consumewg.Done()
    }()
    for {
        buffer := make([]byte, BlockSize)
        _, err := inputBytes.Read(buffer)

        if err == io.EOF {
            go func() {
                //wait for all the producers to finish
                producewg.Wait()
                //then close the main channel to notify the consumer
                close(chans)
            }()
            break
        }

        channel := make(chan []byte)
        chans <- channel //give the channel that we return the result to the receiver

        producewg.Add(1)
        go blockThread(channel, buffer, &producewg)
    }

    consumewg.Wait()
    fmt.Printf("main exiting")
}

playground link

as a minor point, i don't feel right about the "read the whole file into memory" statement, because you are just reading one block at a time from the Reader. maybe "holding the result of the whole computation in memory" is more appropriate?

ramrunner