78

I see lots of tutorials and examples on how to make Go wait for x number of goroutines to finish, but what I'm trying to do is ensure there are always x number running, so a new goroutine is launched as soon as one ends.

Specifically I have a few hundred thousand 'things to do' which is processing some stuff that is coming out of MySQL. So it works like this:

db, err := sql.Open("mysql", connection_string)
checkErr(err)
defer db.Close()

rows,err := db.Query(`SELECT id FROM table`)
checkErr(err)
defer rows.Close()

var id uint
for rows.Next() {
    err := rows.Scan(&id)
    checkErr(err)
    go processTheThing(id)
}
checkErr(rows.Err()) // report any error that ended the iteration early

Currently that will launch several hundred thousand threads of processTheThing(). What I need is that a maximum of x number (we'll call it 20) goroutines are launched. So it starts by launching 20 for the first 20 rows, and from then on it will launch a new goroutine for the next id the moment that one of the current goroutines has finished. So at any point in time there are always 20 running.

I'm sure this is quite simple/standard, but I can't seem to find a good explanation in any of the tutorials or examples of how this is done.

Jonathan Hall
Alasdair
  • 3
    General advice: Watch Go Concurrency Patterns by Robert Pike and Advanced Go Concurrency Patterns by Sameer Ajmani on Youtube. They offer a great foundation how to use Go's concurrency model. – 0x434D53 Aug 15 '14 at 10:47

8 Answers

109

You may find the Go Concurrency Patterns article interesting, especially the Bounded parallelism section, which explains the exact pattern you need.

You can use a channel of empty structs as a limiting guard to control the number of concurrent worker goroutines:

package main

import "fmt"

func main() {
    maxGoroutines := 10
    guard := make(chan struct{}, maxGoroutines)

    for i := 0; i < 30; i++ {
        guard <- struct{}{} // would block if guard channel is already filled
        go func(n int) {
            worker(n)
            <-guard
        }(i)
    }
}

func worker(i int) { fmt.Println("doing work on", i) }
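
As written, main can return before the last goroutines have printed anything. A minimal sketch of the same guard-channel idea combined with a `sync.WaitGroup` follows; the helper name `runLimited` and its parameters are hypothetical, chosen only for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// runLimited (hypothetical helper) calls work(i) for i in [0, n) with at most
// limit calls in flight, and returns only after every call has finished.
func runLimited(n, limit int, work func(int)) {
	guard := make(chan struct{}, limit) // buffered channel used as a counting semaphore
	var wg sync.WaitGroup

	for i := 0; i < n; i++ {
		guard <- struct{}{} // blocks while limit goroutines are already running
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			defer func() { <-guard }() // free the slot even if work panics
			work(i)
		}(i)
	}
	wg.Wait() // without this, the caller could return before the last workers finish
}

func main() {
	runLimited(30, 10, func(i int) { fmt.Println("doing work on", i) })
}
```

Because the slot is taken in the loop before the goroutine starts, the loop itself stalls once `limit` goroutines are running, which is what keeps the count bounded.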
artyom
  • 10
    This approach does not work in case you have less items to process than "maxGoroutines". Also there is no waiting for finishing workers. – Oleg Neumyvakin Feb 01 '18 at 07:54
  • It worked for me with fewer than maxGoroutines items, as long as I added a short sleep to wait for finishing workers. – Dan Tenenbaum Apr 08 '18 at 23:44
  • On the play.golang.org link, the first `maxGoroutines - 1` don't seem to run. If it is set to 10, 0-8 don't print. If I set it to 3, 0 and 1 don't print. Are they running and not printing or are they not running? Or is execution over before they can print? – Naikrovek Jul 18 '18 at 01:54
  • 10
    Main exits before those goroutines can print. See updated play.golang.org link here: https://play.golang.org/p/HovNRgp6FxH – Naikrovek Jul 18 '18 at 11:43
  • Does the channel have to be of type struct? Shouldn't a bool suffice, for example? – Muppet Jan 08 '19 at 23:05
  • 1
    @Muppet using empty struct instead of i.e. bool is more of a convention; using bool here is ok, but reader may think that bool value (true/false) may have some meaning, while they're not in this use case. Empty structs also use less memory, for details see https://dave.cheney.net/2014/03/25/the-empty-struct – artyom Jan 10 '19 at 13:03
  • 2
    This answer is misleading to the point it makes me wonder for its so many upvotes; the main will exit before all goroutines have time to complete; incorporation of a `sync.WaitGroup` as suggested by @Naikrovek seems more appropriate – pkaramol Oct 28 '21 at 20:45
60

I think something simple like this will work:

package main

import "fmt"

const MAX = 20

func main() {
    sem := make(chan int, MAX)
    for {
        sem <- 1 // will block if there are MAX ints in sem
        go func() {
            fmt.Println("hello again, world")
            <-sem // removes an int from sem, allowing another to proceed
        }()
    }
}
Emil Davtyan
  • There is a correction, it will be `sem := make(chan int, MAX)` as `make(int,Max)` is incorrect so it is supposed to be `chan int`. Btw thanks for the answer. – Hamza Anis Feb 21 '19 at 00:33
  • the code comments are good - I tested it and it seems to work - can you explain how this works in more depth? –  Jun 05 '20 at 23:58
  • 6
    This is a nice and simple solution! However for people thinking of using this, I would recommend using an empty struct channel: `make(chan struct{}, MAX)`. That way, it's very clear to the reader that we don't actually care about the values passed on the channel. As a bonus, an empty struct uses 0 bytes in memory. – Accio Aug 16 '22 at 11:49
42

Thanks to everyone for helping me out with this. However, I don't feel that anyone really provided something that both worked and was simple/understandable, although you did all help me understand the technique.

What I have done in the end is, I think, much more understandable and practical as an answer to my specific question, so I will post it here in case anyone else has the same question.

Somehow this ended up looking a lot like what OneOfOne posted, which is great because now I understand that answer. At first I found OneOfOne's code very difficult to follow, because passing functions to functions made it hard to tell which part did what. I think this way makes a lot more sense:

package main

import (
    "fmt"
    "sync"
)

const xthreads = 5 // Total number of threads to use, excluding the main() thread

func doSomething(a int) {
    fmt.Println("My job is", a)
}

func main() {
    var ch = make(chan int, 50) // Job queue; the buffer size (50 here) is arbitrary, it only lets the producer run ahead of the workers
    var wg sync.WaitGroup

    // This starts xthreads number of goroutines that wait for something to do
    wg.Add(xthreads)
    for i := 0; i < xthreads; i++ {
        go func() {
            for {
                a, ok := <-ch
                if !ok { // if there is nothing to do and the channel has been closed then end the goroutine
                    wg.Done()
                    return
                }
                doSomething(a) // do the thing
            }
        }()
    }

    // Now the jobs can be added to the channel, which is used as a queue
    for i := 0; i < 50; i++ {
        ch <- i // add i to the queue
    }

    close(ch) // This tells the goroutines there's nothing else to do
    wg.Wait() // Wait for the threads to finish
}
Alasdair
  • Worked flawlessly out of the box. Exactly what I needed. – perelin Aug 07 '19 at 16:03
  • To further simplify this, note that there is no real need to use a buffered channel here. You'll end up blocking longer on writes to the channel, but that's fine because once you've written to the channel xthreads times, you're moving on to wait anyway. – Kent Rancourt Aug 26 '19 at 13:28
  • 3
    calling goroutines "threads" is sort of misleading, I would just call them "goroutines" –  Jun 06 '20 at 00:00
  • this solved my problem perfectly, but I don't get how I can return a value from `doSomething`. e.g. handle an error. – Jonny Rimek Apr 13 '21 at 15:25
  • @JonnyRimek, you handle the error within `doSomething`, each of the threads operates independently and so can independently handle their own errors. – Alasdair Apr 14 '21 at 03:42
16
  1. Create a channel for passing data to the goroutines.
  2. Start 20 goroutines that process the data from the channel in a loop.
  3. Send the data to the channel instead of starting a new goroutine.
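
The three steps can be sketched roughly as below. This is only an illustration under assumptions: `processAll` and its parameters are hypothetical names, and a slice of ids stands in for the `rows.Next()` loop from the question.

```go
package main

import (
	"fmt"
	"sync"
)

// processAll (hypothetical helper) runs process on every id using a fixed
// number of worker goroutines.
func processAll(ids []uint, workers int, process func(uint)) {
	jobs := make(chan uint) // step 1: the channel that carries the data
	var wg sync.WaitGroup

	// Step 2: start the workers; each one loops over the channel.
	wg.Add(workers)
	for w := 0; w < workers; w++ {
		go func() {
			defer wg.Done()
			for id := range jobs { // ends once jobs is closed and drained
				process(id)
			}
		}()
	}

	// Step 3: send each item to the channel instead of starting a goroutine.
	for _, id := range ids {
		jobs <- id
	}
	close(jobs) // tells the workers that no more items are coming
	wg.Wait()   // returns only after every worker has exited
}

func main() {
	ids := []uint{1, 2, 3, 4, 5, 6, 7, 8}
	processAll(ids, 3, func(id uint) { fmt.Println("processing", id) })
}
```

With an unbuffered channel, each send blocks until a worker is free, so the sending loop naturally runs at the pace of the workers.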
Grzegorz Żur
11

Grzegorz Żur's answer is the most efficient way to do it, but for a newcomer it could be hard to implement without reading code, so here's a very simple implementation:

package main

import (
    "fmt"
    "runtime"
    "sync"
)

type idProcessor func(id uint)

func SpawnStuff(limit uint, proc idProcessor) chan<- uint {
    ch := make(chan uint)
    for i := uint(0); i < limit; i++ {
        go func() {
            for {
                id, ok := <-ch
                if !ok {
                    return
                }
                proc(id)
            }
        }()
    }
    return ch
}

func main() {
    runtime.GOMAXPROCS(4)
    var wg sync.WaitGroup //this is just for the demo, otherwise main will return
    fn := func(id uint) {
        fmt.Println(id)
        wg.Done()
    }
    wg.Add(1000)
    ch := SpawnStuff(10, fn)
    for i := uint(0); i < 1000; i++ {
        ch <- i
    }
    close(ch) //should do this to make all the goroutines exit gracefully
    wg.Wait()
}

OneOfOne
2

This is a simple producer-consumer problem, which in Go can be easily solved using channels to buffer the items.

To put it simply: create a channel that accepts your IDs. Run a number of goroutines that read from the channel in a loop and process each ID. Then run your loop that feeds IDs to the channel.

Example:

func producer() {
    var buffer = make(chan uint)

    for i := 0; i < 20; i++ {
        go consumer(buffer)
    }

    for _, id := range IDs {
        buffer <- id
    }
}

func consumer(buffer chan uint) {
    for {
        id := <-buffer
        _ = id // Do your things with id here
    }
}

Things to know:

  • Unbuffered channels are blocking: if the item written to the channel isn't accepted, the goroutine feeding the item will block until it is.
  • My example lacks a closing mechanism: you must find a way to make the producer wait for all consumers to end their loops before returning. The simplest way to do this is with another channel. I'll let you think about it.
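
One possible way to fill in the closing mechanism with a second channel is sketched below. It is an assumption about what that could look like, not the only option: the `done` channel, the `handle` callback, and the extra parameters on `producer` and `consumer` are hypothetical additions to the example above.

```go
package main

import "fmt"

// consumer handles ids until buffer is closed, then reports once on done.
func consumer(buffer <-chan uint, done chan<- struct{}, handle func(uint)) {
	for id := range buffer { // loop ends when buffer is closed and empty
		handle(id)
	}
	done <- struct{}{}
}

// producer feeds ids to n consumers and returns once all of them have exited.
func producer(ids []uint, n int, handle func(uint)) {
	buffer := make(chan uint)
	done := make(chan struct{})

	for i := 0; i < n; i++ {
		go consumer(buffer, done, handle)
	}
	for _, id := range ids {
		buffer <- id
	}
	close(buffer) // lets every consumer's range loop finish

	for i := 0; i < n; i++ {
		<-done // exactly one signal arrives per consumer
	}
}

func main() {
	producer([]uint{10, 20, 30, 40, 50}, 20, func(id uint) { fmt.Println("got", id) })
}
```

Closing `buffer` is what ends the consumers; the `done` channel only tells the producer when they have all stopped. A `sync.WaitGroup` would serve the same purpose.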
Elwinar
0

I've written a simple package to handle concurrency in Go. It helps you limit the number of goroutines that are allowed to run concurrently: https://github.com/zenthangplus/goccm

Example:

package main

import (
    "fmt"
    "time"

    "github.com/zenthangplus/goccm"
)

func main() {
    // Limit 3 goroutines to run concurrently.
    c := goccm.New(3)

    for i := 1; i <= 10; i++ {

        // This function has to be called before starting each goroutine
        c.Wait()

        go func(i int) {
            fmt.Printf("Job %d is running\n", i)
            time.Sleep(2 * time.Second)

            // This function has to be called when a goroutine has finished
            // Or you can use `defer c.Done()` at the top of goroutine.
            c.Done()
        }(i)
    }

    // This function has to be called to ensure all goroutines have finished
    // before the main program exits.
    c.WaitAllDone()
}
0

You can also take a look here: https://github.com/LiangfengChen/goutil/blob/main/concurrent.go

The test case serves as a usage example:

func TestParallelCall(t *testing.T) {
    format := "test:%d"
    data := make(map[int]bool)
    mutex := sync.Mutex{}
    val, err := ParallelCall(1000, 10, func(pos int) (interface{}, error) {
        mutex.Lock()
        defer mutex.Unlock()
        data[pos] = true
        return pos, errors.New(fmt.Sprintf(format, pos))
    })

    for i := 0; i < 1000; i++ {
        if _, ok := data[i]; !ok {
            t.Errorf("TestParallelCall pos not found: %d", i)
        }
        if val[i] != i {
            t.Errorf("TestParallelCall return value is not right (%d,%v)", i, val[i])
        }
        if err[i].Error() != fmt.Sprintf(format, i) {
            t.Errorf("TestParallelCall error msg is not correct (%d,%v)", i, err[i])
        }
    }
}