
I wish to have a goroutine listening on two channels, blocked when both channels are drained. However, if both channels contain data, I want one to be drained before the other is handled.

In the working example below, I want all of out to be drained before exit is handled. I use a select statement, which doesn't have any priority order. How can I get around this problem so that all 10 out values are handled before the exit?

package main

import "fmt"

func sender(out chan int, exit chan bool) {
    for i := 1; i <= 10; i++ {
        out <- i
    }
    exit <- true
}

func main() {
    out := make(chan int, 10)
    exit := make(chan bool)

    go sender(out, exit)

L:
    for {
        select {
        case i := <-out:
            fmt.Printf("Value: %d\n", i)
        case <-exit:
            fmt.Println("Exiting")
            break L
        }
    }
    fmt.Println("Did we get all 10? Most likely not")
}
Sonia
ANisus
  • For the example you gave, you only need the out channel; close it once sending is complete. – zach May 20 '18 at 11:48

9 Answers


The language supports this natively and no workaround is required. It's very simple: the quit channel should only be visible to the producer. On quit, the producer closes the value channel. The consumer ranges over that channel, and a range loop only ends once the channel is both closed and empty, so the consumer quits only after every value sent before the close has been processed.

Here is an example to illustrate:

package main

import (
    "fmt"
    "math/rand"
    "time"
)

var (
    produced  = 0
    processed = 0
)

func produceEndlessly(out chan int, quit chan bool) {
    defer close(out)
    for {
        select {
        case <-quit:
            fmt.Println("RECV QUIT")
            return
        default:
            out <- rand.Int()
            time.Sleep(time.Duration(rand.Int63n(5e6)))
            produced++
        }
    }
}

func quitRandomly(quit chan bool) {
    d := time.Duration(rand.Int63n(5e9))
    fmt.Println("SLEEP", d)
    time.Sleep(d)
    fmt.Println("SEND QUIT")
    quit <- true
}

func main() {
    vals, quit := make(chan int, 10), make(chan bool)
    go produceEndlessly(vals, quit)
    go quitRandomly(quit)
    for x := range vals {
        fmt.Println(x)
        processed++
        time.Sleep(time.Duration(rand.Int63n(5e8)))
    }
    fmt.Println("Produced:", produced)
    fmt.Println("Processed:", processed)
}
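Applied to the question's own code, the same idea means dropping the exit channel and letting sender close out when it is done (as the comment under the question also suggests). A minimal sketch under that assumption; collect is a hypothetical helper name, not part of the original answer:

```go
package main

import "fmt"

// sender produces 1..10 and then closes out; the close takes the place of
// the separate exit channel used in the question.
func sender(out chan<- int) {
    defer close(out)
    for i := 1; i <= 10; i++ {
        out <- i
    }
}

// collect ranges over out. The loop only ends once out is closed AND every
// value already buffered in it has been received.
func collect() []int {
    out := make(chan int, 10)
    go sender(out)
    var got []int
    for v := range out {
        got = append(got, v)
    }
    return got
}

func main() {
    got := collect()
    for _, v := range got {
        fmt.Printf("Value: %d\n", v)
    }
    fmt.Println("Did we get all 10?", len(got) == 10)
}
```

Because there is only one channel, there is nothing for select to choose between, so the ordering question disappears entirely.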
Inanc Gumus
jorelli
  • Thanks, this is exactly the solution I was looking for, and it doesn't have the potential race-condition bug that is in Sonia's answer. – BrandonAGr Oct 18 '12 at 17:12
  • Just ranging over the vals channel in the main goroutine will work. – zach May 20 '18 at 11:54
  • Worth noting that, while entirely correct under the question's premises, this won't work for the “N-producers-1-consumer” case, because closing the `out` channel without synchronization between producers can trigger a panic. Chicken-and-egg problem, because such synchronization requires a priority select between `quit` and `out` :) – agronskiy Nov 18 '20 at 09:29
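On the N-producers case raised in the last comment: one common way to synchronize the close itself is a sync.WaitGroup plus a single goroutine that closes the shared channel only after every producer has returned. A minimal sketch, assuming each producer may simply stop early when a shared quit channel is closed; the names produce, fanIn and countAll are hypothetical. Since no producer ever closes out, no send can hit a closed channel, and the consumer still drains everything with a plain range:

```go
package main

import (
    "fmt"
    "sync"
)

// produce sends five values unless quit is closed first. It never closes
// out itself, because other producers may still be sending on it.
func produce(id int, out chan<- int, quit <-chan struct{}, wg *sync.WaitGroup) {
    defer wg.Done()
    for i := 0; i < 5; i++ {
        select {
        case out <- id*100 + i:
        case <-quit:
            return
        }
    }
}

// fanIn starts n producers and closes out exactly once, after every
// producer has returned.
func fanIn(n int, quit <-chan struct{}) <-chan int {
    out := make(chan int)
    var wg sync.WaitGroup
    wg.Add(n)
    for id := 0; id < n; id++ {
        go produce(id, out, quit, &wg)
    }
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

// countAll ranges over the merged channel until it is closed and drained.
func countAll(n int, quit <-chan struct{}) int {
    count := 0
    for range fanIn(n, quit) {
        count++
    }
    return count
}

func main() {
    quit := make(chan struct{}) // never closed here, so every value is delivered
    fmt.Println("received:", countAll(3, quit))
}
```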
package main

import "fmt"

func sender(out chan int, exit chan bool) {
    for i := 1; i <= 10; i++ {
        out <- i
    }
    exit <- true
}

func main() {
    out := make(chan int, 10)
    exit := make(chan bool)

    go sender(out, exit)

    for {
        select {
        case i := <-out:
            fmt.Printf("Value: %d\n", i)
            continue
        default:
        }
        select {
        case i := <-out:
            fmt.Printf("Value: %d\n", i)
            continue
        case <-exit:
            fmt.Println("Exiting")
        }
        break
    }
    fmt.Println("Did we get all 10? I think so!")
}

The default case of the first select makes it non-blocking. That select drains the out channel without looking at the exit channel, but otherwise does not wait: if out is empty, it immediately drops to the second select. The second select is blocking; it waits for data on either channel. If an exit comes, it handles it and lets the loop exit. If data comes, it goes back to the top of the loop and into drain mode again.

Sonia
  • The idea is very similar to my own. But true, with the `continue` statement, you get rid of the need for a flag. Smart. Well, this is probably as good an answer as I can expect to get. Thanks! – ANisus Jun 20 '12 at 18:18
  • This will loop infinitely in the first select statement if the out channel is closed. – jorelli Jun 23 '12 at 18:45
  • jorelli, quite true. If you wanted to allow for hostile or buggy goroutines closing the channel unexpectedly, you would check the ok status on the receive. – Sonia Jun 23 '12 at 22:11
  • This is actually not an entirely correct solution, since it is possible for both queues to receive data in a single context switch. The behavior of `select` when multiple queues are ready is indeterminate (pseudo-random). – bug Aug 24 '12 at 01:16
  • This doesn't seem correct. When blocking on the second `select`, if data arrives on both the `out` and `exit` channels, there is no guarantee that data in `out` will be processed before `exit`. I actually believe that there is no solution with channels. – chmike Sep 06 '16 at 14:18
  • This doesn't work! I tested it and it is totally wrong! – Amin Nov 14 '20 at 16:08
  • It only increases the success probability :) – Mohsenasm Sep 01 '21 at 08:14

Another approach:

package main

import "fmt"

func sender(c chan int) chan int {
    go func() {
        for i := 1; i <= 15; i++ {
            c <- i
        }
        close(c)
    }()
    return c
}

func main() {
    for i := range sender(make(chan int, 10)) {
        fmt.Printf("Value: %d\n", i)
    }
    fmt.Println("Did we get all 15? Surely yes")
}

$ go run main.go
Value: 1
Value: 2
Value: 3
Value: 4
Value: 5
Value: 6
Value: 7
Value: 8
Value: 9
Value: 10
Value: 11
Value: 12
Value: 13
Value: 14
Value: 15
Did we get all 15? Surely yes
$ 
zzzz
  • Thanks for the suggestion! If I understand you correctly, you suggest using only one channel and signaling an exit by closing it, which ends the `for range` statement. True, maybe that is a better way to do it, but in my case I am working with two channels. – ANisus Jun 20 '12 at 12:26

Here's a general idiom that solves the select's priority problem.

Yes, it's not pretty, to say the least, but it does what is needed, 100%, with no pitfalls and no hidden limitations.

Here's a short code example; the explanation follows.

package main

import (
    "fmt"
    "time"
)

func sender(out chan int, exit chan bool) {
    for i := 1; i <= 10; i++ {
        out <- i
    }

    time.Sleep(2000 * time.Millisecond)
    out <- 11
    exit <- true
}

func main() {
    out := make(chan int, 20)
    exit := make(chan bool)

    go sender(out, exit)

    time.Sleep(500 * time.Millisecond)

    L:
    for {
        select {
        case i := <-out:
            fmt.Printf("Value: %d\n", i)
        default:
            select {
            case i := <-out:
                fmt.Printf("Value: %d\n", i)
            case <-exit:
                select {
                case i := <-out:
                    fmt.Printf("Value: %d\n", i)
                default:
                    fmt.Println("Exiting")
                    break L
                }
            }
        }
    }
    fmt.Println("Did we get all 10? Yes.")
    fmt.Println("Did we get 11? DEFINITELY YES")
}

And here's how it works: the main() from above, annotated:

func main() {
    out := make(chan int, 20)
    exit := make(chan bool)
    go sender(out, exit)
    time.Sleep(500 * time.Millisecond)
    L:
    for {
        select {

            // here we go when entering next loop iteration
            // and check if the out has something to be read from

            // this select is used to handle buffered data in a loop

        case i := <-out:
            fmt.Printf("Value: %d\n", i)
        default:
            // otherwise we fall back to here

            select {

                // this select is used to block when there's no data in either chan

            case i := <-out:
            // if out has something to read, we unblock and then go round the loop again

                fmt.Printf("Value: %d\n", i)
            case <-exit:
                select {

                    // this select is used to explicitly prioritize one chan over the other,
                    // in case we woke up (unblocked up) on the low-priority case

                    // NOTE:
                    // this will prioritize the high-pri one even if it came _second_, in quick
                    // succession to the first one

                case i := <-out:
                    fmt.Printf("Value: %d\n", i)
                default:
                    fmt.Println("Exiting")
                    break L
                }
            }
        }
    }

    fmt.Println("Did we get all 10? Yes.")
    fmt.Println("Did we get 11? DEFINITELY YES")
}

NOTE: Before playing tricks with prioritizations, MAKE SURE YOU ARE SOLVING THE RIGHT PROBLEM.

Chances are, it can be solved differently.

Still, having a prioritized select built into Go would have been a great thing. Just a dream...

NOTE: There is a quite similar answer on this thread, https://stackoverflow.com/a/45854345/11729048, but it nests only two selects, not three as I did. What's the difference? My approach is more efficient, whereas that one explicitly expects to handle a random choice on each loop iteration.

However, if the high-priority channel isn't buffered, and/or you don't expect bulk data on it, only sporadic single events, then the simpler two-stage idiom (as in that answer) will suffice:

L:
for {
    select {
    case i := <-out:
        fmt.Printf("Value: %d\n", i)
    case <-exit:
        select {
        case i := <-out:
            fmt.Printf("Value: %d\n", i)
        default:
            fmt.Println("Exiting")
            break L
        }
    }
}

It is basically stages 2 and 3 of the three-stage version, with stage 1 removed.

And once again: in something like 90% of the cases where you think you need to prioritize channel select cases, you really don't.

And here's a one-liner that can be kept as a reusable snippet (Go itself has no macros):

for {
    select { case a1 := <-ch_p1: p1_action(a1); default: select { case a1 := <-ch_p1: p1_action(a1); case a2 := <-ch_p2: select { case a1 := <-ch_p1: p1_action(a1); default: p2_action(a2); }}}
}

And what if you want to prioritize more than two cases?

Then you have two options. The first is to build a tree, using intermediate goroutines, so that each fork is exactly binary (the idiom above).

The second option is to make the priority fork wider than binary.

Here's an example of three priorities:

for {
    select {
    case a1 := <-ch_p1:
        p1_action(a1)
    default:
        select {
        case a2 := <-ch_p2:
            p2_action(a2)
        default:
            select {    // block here, on this select
            case a1 := <-ch_p1:
                p1_action(a1)
            case a2 := <-ch_p2:
                select {
                case a1 := <-ch_p1:
                    p1_action(a1)
                default:
                    p2_action(a2)
                }
            case a3 := <-ch_p3:
                select {
                case a1 := <-ch_p1:
                    p1_action(a1)
                case a2 := <-ch_p2:
                    p2_action(a2)
                default:
                    p3_action(a3)
                }
            }
        }
    }
}

That is, the whole structure is conceptually split into three parts, just like the original (binary) one.

Once again: chances are, you can design your system so that you avoid this mess.

P.S. A rhetorical question: why doesn't Go have this built into the language?
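One caveat with all of the variants above (an observation, not part of the original answer): when the low-priority case wins the blocking select and the re-check then finds high-priority data, the low-priority value already received (a2, a3, or the exit signal) is never handled, and with a one-shot exit signal the loop can then block forever. A minimal sketch of a variant that keeps that value, by draining the high-priority channel in a loop and then handling the saved low-priority value. The names prioritized, hi, lo and handle are hypothetical, and both channels are assumed to be closed by their producers when done:

```go
package main

import "fmt"

// prioritized delivers values from hi and lo to handle. Before handling each
// lo value it first handles everything already waiting on hi. It returns
// once both channels are closed and empty.
func prioritized(hi, lo <-chan int, handle func(src string, v int)) {
    // drainHi handles everything currently available on hi without blocking.
    drainHi := func() {
        for {
            select {
            case v, ok := <-hi:
                if !ok {
                    hi = nil // closed: a nil channel is never ready in a select
                    return
                }
                handle("hi", v)
            default:
                return
            }
        }
    }
    for hi != nil || lo != nil {
        select {
        case v, ok := <-hi:
            if !ok {
                hi = nil
                continue
            }
            handle("hi", v)
        case v, ok := <-lo:
            if !ok {
                lo = nil
                continue
            }
            drainHi()       // high-priority data first...
            handle("lo", v) // ...then the lo value already held, instead of dropping it
        }
    }
}

func main() {
    hi, lo := make(chan int, 5), make(chan int, 3)
    for i := 1; i <= 5; i++ {
        hi <- i
    }
    for i := 1; i <= 3; i++ {
        lo <- i
    }
    close(hi)
    close(lo)
    prioritized(hi, lo, func(src string, v int) { fmt.Println(src, v) })
}
```

With both channels pre-filled as in main, all five hi values are handled before any lo value. A hi value that arrives after drainHi returns is, of course, handled after the lo value in hand.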

latitov

Here's another option.

Consumer Code:

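go func() {
    stop := false
    for {
        select {
        case item := <-r.queue:
            doWork(item)
        case <-r.stopping:
            // remember the stop request, but keep draining the queue
            stop = true
        }
        // leave the loop only once a stop was requested and the queue is empty
        if stop && len(r.queue) == 0 {
            break
        }
    }
}()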
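A self-contained version of the consumer above, for trying it out. The runner type, the run helper and the int payload are hypothetical stand-ins for whatever r, r.queue and doWork are in real code, and the sketch assumes nothing is sent on queue after stopping has been signaled (otherwise the len check can race with a late send):

```go
package main

import "fmt"

// runner is a hypothetical stand-in for the r in the snippet above.
type runner struct {
    queue    chan int
    stopping chan struct{}
}

// consume handles queued items until a stop has been requested and the
// queue is empty.
func (r *runner) consume(doWork func(int)) {
    stop := false
    for {
        select {
        case item := <-r.queue:
            doWork(item)
        case <-r.stopping:
            stop = true
        }
        if stop && len(r.queue) == 0 {
            return
        }
    }
}

// run queues n items, signals stop, and returns the items that were handled.
func run(n int) []int {
    r := &runner{queue: make(chan int, n), stopping: make(chan struct{})}
    var got []int
    done := make(chan struct{})
    go func() {
        r.consume(func(v int) { got = append(got, v) })
        close(done)
    }()
    for i := 1; i <= n; i++ {
        r.queue <- i
    }
    r.stopping <- struct{}{} // blocks until the consumer has received it
    <-done
    return got
}

func main() {
    fmt.Println("handled", len(run(10)), "items")
}
```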
scosman

I have created one rather simple workaround. It does what I want, but if anyone else has a better solution, please let me know:

exiting := false
for !exiting || len(out) > 0 {
    select {
    case i := <-out:
        fmt.Printf("Value: %d\n", i)
    case <-exit:
        exiting = true
        fmt.Println("Exiting")
    }
}

Instead of exiting on receiving, I flag an exit, exiting once I've made sure nothing is left in chan out.

ANisus
  • This works and is nice and compact, but uses some tricks you should try to avoid in general. Flags get confusing as programs get bigger. They are kind of like gotos. More seriously, len(chan) can often introduce races. It looks okay in this situation, but in many cases it's invalid to make a decision based on len(chan) because it can change before you take action. Imagine the case where you get len==0, then a value arrives, then an exit arrives, and select picks the exit. You might shrug and say they arrived at about the same time, but in some time-critical programs, it could matter. – Sonia Jun 20 '12 at 14:30
  • Umm, maybe it still works in the case I described. Sorry if it's a bad example. But anyway, I try to avoid using len in synchronization code. – Sonia Jun 20 '12 at 17:20
  • Hi again Sonia :) . Good input. Yes, in my case it doesn't matter much. I just wanted to flush what was going out before exiting. However, I actually redid the code using `for range` and `close(out)` instead (as suggested by jmnl). Then only the out-events placed in the channel pipe preceding the close would be "flushed". I will avoid decision making based on len(chan) if Nasdaq ever asks me to do some Go program for them ;) – ANisus Jun 20 '12 at 19:04

I think Sonia's answer is incorrect. This is my solution; it is a little bit complicated.

package main

import "fmt"

func sender(out chan int, exit chan bool) {
    for i := 1; i <= 10; i++ {
        out <- i
    }
    exit <- true
}

func main() {
    out := make(chan int, 10)
    exit := make(chan bool)

    go sender(out, exit)

L:
    for {
        select {
        case i := <-out:
            fmt.Printf("Value: %d\n", i)
        case <-exit:
            // exit arrived: drain whatever is still buffered in out, then leave
            for {
                select {
                case i := <-out:
                    fmt.Printf("Value: %d\n", i)
                default:
                    fmt.Println("Exiting")
                    break L
                }
            }
        }
    }
    fmt.Println("Did we get all 10? Yes!")
}
海牙客移库

In my case, I really wanted to prioritise data from one channel over another, and not just have an out-of-band exit signal. For the benefit of anyone else with the same issue I think this approach works without the potential race condition:

OUTER:
for channelA != nil || channelB != nil {

    select {

    case typeA, ok := <-channelA:
        if !ok {
            channelA = nil
            continue OUTER
        }
        doSomething(typeA)

    case typeB, ok := <-channelB:
        if !ok {
            channelB = nil
            continue OUTER
        }

        // Looped non-blocking nested select here checks that channelA
        // really is drained before we deal with the data from channelB
        NESTED:
        for {
            select {
            case typeA, ok := <-channelA:
                if !ok {
                    channelA = nil
                    continue NESTED
                }
                doSomething(typeA)

            default:
                // We are free to process the typeB data now
                doSomethingElse(typeB)
                break NESTED
            }
        }
    }

}
monoi

Is there any specific reason for using a buffered channel make(chan int, 10)?

You need to use an unbuffered channel instead of the buffered one you are using.

Just remove the 10; it should simply be make(chan int).

This way, execution in the sender function can only proceed to the exit <- true statement after the last value sent on the out channel has been received by the i := <-out statement. Until that receive has happened, the goroutine cannot reach exit <- true.
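A sketch of the question's program with only the buffer removed, to show the argument above in code. It relies on a single sender goroutine that sends all values on out before it sends on exit; receiveAll is a hypothetical helper name:

```go
package main

import "fmt"

// sender is the question's sender, unchanged.
func sender(out chan int, exit chan bool) {
    for i := 1; i <= 10; i++ {
        out <- i
    }
    exit <- true
}

// receiveAll is the question's loop with one change: out is unbuffered.
// Each send on out completes only when it is received here, so exit cannot
// become ready until the 10th value has been taken.
func receiveAll() []int {
    out := make(chan int) // unbuffered instead of make(chan int, 10)
    exit := make(chan bool)
    go sender(out, exit)
    var got []int
    for {
        select {
        case i := <-out:
            got = append(got, i)
        case <-exit:
            return got
        }
    }
}

func main() {
    got := receiveAll()
    fmt.Println("Did we get all 10?", len(got) == 10)
}
```

The trade-off is that every send now waits for the receiver, so the sender loses the decoupling the buffer provided.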

George Polevoy