125

This is a good example of workers & controller mode in Go written by @Jimt, in answer to "Is there some elegant way to pause & resume any other goroutine in golang?"

package main

import (
    "fmt"
    "runtime"
    "sync"
    "time"
)

// Possible worker states.
const (
    Stopped = 0
    Paused  = 1
    Running = 2
)

// Maximum number of workers.
const WorkerCount = 1000

func main() {
    // Launch workers.
    var wg sync.WaitGroup
    wg.Add(WorkerCount + 1)

    workers := make([]chan int, WorkerCount)
    for i := range workers {
        workers[i] = make(chan int)

        go func(i int) {
            worker(i, workers[i])
            wg.Done()
        }(i)
    }

    // Launch controller routine.
    go func() {
        controller(workers)
        wg.Done()
    }()

    // Wait for all goroutines to finish.
    wg.Wait()
}

func worker(id int, ws <-chan int) {
    state := Paused // Begin in the paused state.

    for {
        select {
        case state = <-ws:
            switch state {
            case Stopped:
                fmt.Printf("Worker %d: Stopped\n", id)
                return
            case Running:
                fmt.Printf("Worker %d: Running\n", id)
            case Paused:
                fmt.Printf("Worker %d: Paused\n", id)
            }

        default:
            // We use runtime.Gosched() to prevent a deadlock in this case.
            // It will not be needed of work is performed here which yields
            // to the scheduler.
            runtime.Gosched()

            if state == Paused {
                break
            }

            // Do actual work here.
        }
    }
}

// controller handles the current state of all workers. They can be
// instructed to be either running, paused or stopped entirely.
func controller(workers []chan int) {
    // Start workers
    for i := range workers {
        workers[i] <- Running
    }

    // Pause workers.
    <-time.After(1e9)
    for i := range workers {
        workers[i] <- Paused
    }

    // Unpause workers.
    <-time.After(1e9)
    for i := range workers {
        workers[i] <- Running
    }

    // Shutdown workers.
    <-time.After(1e9)
    for i := range workers {
        close(workers[i])
    }
}

But this code also has an issue: If you want to remove a worker channel in workers when worker() exits, dead lock happens.

If you close(workers[i]), next time controller writes into it will cause a panic since go can't write into a closed channel. If you use some mutex to protect it, then it will be stuck on workers[i] <- Running since the worker is not reading anything from the channel and write will be blocked, and mutex will cause a dead lock. You can also give a bigger buffer to channel as a work-around, but it's not good enough.

So I think the best way to solve this is worker() close channel when exits, if the controller finds a channel closed, it will jump over it and do nothing. But I can't find how to check a channel is already closed or not in this situation. If I try to read the channel in controller, the controller might be blocked. So I'm very confused for now.

PS: Recovering the raised panic is what I have tried, but it will close goroutine which raised panic. In this case it will be controller so it's no use.

Still, I think it's useful for Go team to implement this function in next version of Go.

Community
  • 1
  • 1
Shane Hou
  • 4,808
  • 9
  • 35
  • 50

10 Answers10

97

There's no way to write a safe application where you need to know whether a channel is open without interacting with it.

The best way to do what you're wanting to do is with two channels -- one for the work and one to indicate a desire to change state (as well as the completion of that state change if that's important).

Channels are cheap. Complex design overloading semantics isn't.

[also]

<-time.After(1e9)

is a really confusing and non-obvious way to write

time.Sleep(time.Second)

Keep things simple and everyone (including you) can understand them.

Dustin
  • 89,080
  • 21
  • 111
  • 133
  • 4
    mmm, I'm new to GO but i think that time.Sleep will block the main thread, wheras <-time.After(...) will block if no other channels are producing, but WONT block any other channels also being queried inside a select. But I totally agree with being explicit. I get what you were saying though. – snowcode Sep 23 '21 at 20:39
96

In a hacky way it can be done for channels which one attempts to write to by recovering the raised panic. But you cannot check if a read channel is closed without reading from it.

Either you will

  • eventually read the "true" value from it (v <- c)
  • read the "true" value and 'not closed' indicator (v, ok <- c)
  • read a zero value and the 'closed' indicator (v, ok <- c) (example)
  • will block in the channel read forever (v <- c)

Only the last one technically doesn't read from the channel, but that's of little use.

Amin Shojaei
  • 5,451
  • 2
  • 38
  • 46
zzzz
  • 87,403
  • 16
  • 175
  • 139
9

I know this answer is so late, I have wrote this solution, Hacking Go run-time, It's not safety, It may crashes:

import (
    "unsafe"
    "reflect"
)


func isChanClosed(ch interface{}) bool {
    if reflect.TypeOf(ch).Kind() != reflect.Chan {
        panic("only channels!")
    }
    
    // get interface value pointer, from cgo_export 
    // typedef struct { void *t; void *v; } GoInterface;
    // then get channel real pointer
    cptr := *(*uintptr)(unsafe.Pointer(
        unsafe.Pointer(uintptr(unsafe.Pointer(&ch)) + unsafe.Sizeof(uint(0))),
    ))
    
    // this function will return true if chan.closed > 0
    // see hchan on https://github.com/golang/go/blob/master/src/runtime/chan.go 
    // type hchan struct {
    // qcount   uint           // total data in the queue
    // dataqsiz uint           // size of the circular queue
    // buf      unsafe.Pointer // points to an array of dataqsiz elements
    // elemsize uint16
    // closed   uint32
    // **
    
    cptr += unsafe.Sizeof(uint(0))*2
    cptr += unsafe.Sizeof(unsafe.Pointer(uintptr(0)))
    cptr += unsafe.Sizeof(uint16(0))
    return *(*uint32)(unsafe.Pointer(cptr)) > 0
}
  • 2
    `go vet` returns "possible misuse of unsafe.Pointer" on last line `return *(*uint32)(unsafe.Pointer(cptr)) > 0` and `cptr += unsafe.Sizeof(unsafe.Pointer(uintptr(0)))` is there an option to do it without unsafe.Pointer in those lines? – Effi Bar-She'an Nov 16 '17 at 13:29
  • 3
    You need to do all the pointer arithmetic in one expression to keep go vet happy. This solution is a data race and not valid Go, you'd also have to at minimum do the read of closed with atomic.LoadUint32. It's a pretty fragile hack either way though, if hchan changes between Go versions this will break. – Eloff Nov 18 '19 at 13:25
  • 6
    this is probably very clever, but it feels like adding a problem on top of another problem – Ярослав Рахматуллин Jul 23 '20 at 22:13
  • This causes a panic in go 1.16.6 Getting error message `Panic: fatal error: checkptr: pointer arithmetic result points to invalid allocation` at line `return *(*uint32)(unsafe.Pointer(cptr)) > 0` – Tushar Das Dec 14 '21 at 09:57
  • hchan's closed property is protected via a lock. Reading this value without acquiring that lock is pointless. Just like the knowledge whether a channel is open is useless unless you protect any close operation while hanging on to this information which in turn provides you with knowledge about a channel being open in the first place. I appreciate the hack though! – Joa Ebert Jan 24 '22 at 11:28
3

Well, you can use default branch to detect it, for a closed channel will be selected, for example: the following code will select default, channel, channel, the first select is not blocked.

func main() {
    ch := make(chan int)

    go func() {
        select {
        case <-ch:
            log.Printf("1.channel")
        default:
            log.Printf("1.default")
        }
        select {
        case <-ch:
            log.Printf("2.channel")
        }
        close(ch)
        select {
        case <-ch:
            log.Printf("3.channel")
        default:
            log.Printf("3.default")
        }
    }()
    time.Sleep(time.Second)
    ch <- 1
    time.Sleep(time.Second)
}

Prints

2018/05/24 08:00:00 1.default
2018/05/24 08:00:01 2.channel
2018/05/24 08:00:01 3.channel

Note, refer to comment by @Angad under this answer:

It doesn't work if you're using a Buffered Channel and it contains unread data

Amin Shojaei
  • 5,451
  • 2
  • 38
  • 46
acrazing
  • 1,955
  • 16
  • 24
  • 5
    There is one problem with this solution (as well as the rather nicely written https://go101.org/article/channel-closing.html that proposes a similar solution) - it doesn't work if you're using a Buffered Channel and it contains unread data – Angad Nov 15 '18 at 14:46
  • @Angad It's true this isn't a perfect solution for detecting a closed channel. It is a perfect solution for detecting whether reading the channel will block. (i.e. If reading the channel will block, then we know it's not closed; If reading the channel won't block, we know it may be closed). – tombrown52 Jun 30 '20 at 21:12
  • 1/ this answer is halfway correct as described by other comments. 2/ The last edit is dubious, of course `It doesn't work if you're using a Buffered Channel and it contains unread data`, if the channel still has data in it, it has not yet close the read side, only the write side. –  Jul 17 '21 at 11:37
2

I have had this problem frequently with multiple concurrent goroutines.

It may or may not be a good pattern, but I define a a struct for my workers with a quit channel and field for the worker state:

type Worker struct {
    data chan struct
    quit chan bool
    stopped bool
}

Then you can have a controller call a stop function for the worker:

func (w *Worker) Stop() {
    w.quit <- true
    w.stopped = true
}

func (w *Worker) eventloop() {
    for {
        if w.Stopped {
            return
        }
        select {
            case d := <-w.data:
                //DO something
                if w.Stopped {
                    return
                }
            case <-w.quit:
                return
        }
    }
}

This gives you a pretty good way to get a clean stop on your workers without anything hanging or generating errors, which is especially good when running in a container.

jregovic
  • 29
  • 1
  • 5
    doesn't this introduce a race condition? I mean if you write to `w.quit` but don't read from that channel in that instance, it will block and `stopped` won't become `true` neither. – FObersteiner Feb 26 '21 at 15:11
  • The solution to is not to write to `w.quit` but closing it instead (this is how `<-context.Done()` works). Reading from a closed channel returns instantly. do `close(w.quit)` in `Stop` and use `case <-w.quit:` to handle shutdown of the loop. The nice thing about closing instead of sending is that 1. it does not block 2. it can be used with any amount of receivers. With a single close you can (if you want to) shutdown a whole cluster without needing to know how many receivers are in it. – Simerax Jan 17 '23 at 15:42
1

You could set your channel to nil in addition to closing it. That way you can check if it is nil.

example in the playground: https://play.golang.org/p/v0f3d4DisCz

edit: This is actually a bad solution as demonstrated in the next example, because setting the channel to nil in a function would break it: https://play.golang.org/p/YVE2-LV9TOp

0
ch1 := make(chan int)
ch2 := make(chan int)
go func(){
    for i:=0; i<10; i++{
        ch1 <- i
    }
    close(ch1)
}()
go func(){
    for i:=10; i<15; i++{
        ch2 <- i
    }
    close(ch2)
}()
ok1, ok2 := false, false
v := 0
for{
    ok1, ok2 = true, true
    select{
        case v,ok1 = <-ch1:
        if ok1 {fmt.Println(v)}
        default:
    }
    select{
        case v,ok2 = <-ch2:
        if ok2 {fmt.Println(v)}
        default:
    }
    if !ok1 && !ok2{return}
    
}

}

-3

From the documentation:

A channel may be closed with the built-in function close. The multi-valued assignment form of the receive operator reports whether a received value was sent before the channel was closed.

https://golang.org/ref/spec#Receive_operator

Example by Golang in Action shows this case:

// This sample program demonstrates how to use an unbuffered
// channel to simulate a game of tennis between two goroutines.
package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// wg is used to wait for the program to finish.
var wg sync.WaitGroup

func init() {
    rand.Seed(time.Now().UnixNano())
}

// main is the entry point for all Go programs.
func main() {
    // Create an unbuffered channel.
    court := make(chan int)
    // Add a count of two, one for each goroutine.
    wg.Add(2)
    // Launch two players.
    go player("Nadal", court)
    go player("Djokovic", court)
    // Start the set.
    court <- 1
    // Wait for the game to finish.
    wg.Wait()
}

// player simulates a person playing the game of tennis.
func player(name string, court chan int) {
    // Schedule the call to Done to tell main we are done.
    defer wg.Done()
    for {
        // Wait for the ball to be hit back to us.
        ball, ok := <-court
        fmt.Printf("ok %t\n", ok)
        if !ok {
            // If the channel was closed we won.
            fmt.Printf("Player %s Won\n", name)
            return
        }
        // Pick a random number and see if we miss the ball.
        n := rand.Intn(100)
        if n%13 == 0 {
            fmt.Printf("Player %s Missed\n", name)
            // Close the channel to signal we lost.
            close(court)
            return
        }

        // Display and then increment the hit count by one.
        fmt.Printf("Player %s Hit %d\n", name, ball)
        ball++
        // Hit the ball back to the opposing player.
        court <- ball
    }
}
Israel Barba
  • 1,434
  • 20
  • 28
  • 6
    The question was how to check for closed state without reading the channel, i.e. before writing to it. – Peter Nov 08 '19 at 14:16
-6

it's easier to check first if the channel has elements, that would ensure the channel is alive.

func isChanClosed(ch chan interface{}) bool {
    if len(ch) == 0 {
        select {
        case _, ok := <-ch:
            return !ok
        }
    }
    return false 
}
Enric
  • 3
  • 4
    As [mentioned by Dustin](http://stackoverflow.com/a/16127317/55504), there is no way to do this safely. By the time you get into your `if` body `len(ch)` could be anything. (e.g. a goroutine on another core sends a value to the channel before your select tries to read). – Dave C Jul 20 '15 at 16:13
-9

If you listen this channel you always can findout that channel was closed.

case state, opened := <-ws:
    if !opened {
         // channel was closed 
         // return or made some final work
    }
    switch state {
        case Stopped:

But remember, you can not close one channel two times. This will raise panic.

jurka
  • 12,945
  • 3
  • 22
  • 20
  • 8
    I said "without reading it", -1 for not read question carefully. – Shane Hou Apr 19 '13 at 13:32
  • >PS: Recovering the raised panic is what I have tried, but it will close goroutine which raised panic. In this case it will be controller so it's no use. You always can go func(chan z){ defer func(){ //handle recover} close(z)} – jurka Apr 19 '13 at 13:46
  • But I have to reserve the controller, and `close(z)` will be called by worker instead of controller. – Shane Hou Apr 19 '13 at 13:53