2

I have the following function:

func myrun(entries []WhatEverType) {
    for i := range entries {
        dotreatment(entries[i])
    } 
}

I want to make parallel calls to dotreatment, I tried the following:

    func myrunMT(entries []WhatEverType) {
        var wg sync.WaitGroup
        stopped := false
        threads := 5 //number of threads could be argument
        com := make(chan WhatEverType, 100) //size of chan could be argument
        wg.Add(threads)
        for i := 0; i < threads; i++ {
            go func() {
                for !stopped || len(com) {
                    select {
                        case entry := <-com:
                            dotreatment(entry) //lock if necessary
                        case time.After(100*time.Millisecond):
                    }
                }
                wg.Done()
            }()
        }
        for _, entry := range entries {
            com <- entry
        }
        stopped = true
        wg.Wait()
    }

Is there any better way to do it? Especially I would like to avoid sending all the entries through a chan and only use a shared index between the go routines.

Séb
  • 49
  • 2
  • If you have a general need to do batches of concurrent work and control throttling the number of concurrent "threads", you could use (or copy) the `code.cloudfoundry.org/workpool` package like so: https://play.golang.org/p/XKbT67vP6i (note this won't actually run in the playground because it imports an external package). – Amit Kumar Gupta Jan 23 '17 at 10:15

3 Answers3

1

First, your solution has data race. You're reading and modifying the stopped variable from multiple goroutines.

An easy solution could be to divide the index range of the passed slice, and have multiple goroutines process the different index ranges. This is how it could look like:

func process(ms []My) {
    workers := 5
    count := len(ms) / workers
    if count*workers < len(ms) {
        count++
    }

    wg := &sync.WaitGroup{}
    for idx := 0; idx < len(ms); {
        wg.Add(1)
        idx2 := idx + count
        if idx2 > len(ms) {
            idx2 = len(ms)
        }
        ms2 := ms[idx:idx2]
        idx = idx2
        go func() {
            defer wg.Done()
            for i := range ms2 {
                handle(&ms2[i])
            }
        }()
    }
    wg.Wait()
}

func handle(m *My) {}

For the number of worker goroutines you could use runtime.GOMAXPROCS(), as if processing the entries does not involve IO operations (or waiting for something outside of the goroutine), no need to have the Go runtime manage more goroutines than those that can run actively:

workers := runtime.GOMAXPROCS(0)

Note that although this solution does not involve sending entries through a channel, if one (some) goroutine finishes earlier, CPU utilization might drop at the end (when less goroutines have work to do).

The advantage of the producer-consumer model is that all worker goroutines will work equally till the end. But yes, the communication overhead might not be negligible. Whether one is better than the other depends on the amount of work that needs to be done on each entry.

An improved version could mix the 2: you could send smaller slices, smaller index ranges over a channel, e.g. batches of 100 entries. This could reduce the idle time compared to the first solution, and could also reduce the communication overhead as entries are sent over the channel individually, so the values sent is only one hundredth of the total number.

This is an example implementation of this improved, mixed version:

func process(ms []My) {
    workers := runtime.GOMAXPROCS(0)
    // 100 jobs per worker average:
    count := len(ms) / workers / 100
    if count < 1 {
        count = 1
    }

    ch := make(chan []My, workers*2) // Buffer size scales with # of workers

    wg := &sync.WaitGroup{}

    // Start workers
    wg.Add(workers)
    for i := 0; i < workers; i++ {
        go func() {
            defer wg.Done()
            for ms2 := range ch {
                for j := range ms2 {
                    handle(&ms2[j])
                }
            }
        }()
    }

    // Send jobs:
    for idx := 0; idx < len(ms); {
        idx2 := idx + count
        if idx2 > len(ms) {
            idx2 = len(ms)
        }
        ch <- ms[idx:idx2]
        idx = idx2
    }

    // Jobs sent, close channel:
    close(ch)

    // Wait workers to finish processing all jobs:
    wg.Wait()
}

Note that there is no stopping variable to signal completion. Instead we used a for range on a channel in each goroutine, as this ranges over the channel until the channel is closed, and it's safe for concurrent use. Once the channel is closed and the goroutines processed all jobs sent on the channel, they terminate, and so does the overall processing algorithm (and not earlier – meaning all jobs will be processed).

icza
  • 389,944
  • 63
  • 907
  • 827
  • I like this one, good idea to split the data between the workers to make them independent but you are still copying the data. An improvement is to only send the two indexes (start and stop) in the channel and then you get access to data in read only (thread-safe). An other improvement is to dedicate slice between the workers at the begining. No channel anymore, but you could have a goroutine which has no more work when another as still a lot of work to do, but should be ok on average. Last point, not sure race condition could occurs on a boolean in such an use case (in my first run). – Séb Jan 23 '17 at 13:46
  • @Séb First, data is not copied. `[]My` is a _slice_ not an array. If slice vs array is not clear, check out blog post [Go Slices: usage and internals](https://blog.golang.org/go-slices-usage-and-internals). 2nd, a slice is little bigger than just sending the indices (see [`reflect.SliceHeader`](https://golang.org/pkg/reflect/#SliceHeader), it's just a pointer and 2 `int` numbers). 3rd, data race does occur. Don't guess, run it with the `-race` option to see. – icza Jan 23 '17 at 13:51
  • @Séb And about "unharmful" data race, read [Is it safe to read a function pointer concurrently without a lock?](http://stackoverflow.com/questions/41406501/is-it-safe-to-read-a-function-pointer-concurrently-without-a-lock/41407827#41407827) – icza Jan 23 '17 at 14:02
  • Ok, I read your link, and I'm convinced about data race on boolean. More important (and I run some test), there is no really overhead to use sync/atomic. Ok for the information about slice, I wasn't aware about it, really intersting. So your solution is really good. At last, I didn't know about ranging a channel. So Three important facts learn, thanks. – Séb Jan 23 '17 at 18:48
  • I made more tests, this solutions is actually the most efficient, even for a small treatment (counting a kind of char in a short sentence), this is the fastest. – Séb Jan 24 '17 at 10:13
0

I would not mix channels and synchronization primitives. Use of channels exclusively is idiomatic Go. Bear in mind that Go routines are not threads, there are much lighter with low overhead. Launching one million of them is not a big deal.
If the order of the result does not matter, I would do something like this:

func parallelRun(input []WhateverInputType) []WhateverOutputType {
    out := make(chan WhateverOutputType, len(input))
    for _, item := range input {
        go func(i WhateverInputType) {
            out <- process(i)
        }(item)
    }

    res := make([]WhateverOutputType, len(input))
    for i := 0; i < len(input); i++ {
        res[i] = <-out
    }

    return res
}

func process(input WhateverInputType) WhateverOutputType {
    time.Sleep(50 * time.Millisecond)
    return WhateverOutputType{}
}

Assuming ‘process’ takes much longer than collecting the result, I would even use a blocking channel out := make(chan WhateverOutputType)
Please note that passing arrays as parameters is not ideal (there are copied) but I tried to keep the spirit of your original code.

Franck Jeannin
  • 6,107
  • 3
  • 21
  • 33
  • You are creating as many goroutines as the number of elements in the input list, I think you are going to hurt a bottleneck in performance this way in my experience. – Séb Jan 23 '17 at 13:30
  • It depends on how big the input is, how long/predicable ‘process’ is, how many cores you have, etc. You know the 3 rules of performance optimization: measure, measure, measure. – Franck Jeannin Jan 23 '17 at 15:52
  • You're right, but it's really measurements which tell me to reuse threads instead of creating, I already test that into a problem a few weeks ago. – Séb Jan 23 '17 at 17:58
0

After searching I get the following with no copy of data using a shared index:

func myrunMT(entries []WhatEverType) int {
    lastone := int32(len(entries)-1)
    current := int32(0)
    var wg sync.WaitGroup
    threads := 5
    //start threads
    wg.Add(threads)
    for i := 0; i < threads; i++ {
        go func() {
            for {
                idx := atomic.AddInt32(&current, 1)-1
                if Loadint32(&current) > Loadint32(&lastone) {
                    break
                } 
                dotreatment(entries[idx])
            }
            wg.Done()
        }()
    }
    wg.Wait()
}
Séb
  • 49
  • 2
  • Ok after all the readings, ok only if entries is not modified during the run, otherwise a data race could occur – Séb Jan 23 '17 at 18:49