
In my case, I have thousands of goroutines running work() concurrently. I also have a sync() goroutine. When sync() starts, I need all the other goroutines to pause until the sync job is done, and then resume. Here is my code:

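import "sync"

var channels []chan int
var channels_mutex sync.Mutex

func work() {
  channel := make(chan int, 1)
  channels_mutex.Lock()
  channels = append(channels, channel)
  channels_mutex.Unlock()
  for {
    for {
      sync_stat := <-channel // blocks here until syncAll sends a value
      if sync_stat == 0 { // 0 means the sync is complete
        break
      }
    }
    // Do some jobs
    if someCondition() { // hypothetical placeholder for the real exit condition
      return
    }
  }
}

// syncAll is the sync() goroutine described above; a package-level
// function cannot be named sync when the sync package is imported.
func syncAll() {
  channels_mutex.Lock()
  // do some sync

  // Tell every registered worker that the sync is complete.
  for i := 0; i < len(channels); i++ {
    channels[i] <- 0
  }
  channels_mutex.Unlock()
}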

The problem is that a receive (<-) always blocks until a value arrives, so every time execution reaches sync_stat := <-channel, it blocks. I know that a receive on a closed channel doesn't block, but I have to keep using this channel until work() exits, and I couldn't find any way to reopen a closed channel.
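The two receive behaviors described above can be checked directly. The sketch below uses a hypothetical helper, `tryRecv` (not part of the question's code), to show a non-blocking receive: a `select` with a `default` case returns immediately when nothing is buffered. It also shows that a receive on a closed channel yields the zero value with `ok == false` instead of blocking.

```go
package main

import "fmt"

// tryRecv is a hypothetical helper: it receives from ch without blocking
// and reports whether a value was actually available.
func tryRecv(ch <-chan int) (int, bool) {
	select {
	case v := <-ch:
		return v, true
	default:
		// Nothing buffered and no sender ready: return instead of blocking.
		return 0, false
	}
}

func main() {
	ch := make(chan int, 1)

	_, got := tryRecv(ch)
	fmt.Println("value available:", got)

	ch <- 0
	v, got := tryRecv(ch)
	fmt.Println("value available:", got, "value:", v)

	// A receive on a closed channel never blocks: it returns the zero value
	// and ok == false. There is no way to reopen the channel afterwards.
	close(ch)
	v, ok := <-ch
	fmt.Println("closed:", v, ok)
}
```

This `select`/`default` pattern is the same one the worker in the answer relies on to avoid blocking on its state channel.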

I suspect I'm going about this the wrong way, so any help is appreciated. Is there an "elegant" way to pause and resume all the other goroutines?

Dave C
Shane Hou

1 Answer


If I understand you correctly, you want N workers and one controller that can pause, resume and stop the workers at will. The following code does just that.

package main

import (
    "fmt"
    "runtime"
    "sync"
)

// Possible worker states.
const (
    Stopped = 0
    Paused  = 1
    Running = 2
)

// Number of workers to launch.
const WorkerCount = 1000

func main() {
    // Launch workers.
    var wg sync.WaitGroup
    wg.Add(WorkerCount + 1)

    workers := make([]chan int, WorkerCount)
    for i := range workers {
        workers[i] = make(chan int, 1)

        go func(i int) {
            worker(i, workers[i])
            wg.Done()
        }(i)
    }

    // Launch controller routine.
    go func() {
        controller(workers)
        wg.Done()
    }()

    // Wait for all goroutines to finish.
    wg.Wait()
}

func worker(id int, ws <-chan int) {
    state := Paused // Begin in the paused state.

    for {
        select {
        case state = <-ws:
            switch state {
            case Stopped:
                fmt.Printf("Worker %d: Stopped\n", id)
                return
            case Running:
                fmt.Printf("Worker %d: Running\n", id)
            case Paused:
                fmt.Printf("Worker %d: Paused\n", id)
            }

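        default:
            // We use runtime.Gosched() to prevent a deadlock in this case.
            // It will not be needed if work is performed here which yields
            // to the scheduler.
            runtime.Gosched()

            if state == Paused {
                // break leaves the select (not the for loop), so a paused
                // worker skips the work below and polls ws again.
                break
            }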

            // Do actual work here.
        }
    }
}

// controller handles the current state of all workers. They can be
// instructed to be either running, paused or stopped entirely.
func controller(workers []chan int) {
    // Start workers
    setState(workers, Running)

    // Pause workers.
    setState(workers, Paused)

    // Unpause workers.
    setState(workers, Running)

    // Shutdown workers.
    setState(workers, Stopped)
}

// setState changes the state of all given workers.
func setState(workers []chan int, state int) {
    for _, w := range workers {
        w <- state
    }
}
jimt
  • What does ` <-time.After(1e1)` mean? – Shane Hou Apr 19 '13 at 10:19
  • It pauses the goroutine for 1 second (1e9 nanoseconds). It is used in this example to make it look like the controller is doing some actual work. `time.After()` returns a channel on which the current time is sent once the given duration has elapsed. `<-time.After(N)` simply blocks on that channel until that value is received. – jimt Apr 19 '13 at 10:24
  • And this makes me realize another thing: why don't we just use a global value representing the controller status and have the workers check it every time? I know it's not good practice, but I want to know the reason. – Shane Hou Apr 19 '13 at 10:26
  • Using channels for communication is the idiomatic way to do this in Go. You can use a global variable if you want to. But I would advise against using a `sync.Mutex` to lock it. These do not scale very well when dealing with large numbers of goroutines, each acquiring R/W locks. In this case, I would use the `sync/atomic` package to atomically read/write the state. – jimt Apr 19 '13 at 10:29
  • Another concern: will sending messages to this large number of channels cause a performance bottleneck? – Shane Hou Apr 19 '13 at 10:35
  • Channels will never be free; there is always some overhead. But how much this affects your application depends on how you use them. Channels and goroutines are a fundamental component of the Go language and an important part of what makes Go great for these kinds of situations, so their performance will always be a prime concern. Whether you will be satisfied with them can really only be discovered by running some benchmarks. – jimt Apr 19 '13 at 10:44
  • @jimt Nice example; good reminder of how 'default' is run when the channel blocked. I'd add that in case the workers sometimes take a while to come back to read the status it might be nice to make the channel that talks to them have a buffer of 1, so you can write to all of them in a hurry instead of not pausing or stopping number 2 until number 1 stopped, etc. Or did I misunderstand how that works? – Ask Bjørn Hansen Apr 19 '13 at 16:30
  • Yes, a buffered channel can help there. – jimt Apr 19 '13 at 17:19
  • Maybe I am missing something, but won't the paused workers keep running (due to the default part of the select)? I think this would cause a lot of unnecessary thread context switching. – Andrew W. Phillips Apr 02 '17 at 00:40
  • Yes, it will keep going into the default case, but it is not as expensive as you may expect. What this does is set `state` to the appropriate value. It is up to you to decide if and how you want to throttle the loop. As far as context switching goes, [here's](https://dave.cheney.net/2015/08/08/performance-without-the-event-loop) a pretty thorough explanation of how this works in Go. – jimt Apr 02 '17 at 11:34
  • @jimt please answer: what happens if we have a read from a process's STDIN that blocks in the default case? If we send a new state, would select be able to stop immediately? I'm talking about the StdInPipe that we get from running an exec command. – redpix_ Apr 16 '17 at 08:32
  • @redpix_ Any blocking call in the default case will stop the loop from continuing. New values in the state channel will not be read until the default case finishes. In your case, I would read stdin in a separate goroutine and write inputs into a channel, which can be put into the select{} block as a new case. It would replace the default case. For example: https://play.golang.org/p/XZLdfD3OB_ – jimt Apr 17 '17 at 13:55
  • @jimt could you please answer here: [My New Unanswered Question](http://stackoverflow.com/questions/43436751/how-to-stop-goroutine-blocked-by-external-i-o-started-for-process), I was thinking about that actually but also then I don't see a way to stop that new goroutine that is blocked by pipe read, if you could answer my question I would be very thankful. I was reading about Context package and Pipelines too but I couldn't wrap my head around finishing goroutine blocked by pipe reading. Thanks a lot! – redpix_ Apr 17 '17 at 14:51
  • @jimt I actually have a real example there of my problem. – redpix_ Apr 17 '17 at 14:52
  • Either I don't understand a use case here, or this example doesn't solve anything. "case Running" is called only once, when "setState(workers, Running)" is executed. Also if I have another go(){} inside "case Running" that will never be stopped or paused by this code. – Mantas Jan 03 '22 at 18:37