
Current code structure:

func doStuff(serializeds []string) ([]*MyStruct, error) {
    objs := []*MyStruct{}
    for _, s := range serializeds {
        deserializedObject, ok, err := doDeserialization(s)
        if err != nil {
            // if any err, abandon the whole thing
            return nil, err
        }
        if !ok {
            continue
        }
        objs = append(objs, deserializedObject)
    }
    return objs, nil
}

serializeds typically contains 200-1000 serialized strings at a time, and doDeserialization takes approximately 0.5-1 ms per string.

Goals:

  1. Deserialize concurrently, as fast as possible.
  2. Preserve the ordering of the original slice.
  3. Implement this using channels so that it is easier to read.

Side question: is it ok to spin up one goroutine per serialized string or is it more efficient to only have a limited number of goroutines (e.g. 50 goroutines)

Jonathan Hall
samol
  • As for how many goroutines: if your requirement is performance, you need to profile and see. – JimB Jul 06 '16 at 17:26
  • Goroutines are relatively cheap. See http://stackoverflow.com/questions/8509152/max-number-of-goroutines. – Thomas Jul 06 '16 at 18:13

2 Answers


You can create the output slice beforehand with the required size (you know the length of serializeds in advance) and then fill it from within the goroutines, using each element's index in the original slice:

wait := new(sync.WaitGroup)
objs := make([]*MyStruct, len(serializeds))

for i, s := range serializeds {
    wait.Add(1)
    // pass i and s as arguments so each goroutine gets its own copy
    // (before Go 1.22, closing over the loop variables is a bug)
    go func(j int, s string) {
        defer wait.Done()
        deserializedObject, ok, err := doDeserialization(s)
        if err != nil {
            // add error handling here
        }
        if ok {
            objs[j] = deserializedObject // skipped items stay nil
        }
    }(i, s)
}

wait.Wait()

wait.Wait()

Regarding your side question: This warrants a thorough profiling of your application with both implementations. Intuitively, I'd guess that Go's goroutine scheduler should be efficient enough to handle this without much overhead, and that you should probably not bother with the additional complexity of a goroutine worker pool. However, without profiling, that's guesswork at best.
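If profiling does show that a bounded pool helps, one common shape is a fixed number of worker goroutines pulling indices from a channel and writing into an index-aligned results slice, which preserves input order for free. A minimal, runnable sketch, with a hypothetical numWorkers value and strconv.Atoi standing in for the expensive doDeserialization call:

```go
package main

import (
	"fmt"
	"strconv"
	"sync"
)

const numWorkers = 4 // tune by profiling, not guesswork

func main() {
	serializeds := []string{"10", "20", "30", "40", "50"}
	results := make([]int, len(serializeds)) // index-aligned: order is preserved

	indices := make(chan int)
	var wg sync.WaitGroup
	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := range indices {
				// stand-in for doDeserialization(serializeds[i]);
				// each index is written by exactly one worker, so no data race
				n, _ := strconv.Atoi(serializeds[i])
				results[i] = n
			}
		}()
	}
	for i := range serializeds {
		indices <- i
	}
	close(indices) // lets the workers' range loops terminate
	wg.Wait()

	fmt.Println(results)
}
```

Because workers write to disjoint indices, no mutex is needed on the results slice.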

helmbert
  • 35,797
  • 13
  • 82
  • 95
  • I made a slight adjustment to the question: it is possible to skip items – samol Jul 06 '16 at 17:44
  • Also, can you adjust your solution with error handling? How do I return an error from the outer function when it occurs inside a goroutine? – samol Jul 06 '16 at 17:48
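Following up on the comments: both error propagation and skipped items can be folded into this index-aligned approach by recording errors in a second slice and compacting nil entries afterwards. A runnable sketch, using ints as a hypothetical stand-in for MyStruct and doDeserialization:

```go
package main

import (
	"fmt"
	"strconv"
	"sync"
)

// stand-in for the asker's doDeserialization: empty strings are skipped (ok=false)
func doDeserialization(s string) (*int, bool, error) {
	if s == "" {
		return nil, false, nil
	}
	n, err := strconv.Atoi(s)
	if err != nil {
		return nil, false, err
	}
	return &n, true, nil
}

func deserializeAll(serializeds []string) ([]*int, error) {
	var wg sync.WaitGroup
	results := make([]*int, len(serializeds)) // index-aligned; nil = skipped
	errs := make([]error, len(serializeds))   // one slot per goroutine, no locking needed

	for i, s := range serializeds {
		wg.Add(1)
		go func(i int, s string) {
			defer wg.Done()
			obj, ok, err := doDeserialization(s)
			if err != nil {
				errs[i] = err
				return
			}
			if ok {
				results[i] = obj
			}
		}(i, s)
	}
	wg.Wait()

	// after Wait, it is safe to inspect errs; abandon the batch on any error
	for _, err := range errs {
		if err != nil {
			return nil, err
		}
	}

	// compact: drop skipped (nil) entries while preserving order
	objs := results[:0]
	for _, r := range results {
		if r != nil {
			objs = append(objs, r)
		}
	}
	return objs, nil
}

func main() {
	objs, err := deserializeAll([]string{"1", "", "3"})
	if err != nil {
		panic(err)
	}
	fmt.Println(len(objs), *objs[0], *objs[1])
}
```

Collecting errors into a slice and checking after wg.Wait() avoids sending on a channel nobody may be reading; if only the first error matters, golang.org/x/sync's errgroup is another option.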

Your problem requires concurrent processing followed by a sequential read. @helmbert's solution is simple and elegant enough to solve it with a bit more tweaking.

However, you can also perform a controlled concurrent execution with a sequential read using the procedure below. It is not very clean, but I have used something like this before and it works fine.

objs := []*MyStruct{}

// concurrency factor. This controls the number of
// goroutines you'll be running. (Note that the number of
// goroutines can also be cf+1 if len(serializeds) % cf != 0.)
cf := 3

// length of the sub-slice each goroutine will be processing
// (cf must be <= len(serializeds), so that subSliceLen >= 1)
subSliceLen := len(serializeds) / cf

// buffered channel to collect errors from the goroutines;
// the buffer lets every goroutine report without blocking
errorCh := make(chan error, cf+1)

// chans will store the created channels in input order
chans := make([]chan *MyStruct, 0, cf+1)

// quit channel to signal currently executing goroutines to stop;
// closeQuit guarantees it is closed at most once
quit := make(chan struct{})
var closeQuit sync.Once

// loop over the input slice one sub-slice at a time
for i := 0; i < len(serializeds); i += subSliceLen {
    // bounds of the sub-slice this goroutine will process
    hi := i + subSliceLen
    if hi > len(serializeds) {
        hi = len(serializeds)
    }

    // create a channel for the goroutine that will handle
    // input slice values from i to hi. It is important to make
    // these channels buffered, otherwise there will be no
    // parallel execution.
    ch := make(chan *MyStruct, subSliceLen)
    chans = append(chans, ch)

    go func(ch chan *MyStruct, i, hi int) {
        defer close(ch)
        for _, s := range serializeds[i:hi] {
            deserializedObject, ok, err := doDeserialization(s)
            if err != nil {
                // if any err, abandon the whole thing
                errorCh <- err
                // signal the other goroutines that they should
                // stop working and return
                closeQuit.Do(func() { close(quit) })
                return
            }
            if !ok {
                continue
            }
            select {
            // this is required in order to receive the signal that
            // some other goroutine has encountered an error and that
            // this goroutine should also cleanly return. Without it
            // there would be a goroutine leak.
            case <-quit:
                return
            case ch <- deserializedObject:
                // do nothing
            }
        }
    }(ch, i, hi)
}

Now chans has all the channels that are now receiving the processed data. We can now start reading sequentially from them.

i := 0
readLoop:
for {
    select {
    case v, ok := <-chans[i]:
        if !ok {
            // chans[i] is closed and has been read completely;
            // proceed to the next channel
            i++
            if i >= len(chans) {
                break readLoop
            }
            continue
        }
        objs = append(objs, v)
    case err := <-errorCh:
        // handle the error, e.g. abandon objs and return err
        _ = err
        break readLoop
    }
}
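A lighter-weight variant of the same "process concurrently, read sequentially" idea is one buffered channel per input item: every goroutine sends exactly one result, and the reader drains the channels in input order. A runnable sketch, again with ints as hypothetical stand-ins for MyStruct and doDeserialization:

```go
package main

import (
	"fmt"
	"strconv"
)

func main() {
	serializeds := []string{"1", "2", "3", "4"}

	// one single-result channel per item, stored in input order
	chans := make([]chan int, len(serializeds))
	for i, s := range serializeds {
		ch := make(chan int, 1) // buffered so the sender never blocks
		chans[i] = ch
		go func(s string, ch chan int) {
			n, _ := strconv.Atoi(s) // stand-in for doDeserialization
			ch <- n
		}(s, ch)
	}

	// sequential, order-preserving read: item i is read from chans[i]
	objs := make([]int, 0, len(serializeds))
	for _, ch := range chans {
		objs = append(objs, <-ch)
	}
	fmt.Println(objs)
}
```

Because each channel has a buffer of one, no goroutine ever blocks on send, and the reader's order is exactly the input order without any index bookkeeping.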
abhink