First, your solution has a data race: you're reading and modifying the stopped variable from multiple goroutines without synchronization.
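To illustrate, here's a minimal sketch of the racy pattern, assuming a plain bool stopped flag (your actual code may differ):

package main

import "time"

var stopped bool // hypothetical flag, standing in for the one in your code

func main() {
    go func() {
        for !stopped { // unsynchronized read in one goroutine...
            time.Sleep(time.Millisecond)
        }
    }()
    time.Sleep(5 * time.Millisecond)
    stopped = true // ...unsynchronized write in another: data race
}

Running such a program with go run -race reports the race.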
An easy solution is to divide the index range of the passed slice and have multiple goroutines process different index ranges. This is how it could look:
func process(ms []My) {
    workers := 5
    // Entries per worker, rounding up so all entries are covered:
    count := len(ms) / workers
    if count*workers < len(ms) {
        count++
    }
    wg := &sync.WaitGroup{}
    for idx := 0; idx < len(ms); {
        wg.Add(1)
        idx2 := idx + count
        if idx2 > len(ms) {
            idx2 = len(ms)
        }
        // Each goroutine gets its own subslice: no overlap, no race:
        ms2 := ms[idx:idx2]
        idx = idx2
        go func() {
            defer wg.Done()
            for i := range ms2 {
                handle(&ms2[i])
            }
        }()
    }
    wg.Wait()
}

func handle(m *My) {}
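For completeness, here's how it could be exercised. The My type and the work done in handle are placeholders here (the question doesn't show them), and process is the function above (which needs the sync import):

package main

import (
    "fmt"
    "sync"
)

// Placeholder standing in for the question's My type.
type My struct{ n int }

// Placeholder work, replacing the empty handle above.
func handle(m *My) { m.n *= 2 }

func main() {
    ms := make([]My, 1000)
    for i := range ms {
        ms[i].n = i
    }
    process(ms)
    fmt.Println(ms[10].n) // prints 20
}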
For the number of worker goroutines you could use runtime.GOMAXPROCS(): if processing the entries does not involve I/O operations (or waiting for something outside of the goroutine), there is no need for the Go runtime to manage more goroutines than can run actively. Note that passing 0 queries the current setting without changing it:
workers := runtime.GOMAXPROCS(0)
Note that although this solution does not involve sending entries through a channel, if some goroutines finish earlier than others, CPU utilization might drop towards the end (when fewer goroutines have work left to do).
The advantage of the producer-consumer model is that all worker goroutines stay equally busy until the end. But yes, the communication overhead might not be negligible. Whether one is better than the other depends on the amount of work that needs to be done per entry.
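For reference, the plain producer-consumer variant compared here could look like this sketch (reusing the My type and handle function from above; it needs the runtime and sync imports):

func process(ms []My) {
    workers := runtime.GOMAXPROCS(0)
    ch := make(chan *My, workers*2)
    wg := &sync.WaitGroup{}
    wg.Add(workers)
    for i := 0; i < workers; i++ {
        go func() {
            defer wg.Done()
            // Workers keep pulling entries until the channel is closed:
            for m := range ch {
                handle(m)
            }
        }()
    }
    for i := range ms {
        ch <- &ms[i] // One channel send per entry: this is the overhead
    }
    close(ch)
    wg.Wait()
}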
An improved version could mix the two: you could send smaller slices, smaller index ranges, over a channel, e.g. batches of 100 entries. This can reduce the idle time compared to the first solution, and it also reduces the communication overhead compared to sending entries over the channel individually: the number of values sent is only one hundredth of the total number of entries (e.g. 10,000 channel sends for 1,000,000 entries instead of 1,000,000).
This is an example implementation of this improved, mixed version:
func process(ms []My) {
    workers := runtime.GOMAXPROCS(0)
    // Batch size so that each worker gets around 100 batch jobs on average:
    count := len(ms) / workers / 100
    if count < 1 {
        count = 1
    }
    ch := make(chan []My, workers*2) // Buffer size scales with the number of workers
    wg := &sync.WaitGroup{}
    // Start workers:
    wg.Add(workers)
    for i := 0; i < workers; i++ {
        go func() {
            defer wg.Done()
            for ms2 := range ch {
                for j := range ms2 {
                    handle(&ms2[j])
                }
            }
        }()
    }
    // Send jobs:
    for idx := 0; idx < len(ms); {
        idx2 := idx + count
        if idx2 > len(ms) {
            idx2 = len(ms)
        }
        ch <- ms[idx:idx2]
        idx = idx2
    }
    // All jobs sent, close the channel:
    close(ch)
    // Wait for the workers to finish processing all jobs:
    wg.Wait()
}
Note that there is no stopped variable to signal completion. Instead, each worker goroutine uses a for range over the channel, which keeps receiving until the channel is closed, and which is safe for concurrent use. Once the channel is closed and the goroutines have processed all jobs sent on it, they terminate, and so does the overall processing algorithm (and not earlier, meaning all jobs will be processed).
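As a small standalone illustration of these for range semantics: receiving from a closed channel first drains any buffered values, and only then does the loop exit:

package main

import "fmt"

func main() {
    ch := make(chan int, 3)
    ch <- 1
    ch <- 2
    close(ch)
    for v := range ch {
        fmt.Println(v) // prints 1, then 2, then the loop ends
    }
}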