71

TL;DR: Please just go to the last part and tell me how you would solve this problem.

I started using Go this morning, coming from Python. I want to call a closed-source executable from Go several times, with some concurrency and with different command-line arguments. My resulting code works, but I'd like your input on how to improve it. Since I'm at an early learning stage, I'll also explain my workflow.

For the sake of simplicity, assume here that this "external closed-source program" is zenity, a Linux command line tool that can display graphical message boxes from the command line.

Calling an executable file from Go

So, in Go, I would go like this:

package main
import "os/exec"
func main() {
    cmd := exec.Command("zenity", "--info", "--text='Hello World'")
    cmd.Run()
}

This should be working just right. Note that .Run() is a functional equivalent to .Start() followed by .Wait(). This is great, but if I wanted to execute this program just once, the whole programming stuff would not be worth it. So let's just do that multiple times.
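To make the Run/Start/Wait relationship concrete, here is a minimal sketch that spells out the two steps and checks both errors. The helper name runAndWait is hypothetical, not part of os/exec. Note that exec.Command does not go through a shell, so the argument is passed to the program exactly as written and needs no extra quoting.

```go
package main

import (
	"fmt"
	"os/exec"
)

// runAndWait (hypothetical helper) starts the named program and then waits
// for it to exit, which is what cmd.Run() does in a single call.
func runAndWait(name string, args ...string) error {
	cmd := exec.Command(name, args...)
	if err := cmd.Start(); err != nil {
		return fmt.Errorf("start %s: %w", name, err)
	}
	if err := cmd.Wait(); err != nil {
		return fmt.Errorf("wait %s: %w", name, err)
	}
	return nil
}

func main() {
	// No shell is involved, so the text is passed without surrounding quotes.
	if err := runAndWait("zenity", "--info", "--text=Hello World"); err != nil {
		fmt.Println("command failed:", err)
	}
}
```

Splitting the call this way is mostly useful when something has to happen between starting the process and waiting for it; otherwise cmd.Run() with an error check does the same job.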

Calling an executable multiple times

Now that I have this working, I'd like to call my program multiple times, with custom command-line arguments (here just i, for the sake of simplicity).

package main    
import (
    "os/exec"
    "strconv"
)

func main() {
    NumEl := 8 // Number of times the external program is called
    for i:=0; i<NumEl; i++ {
        cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n." + strconv.Itoa(i) + "'")
        cmd.Run()
    }
}

Ok, we did it! But I still can't see the advantage of Go over Python … This piece of code is actually executed in a serial fashion. I have a multiple-core CPU and I'd like to take advantage of it. So let's add some concurrency with goroutines.

Goroutines, or a way to make my program parallel

a) First attempt: just add "go"s everywhere

Let's rewrite our code to make things easier to call and reuse and add the famous go keyword:

package main
import (
    "os/exec"
    "strconv"
)

func main() {
    NumEl := 8 
    for i:=0; i<NumEl; i++ {
        go callProg(i)  // <--- There!
    }
}

func callProg(i int) {
    cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n." + strconv.Itoa(i) + "'")
    cmd.Run()
}

Nothing! What is the problem? All the goroutines are launched at once. I don't really know why zenity is not executed, but AFAIK the Go program exits before the external zenity program can even be initialized. This was confirmed by the use of time.Sleep: waiting for a couple of seconds was enough to let the 8 instances of zenity launch. I don't know if this can be considered a bug, though.

To make it worse, the real program I'd actually like to call takes a while to run. If I run 8 instances of this program in parallel on my 4-core CPU, it will waste time on context switching … I don't know how plain Go goroutines behave, but each call to cmd.Run starts zenity as a separate OS process, so 8 of them run at once. To make it even worse, I want to execute this program more than 100,000 times. Doing all of that at once in goroutines won't be efficient at all. Still, I'd like to leverage my 4-core CPU!

b) Second attempt: use pools of goroutines

The online resources tend to recommend the use of sync.WaitGroup for this kind of work. The problem with that approach is that you are basically working with batches of goroutines: if I create a WaitGroup of 4 members, the Go program will wait for all 4 external programs to finish before calling a new batch of 4 programs. This is not efficient: CPU is wasted, once again.

Some other resources recommended the use of a buffered channel to do the work:

package main
import (
    "os/exec"
    "strconv"
)

func main() {
    NumEl := 8               // Number of times the external program is called
    NumCore := 4             // Number of available cores
    c := make(chan bool, NumCore - 1) 
    for i:=0; i<NumEl; i++ {
        go callProg(i, c)
        c <- true            // At the NumCoreth iteration, c is blocking   
    }
}

func callProg(i int, c chan bool) {
    defer func () {<- c}()
    cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n." + strconv.Itoa(i) + "'")
    cmd.Run()
}

This seems ugly. Channels were not intended for this purpose: I'm exploiting a side-effect. I love the concept of defer but I hate having to declare a function (even a lambda) to pop a value out of the dummy channel that I created. Oh, and of course, using a dummy channel is, by itself, ugly.

c) Third attempt: die when all the children are dead

Now we are nearly finished. I just have to take into account yet another side effect: the Go program closes before all the zenity pop-ups are closed. This is because when the loop is finished (at the 8th iteration), nothing prevents the program from finishing. This time, sync.WaitGroup will be useful.

package main
import (
    "os/exec"
    "strconv"
    "sync"
)

func main() {
    NumEl := 8               // Number of times the external program is called
    NumCore := 4             // Number of available cores
    c := make(chan bool, NumCore - 1) 
    wg := new(sync.WaitGroup)
    wg.Add(NumEl)            // Set the number of goroutines to (0 + NumEl)
    for i:=0; i<NumEl; i++ {
        go callProg(i, c, wg)
        c <- true            // At the NumCoreth iteration, c is blocking   
    }
    wg.Wait() // Wait for all the children to die
    close(c)
}

func callProg(i int, c chan bool, wg *sync.WaitGroup) {
    defer func () {
        <- c
        wg.Done() // Decrease the number of alive goroutines
    }()
    cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n." + strconv.Itoa(i) + "'")
    cmd.Run()
}

Done.

My questions

  • Do you know any other proper way to limit the number of goroutines executed at once?

I don't mean threads; how Go manages goroutines internally is not relevant. I really mean limiting the number of goroutines launched at once: each cmd.Run call starts a new process, so I should control how many of them run at the same time.

  • Does that code look fine to you?
  • Do you know how to avoid the use of a dummy channel in that case?

I can't convince myself that such dummy channels are the way to go.

Community

5 Answers

89

I would spawn 4 worker goroutines that read the tasks from a common channel. Goroutines that are faster than others (because they are scheduled differently or happen to get simple tasks) will receive more tasks from this channel than others. In addition to that, I would use a sync.WaitGroup to wait for all workers to finish. The remaining part is just the creation of the tasks. You can see an example implementation of that approach here:

package main

import (
    "os/exec"
    "strconv"
    "sync"
)

func main() {
    tasks := make(chan *exec.Cmd, 64)

    // spawn four worker goroutines
    var wg sync.WaitGroup
    for i := 0; i < 4; i++ {
        wg.Add(1)
        go func() {
            for cmd := range tasks {
                cmd.Run()
            }
            wg.Done()
        }()
    }

    // generate some tasks
    for i := 0; i < 10; i++ {
        tasks <- exec.Command("zenity", "--info", "--text='Hello from iteration n."+strconv.Itoa(i)+"'")
    }
    close(tasks)

    // wait for the workers to finish
    wg.Wait()
}

There are probably other possible approaches, but I think this is a very clean solution that is easy to understand.
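If the output of each command is needed as well, the same worker pool can store it. The following is a sketch under a few assumptions: the result type and the runAll function are illustrative names, each argv is a non-empty slice holding the program name followed by its arguments, and echo stands in for the real program. Output() runs the command and returns what it wrote to standard output.

```go
package main

import (
	"fmt"
	"os/exec"
	"sync"
)

// result holds what one command produced (illustrative type).
type result struct {
	index  int
	output string
	err    error
}

// runAll runs every argv on a fixed number of workers and returns the
// results in input order.
func runAll(argvs [][]string, workers int) []result {
	results := make([]result, len(argvs))
	tasks := make(chan int)

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := range tasks {
				argv := argvs[i]
				out, err := exec.Command(argv[0], argv[1:]...).Output()
				// Each index is handled by exactly one worker, so the
				// slice elements are never written concurrently.
				results[i] = result{index: i, output: string(out), err: err}
			}
		}()
	}

	for i := range argvs {
		tasks <- i
	}
	close(tasks)
	wg.Wait()
	return results
}

func main() {
	// "echo" is a stand-in for the external program.
	argvs := [][]string{{"echo", "one"}, {"echo", "two"}, {"echo", "three"}}
	for _, r := range runAll(argvs, 2) {
		if r.err != nil {
			fmt.Printf("task %d failed: %v\n", r.index, r.err)
			continue
		}
		fmt.Printf("task %d: %s", r.index, r.output)
	}
}
```

Sending indices instead of *exec.Cmd values keeps the results in input order without a second channel; the results are only read after wg.Wait() returns.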

MaxVT
tux21b
  • 1
    I've just made a similar version. This is way better! Thanks. –  Aug 23 '13 at 15:45
  • 4
    No, it doesn't. The code above uses a single SPMC (single producer / multiple consumer) queue to distribute tasks to different workers. Each command can only be received once from the tasks channel. – tux21b Jun 23 '14 at 18:40
  • 2
    'range tasks' will iterate until the task channel has no more messages? – Chris Oct 31 '15 at 14:26
  • This solution has two small issues: (1) wg.Done() would be better written as defer wg.Done(). (2) Even if you fix (1), you still may NOT fully utilize max concurrency after some goroutines are killed by a panic in cmd.Run(). – fubupc Sep 07 '16 at 12:16
  • +1 for the answer but, I have a question, what If I would like to store the i-output in a variable and print it? – AndreaM16 Oct 25 '16 at 18:45
34

A simple approach to throttling (execute f() N times but maximum maxConcurrency concurrently), just a scheme:

package main

import (
        "sync"
)

const maxConcurrency = 4 // for example

var throttle = make(chan int, maxConcurrency)

func main() {
        const N = 100 // for example
        var wg sync.WaitGroup
        for i := 0; i < N; i++ {
                throttle <- 1 // whatever number
                wg.Add(1)
                go f(i, &wg, throttle)
        }
        wg.Wait()
}

func f(i int, wg *sync.WaitGroup, throttle chan int) {
        defer wg.Done()
        // whatever processing
        println(i)
        <-throttle
}

Playground

I probably wouldn't call the throttle channel "dummy". IMHO it's an elegant way (not my invention, of course) to limit concurrency.

BTW: Please note that you're ignoring the returned error from cmd.Run().
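As a sketch of how the returned errors could be kept rather than dropped, the throttling scheme can collect them behind a mutex. The names runThrottled and errSimulated are made up for this illustration; in the question's setting, task would wrap exec.Command(...).Run().

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errSimulated is a placeholder error used by the stand-in task below.
var errSimulated = errors.New("simulated failure")

// runThrottled calls task(0..n-1) with at most limit calls in flight and
// returns every non-nil error.
func runThrottled(n, limit int, task func(i int) error) []error {
	throttle := make(chan struct{}, limit)
	var (
		wg   sync.WaitGroup
		mu   sync.Mutex
		errs []error
	)
	for i := 0; i < n; i++ {
		throttle <- struct{}{} // blocks while limit tasks are running
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			defer func() { <-throttle }() // free the slot when the task ends
			if err := task(i); err != nil {
				mu.Lock()
				errs = append(errs, fmt.Errorf("task %d: %w", i, err))
				mu.Unlock()
			}
		}(i)
	}
	wg.Wait()
	return errs
}

func main() {
	// Stand-in task: every third call fails.
	errs := runThrottled(10, 4, func(i int) error {
		if i%3 == 0 {
			return errSimulated
		}
		return nil
	})
	fmt.Println(len(errs), "tasks failed")
}
```

A chan struct{} carries no data at all, which makes it explicit that the channel is used only as a counter of free slots.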

zzzz
  • This example is wrong. You should fill the `throttle` channel with `maxConcurrency` items first and swap the receive and the send operation in order to synchronize data properly. Take a look at this thread for more information: https://groups.google.com/d/msg/golang-nuts/MDvnk1Ax7UQ/eQGkJJmOxc4J – tux21b Aug 23 '13 at 18:29
  • 3
    @tux21b: You're welcome to formally prove it being wrong ;-) Meanwhile,let me please attempt to prove the opposite: _Before_ starting a new goroutine a "token" must be inserted (`throttle <- 1`) in the `throttle channel`. A "token" is removed (`<-throttle`) from the same channel only _after_ `f()` completes its whatever-processing. As the channel has a fixed capacity of `maxConcurrency`, there can never be more than `maxConcurrency` tokens queued. Thus there cannot ever be more than `maxConcurrency` concurrent instances of `f()` processing data. BTW: No "synchronize data" happens above. – zzzz Aug 23 '13 at 19:45
  • It depends on what you are doing in the loop. If you just call `println` which is protected by a mutex anyway, everything will be fine. But if you modify something (let's say a shared variable x with `maxConcurrency` set to one) you will get a data race (and the go tool will happily report it). When you remove a token (`<-throttle`) in goroutine A and insert a token in goroutine B (`throttle <- 1`), B might not see the changes from A. Better be safe and do it the other way round by swapping the send / receive operations. – tux21b Aug 23 '13 at 20:18
  • 1
    @tux21b: No. In the schema presented, no data races are ever relevant because no data are concurrently shared. The schema is just a mechanism for limiting the number of concurrently executing functions (aka workers if you prefer). The schema does exactly that (and nothing more). Even when claimed "wrong" w/o any proof so far... – zzzz Aug 23 '13 at 20:33
  • It's enough to prove something wrong by giving an example (just imagine tasks like "insert this to the db", "read that from the db" and a limit of one). There are probably more complicated examples that will not work but I'm not an expert either. In order to prove something correct, you would have to argue with just the rules of the Go memory model that uses acquire / release semantics. Giving a correct sequence and assuming sequential consistency doesn't do it. Your solution is probably fine for some cases (i.e. your `println` example), but you need to be very cautious. – tux21b Aug 23 '13 at 20:52
  • 1
    @tux21b: I'm sorry, I've formally derived why my example works, but you've so far presented _no proof whatsoever_ why it doesn't. Hand waving by _"just imagine"_ doesn't qualify. Have you really understood my derivation? If so, can you make a valid point about where it fails? My claim is: _"The example limits the number of concurrently executing `f()` to `maxConcurrency`"_. Your claim is _"This example is wrong"_. We cannot both be right, correct? ;-) – zzzz Aug 23 '13 at 20:59
  • Your algorithm doesn't ensure that the finishing of a task happens before the start of a later task. Therefore I claim that there might be more than `maxConcurrency` tasks executing at a given time. Some goroutines might still need to store the computed result in the system memory (by flushing their store buffers, etc.). That's also the reason why you could get a data race with `maxConcurrency = 1`. My suggested change would ensure the required happens-before relationship by swapping the send and the receive. It has the same amount of concurrency primitives and isn't more complicated. – tux21b Aug 23 '13 at 23:41
  • @tux21b: I'm sorry, you're completely missing the previously presented proof. Here we go again: There is a fixed upper limit of tokens in a channel. No task starts before putting a token into the channel. No token is removed from the channel before a task finishes. Therefore there is absolutely no way how there can ever be more tasks in progress than is the capacity of the channel. q.e.d. Additionally, any data races are absolutely tangential and irrelevant to the code I presented as there are _no_ data processed in it. I'm afraid you don't understand how the simple code actually works, sorry. – zzzz Aug 24 '13 at 19:55
  • Beautiful solution! – Johann Hagerer Dec 21 '16 at 23:31
  • 1
    For anyone else looking at this, TL;DR zzzz is correct, although for a time tux21b was correct. See the following: https://golang.org/doc/go1.3#memory https://codereview.appspot.com/75130045 https://golang.org/doc/effective_go.html#channels – William King May 05 '17 at 19:54
  • Exactly what I was looking for! My long running goroutines were sucking up memory until finally OSX decided to kill the whole process. With this I now just limit goroutines to the number of cores. – perelin Jul 16 '17 at 21:11
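For readers following the comment thread above, this is a sketch of the variant tux21b describes: the channel starts full, a receive acquires a token and a send returns it. The function name runWithTokens is an assumption made for this illustration. As far as I understand the Go 1.3 release notes linked above, the memory model has since then also covered the send-to-acquire form used in the answer, so both forms bound concurrency and order the release of a slot before the next acquisition.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// runWithTokens runs task(0..n-1) with at most limit calls in flight.
// The channel is filled up front: receiving takes a token, sending returns it.
func runWithTokens(n, limit int, task func(i int)) {
	tokens := make(chan struct{}, limit)
	for k := 0; k < limit; k++ {
		tokens <- struct{}{}
	}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		<-tokens // acquire: blocks while all tokens are taken
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			defer func() { tokens <- struct{}{} }() // release
			task(i)
		}(i)
	}
	wg.Wait()
}

func main() {
	var mu sync.Mutex
	running, peak := 0, 0
	runWithTokens(20, 4, func(i int) {
		mu.Lock()
		running++
		if running > peak {
			peak = running
		}
		mu.Unlock()

		time.Sleep(5 * time.Millisecond) // stand-in for real work

		mu.Lock()
		running--
		mu.Unlock()
	})
	fmt.Println("peak concurrency:", peak) // never exceeds the limit of 4
}
```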
2

Modules


Template

package main

import (
    "fmt"
    "github.com/zenthangplus/goccm"
    "math/rand"
    "runtime"
)

func main() {
    semaphore := goccm.New(runtime.NumCPU())
    
    for {
        semaphore.Wait()

        go func() {
            fmt.Println(rand.Int())
            semaphore.Done()
        }()
    }
    
    semaphore.WaitAllDone()
}

Optimal routine quantity

  • If the operation is CPU-bound: runtime.NumCPU()
  • Otherwise test with: time go run *.go
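Instead of timing whole program runs by hand, the comparison can also be done inside one program. This is a minimal sketch using only the standard library; timeWithWorkers is a hypothetical helper, and the sleeping task is a stand-in for real I/O-bound work.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

// timeWithWorkers runs task(0..n-1) on the given number of worker
// goroutines and reports how long the whole batch took.
func timeWithWorkers(workers, n int, task func(i int)) time.Duration {
	start := time.Now()
	jobs := make(chan int)
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := range jobs {
				task(i)
			}
		}()
	}
	for i := 0; i < n; i++ {
		jobs <- i
	}
	close(jobs)
	wg.Wait()
	return time.Since(start)
}

func main() {
	// Stand-in for an I/O-bound task; replace with the real work.
	task := func(int) { time.Sleep(10 * time.Millisecond) }
	for _, workers := range []int{1, runtime.NumCPU(), 4 * runtime.NumCPU()} {
		fmt.Printf("%3d workers: %v\n", workers, timeWithWorkers(workers, 64, task))
	}
}
```

For work that mostly waits (such as an external process or network call), more workers than cores may still help; for CPU-bound work the timings should stop improving around runtime.NumCPU().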

Configure

export GOPATH="$(pwd)/gopath"
go mod init *.go
go mod tidy

CleanUp

find "${GOPATH}" -exec chmod +w {} \;
rm --recursive --force "${GOPATH}"

1

try this: https://github.com/korovkin/limiter

 limiter := NewConcurrencyLimiter(10)
 limiter.Execute(func() {
        zenity(...) 
 })
 limiter.Wait()
korovkin
1

You could use the Worker Pool pattern described here in this post. This is how an implementation would look ...

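package main

import (
    "log"
    "os/exec"
    "strconv"
    "sync"
)

func main() {
    NumEl := 8 // number of times the external program is called
    pool := 4  // number of worker goroutines
    intChan := make(chan int)
    var wg sync.WaitGroup

    for i := 0; i < pool; i++ {
        wg.Add(1)
        go callProg(intChan, &wg) // <--- launch the worker routines
    }

    for i := 0; i < NumEl; i++ {
        intChan <- i // <--- push data which will be received by workers
    }

    close(intChan) // <--- no more data: workers exit once the channel is drained
    wg.Wait()      // <--- without this, main could return while commands are still running
}

func callProg(intChan <-chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    for i := range intChan {
        cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n."+strconv.Itoa(i)+"'")
        if err := cmd.Run(); err != nil {
            log.Printf("iteration %d: %v", i, err)
        }
    }
}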
mohit jain