158

To start an endless loop that runs two goroutines, I can use the code below. Each time a message is received, a new goroutine is started, and this goes on forever:

c1 := make(chan string)
c2 := make(chan string)

go DoStuff(c1, 5)
go DoStuff(c2, 2)

for {
    select {
    case msg1 := <-c1:
        fmt.Println("received ", msg1)
        go DoStuff(c1, 1)
    case msg2 := <-c2:
        fmt.Println("received ", msg2)
        go DoStuff(c2, 9)
    }
}

I would now like to have the same behavior for N goroutines, but how will the select statement look in that case?

This is the code I have started with, but I am not sure how to write the select statement:

numChans := 2

// I keep the channels in this slice and want to "loop" over them in the select statement.
var chans = []chan string{}

for i := 0; i < numChans; i++ {
    tmp := make(chan string)
    chans = append(chans, tmp)
    go DoStuff(tmp, i+1)
}

// How should the select statement be written for this case?
for {
    select {
    case msg1 := <-c1:
        fmt.Println("received ", msg1)
        go DoStuff(c1, 1)
    case msg2 := <-c2:
        fmt.Println("received ", msg2)
        go DoStuff(c2, 9)
    }
}
David Moles
JohnSmith
  • 6
    I think what you're wanting is Channel Multiplexing. http://golang.org/doc/effective_go.html#chan_of_chan Basically, you have one single channel you listen to and then multiple child channels that funnel into the main channel. Related SO Question: http://stackoverflow.com/questions/10979608/is-it-possible-to-multiplex-several-channels-into-one – Brenden Nov 15 '13 at 19:04

7 Answers

181

You can do this using the Select function from the reflect package:

func Select(cases []SelectCase) (chosen int, recv Value, recvOK bool)

Select executes a select operation described by the list of cases. Like the Go select statement, it blocks until at least one of the cases can proceed, makes a uniform pseudo-random choice, and then executes that case. It returns the index of the chosen case and, if that case was a receive operation, the value received and a boolean indicating whether the value corresponds to a send on the channel (as opposed to a zero value received because the channel is closed).

You pass in a slice of SelectCase structs that identify the channel to select on, the direction of the operation, and a value to send in the case of a send operation.

So you could do something like this:

cases := make([]reflect.SelectCase, len(chans))
for i, ch := range chans {
    cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
}
chosen, value, ok := reflect.Select(cases)
// ok will be true if the channel has not been closed.
ch := chans[chosen]
msg := value.String()

You can experiment with a more fleshed out example here: http://play.golang.org/p/8zwvSk4kjx
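For the string channels in the question, the same idea can be wrapped in a small helper and called in a loop. This is a minimal sketch rather than a definitive implementation: `recvAny` is a hypothetical helper name, and the workers below send a single message each instead of running the question's `DoStuff`.

```go
package main

import (
	"fmt"
	"reflect"
)

// recvAny blocks until one of chans delivers a value or is closed.
// It returns the index of that channel, the received string, and
// whether the value came from a real send (false means "closed").
// recvAny is a hypothetical name, not part of the reflect package.
func recvAny(chans []chan string) (int, string, bool) {
	cases := make([]reflect.SelectCase, len(chans))
	for i, ch := range chans {
		cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
	}
	chosen, value, ok := reflect.Select(cases)
	if !ok {
		return chosen, "", false
	}
	return chosen, value.String(), true
}

func main() {
	chans := make([]chan string, 3)
	for i := range chans {
		chans[i] = make(chan string)
		// Stand-in worker: sends one message and exits.
		go func(i int, ch chan<- string) {
			ch <- fmt.Sprintf("message from worker %d", i)
		}(i, chans[i])
	}

	// One receive per worker; the order depends on scheduling.
	for range chans {
		i, msg, ok := recvAny(chans)
		fmt.Println("received", msg, "on channel", i, "ok:", ok)
	}
}
```

Rebuilding `cases` on every call keeps the sketch short; in a long-running loop you would probably build the slice once and reuse it.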

Zac
James Henstridge
  • 4
    Is there a practical limit to the number of cases in such select? The one that if you go beyond it, then performance is severely impacted? – Maxim Vladimirsky Oct 19 '15 at 23:39
  • 8
    Maybe it's my incompetency, but I found this pattern really hard to work with when you are sending & receiving complex structures through the channel. Passing a shared "aggregate" channel, as Tim Allclair said, was much easier in my case. – Bora M. Alper Jul 20 '17 at 08:13
128

You can accomplish this by wrapping each channel in a goroutine which "forwards" messages to a shared "aggregate" channel. For example:

agg := make(chan string)
for _, ch := range chans {
  go func(c chan string) {
    for msg := range c {
      agg <- msg
    }
  }(ch)
}

select {
case msg := <-agg:
    fmt.Println("received ", msg)
}

If you need to know which channel the message originated from, you could wrap it in a struct with any extra information before forwarding it to the aggregate channel.

In my (limited) testing, this method greatly outperforms using the reflect package:

$ go test dynamic_select_test.go -test.bench=.
...
BenchmarkReflectSelect         1    5265109013 ns/op
BenchmarkGoSelect             20      81911344 ns/op
ok      command-line-arguments  9.463s

Benchmark code here

Tim Allclair
  • 2
    Your benchmark code is incorrect, you need [to loop over `b.N`](https://golang.org/pkg/testing/#hdr-Benchmarks) within a benchmark. Otherwise the results (which are divided by `b.N`, 1 and 2000000000 in your output) will be completely meaningless. – Dave C Sep 02 '15 at 16:27
  • 2
    @DaveC Thank you! The conclusion doesn't change, but the results are much more sane. – Tim Allclair Sep 02 '15 at 17:09
  • 1
    Indeed, I did a quick hack on your benchmark code to get [some actual numbers](https://gist.github.com/dchapes/5ed761c4c4023850b2de). There very well may be something still missing/wrong from this benchmark but the only thing the more complicated reflect code has going for it is that the setup is faster (with GOMAXPROCS=1) since it doesn't need a bunch of goroutines. In every other case a simple goroutine merging channel blows away the reflect solution (by ~2 orders of magnitude). – Dave C Sep 02 '15 at 17:28
  • 5
    One important downside (compared to the `reflect.Select` approach) is that the goroutines doing the merging buffer at minimum a single value on each channel being merged. Usually that won't be a problem but in some specific applications that may be a deal breaker :(. – Dave C Sep 02 '15 at 17:36
  • @DaveC I'm not sure I understand what you're saying, but wouldn't a buffered merge channel fix that case? – Tim Allclair Sep 03 '15 at 07:29
  • 2
    A buffered merge channel makes the problem worse. The issue is that only the reflect solution can have fully unbuffered semantics. I've gone ahead and posted the test code I was experimenting with as a separate answer to (hopefully) clarify what I was trying to say. – Dave C Sep 03 '15 at 16:54
  • Ah, makes sense now. [Here](https://gist.github.com/timstclair/dd6df610c7c372de66a9#file-dynamic_select_test-go-L115) is yet another alternative with similar semantics to reflection and better performance (with sufficient parallelism) at the cost of complexity (benchmarkHybrid). Basically, if the thread needs to block the performance hit from reflection will probably be negligible. If it doesn't need to block, the first loop will receive. – Tim Allclair Sep 03 '15 at 18:18
  • 1
    Shouldn't you write `ch := ch` just before the goroutine, or pass `ch` as an argument to the goroutine, so that each goroutine uses a separate `ch` value? – musiphil Jun 15 '16 at 23:55
  • @musiphil thanks for pointing that out! I had fixed it in the benchmark code, but forgot to update the answer. – Tim Allclair Jun 22 '16 at 18:41
31

To expand on some comments on previous answers and to provide a clearer comparison, here is an example of both approaches presented so far, given the same input: a slice of channels to read from and a function to call for each value, which also needs to know which channel the value came from.

There are three main differences between the approaches:

  • Complexity. Although it may partially be a matter of reader preference, I find the channel approach more idiomatic, straightforward, and readable.

  • Performance. On my Xeon amd64 system, the goroutines+channels approach outperforms the reflect solution by about two orders of magnitude (in general, reflection in Go is often slower and should only be used when absolutely required). Of course, if there is any significant delay in either the function processing the results or in the writing of values to the input channels, this performance difference can easily become insignificant.

  • Blocking/buffering semantics. The importance of this depends on the use case. Most often it either won't matter or the slight extra buffering in the goroutine merging solution may be helpful for throughput. However, if it is desirable to have the semantics that only a single writer is unblocked and its value fully handled before any other writer is unblocked, then that can only be achieved with the reflect solution.

Note, both approaches can be simplified if either the "id" of the sending channel isn't required or if the source channels will never be closed.

Goroutine merging channel:

// Process1 calls `fn` for each value received from any of the `chans`
// channels. The arguments to `fn` are the index of the channel the
// value came from and the string value. Process1 returns once all the
// channels are closed.
func Process1(chans []<-chan string, fn func(int, string)) {
    // Setup
    type item struct {
        int    // index of which channel this came from
        string // the actual string item
    }
    merged := make(chan item)
    var wg sync.WaitGroup
    wg.Add(len(chans))
    for i, c := range chans {
        go func(i int, c <-chan string) {
            // Reads and buffers a single item from `c` before
            // we even know if we can write to `merged`.
            //
            // Go doesn't provide a way to do something like:
            //     merged <- (<-c)
            // atomically, where we delay the read from `c`
            // until we can write to `merged`. The read from
            // `c` will always happen first (blocking as
            // required) and then we block on `merged` (with
            // either the above or the below syntax making
            // no difference).
            for s := range c {
                merged <- item{i, s}
            }
            // If/when this input channel is closed we just stop
            // writing to the merged channel and via the WaitGroup
            // let it be known there is one fewer channel active.
            wg.Done()
        }(i, c)
    }
    // One extra goroutine to watch for all the merging goroutines to
    // be finished and then close the merged channel.
    go func() {
        wg.Wait()
        close(merged)
    }()

    // "select-like" loop
    for i := range merged {
        // Process each value
        fn(i.int, i.string)
    }
}

Reflection select:

// Process2 is identical to Process1 except that it uses the reflect
// package to select and read from the input channels which guarantees
// there is only one value "in-flight" (i.e. when `fn` is called only
// a single send on a single channel will have succeeded, the rest will
// be blocked). It is approximately two orders of magnitude slower than
// Process1 (which is still insignificant if there is a significant
// delay between incoming values or if `fn` runs for a significant
// time).
func Process2(chans []<-chan string, fn func(int, string)) {
    // Setup
    cases := make([]reflect.SelectCase, len(chans))
    // `ids` maps the index within cases to the original `chans` index.
    ids := make([]int, len(chans))
    for i, c := range chans {
        cases[i] = reflect.SelectCase{
            Dir:  reflect.SelectRecv,
            Chan: reflect.ValueOf(c),
        }
        ids[i] = i
    }

    // Select loop
    for len(cases) > 0 {
        // A difference here from the merging goroutines is
        // that `v` is the only value "in-flight" that any of
        // the workers have sent. All other workers are blocked
        // trying to send the single value they have calculated
        // where-as the goroutine version reads/buffers a single
        // extra value from each worker.
        i, v, ok := reflect.Select(cases)
        if !ok {
            // Channel cases[i] has been closed, remove it
            // from our slice of cases and update our ids
            // mapping as well.
            cases = append(cases[:i], cases[i+1:]...)
            ids = append(ids[:i], ids[i+1:]...)
            continue
        }

        // Process each value
        fn(ids[i], v.String())
    }
}

[Full code on the Go playground.]
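The buffering difference from the third bullet can be observed directly. The sketch below is an illustration under assumptions, not part of the benchmark code: `startForwarder`, `sendWithin`, and the two timeout constants are hypothetical names, and the timeouts only approximate "the send blocks".

```go
package main

import (
	"fmt"
	"time"
)

// Hypothetical timeouts used to decide whether a send "blocked".
const (
	shortWait = 50 * time.Millisecond
	longWait  = time.Second
)

// startForwarder copies values from src to merged, like the merging
// goroutines in Process1.
func startForwarder(src <-chan string, merged chan<- string) {
	go func() {
		for s := range src {
			merged <- s
		}
	}()
}

// sendWithin reports whether a send on c completes before the timeout.
func sendWithin(c chan<- string, s string, timeout time.Duration) bool {
	select {
	case c <- s:
		return true
	case <-time.After(timeout):
		return false
	}
}

func main() {
	// With a forwarder: the first send is accepted even though nobody
	// ever reads from merged, because the forwarder holds the value.
	src := make(chan string)
	merged := make(chan string)
	startForwarder(src, merged)
	fmt.Println("first send with forwarder:", sendWithin(src, "a", longWait))
	// The forwarder is now stuck on merged, so a second send blocks.
	fmt.Println("second send with forwarder:", sendWithin(src, "b", shortWait))

	// Without a forwarder (the reflect.Select situation before Select
	// is called): the send blocks until someone actually receives.
	direct := make(chan string)
	fmt.Println("send without receiver:", sendWithin(direct, "a", shortWait))
}
```

The forwarder goroutine is deliberately left blocked when `main` returns; that is acceptable in a demo but would be a leak in a real program.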

Dave C
  • 1
    It's also worth noting that the goroutines+channels solution can't do everything `select` or `reflect.Select` does. The goroutines will keep on spinning until they consume everything from the channels, so there is no clear way you could make `Process1` exit early. There is also the potential for problems if you have multiple readers, since the goroutines buffer one item from each of the channels, which won't happen with `select`. – James Henstridge Nov 04 '15 at 03:44
  • @JamesHenstridge, your first note about stopping isn't true. You'd arrange to stop Process1 the exact same way you'd arrange to stop Process2; e.g. by adding a "stop" channel that is closed when the goroutines should stop. Process1 would need a two-case `select` within a `for` loop instead of the simpler `for range` loop currently used. Process2 would need to stick another case into `cases` and specially handle that value of `i`. – Dave C Jan 07 '16 at 20:42
  • 1
    That still doesn't solve the problem that you're reading values from the channels that won't be used in the stop early case. – James Henstridge Jan 10 '16 at 01:16
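
A rough sketch of the "stop" channel variation discussed in these comments, applied to the goroutine merging approach. `mergeWithStop` is a hypothetical name and this is only one way to arrange it. It also shows the caveat raised above: a forwarder that has already read a value when `stop` is closed simply drops it.

```go
package main

import (
	"fmt"
	"sync"
)

// mergeWithStop forwards values from chans to the returned channel
// until every input is closed or stop is closed, whichever is first.
// The returned channel is closed once all forwarders have exited.
func mergeWithStop(stop <-chan struct{}, chans []<-chan string) <-chan string {
	merged := make(chan string)
	var wg sync.WaitGroup
	wg.Add(len(chans))
	for _, c := range chans {
		go func(c <-chan string) {
			defer wg.Done()
			for {
				select {
				case s, ok := <-c:
					if !ok {
						return // input closed
					}
					select {
					case merged <- s:
					case <-stop:
						return // s is dropped here
					}
				case <-stop:
					return
				}
			}
		}(c)
	}
	go func() {
		wg.Wait()
		close(merged)
	}()
	return merged
}

func main() {
	stop := make(chan struct{})
	src := make(chan string)
	merged := mergeWithStop(stop, []<-chan string{src})

	go func() { src <- "hello" }()
	fmt.Println("received", <-merged)

	close(stop) // ask the forwarders to exit early
	_, ok := <-merged
	fmt.Println("merged still open:", ok)
}
```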
6

We did some research on this subject and found what we consider the best solution for our case. We used reflect.Select for a while, and it is a great solution to the problem: it is much lighter than a goroutine per channel and simple to operate. Unfortunately, it doesn't really support a massive number of channels, which is our case, so we found something interesting and wrote a blog post about it: https://cyolo.io/blog/how-we-enabled-dynamic-channel-selection-at-scale-in-go/

To summarize the post: we statically created batches of select..case statements, one for each power of two up to 32, along with a function that routes the channels to the different batches and collects the results through an aggregate channel.

An example of such a batch:

func select4(ctx context.Context, chanz []chan interface{}, res chan *r, r *r, i int) {
    select {
    case r.v, r.ok = <-chanz[0]:
        r.i = i + 0
        res <- r
    case r.v, r.ok = <-chanz[1]:
        r.i = i + 1
        res <- r
    case r.v, r.ok = <-chanz[2]:
        r.i = i + 2
        res <- r
    case r.v, r.ok = <-chanz[3]:
        r.i = i + 3
        res <- r
    case <-ctx.Done():
        break
    }
}

And the logic of aggregating the first result from any number of channels using these kinds of select..case batches:

    for i < len(channels) {
        l = len(channels) - i
        switch {
        case l > 31 && maxBatchSize >= 32:
            go select32(ctx, channels[i:i+32], agg, rPool.Get().(*r), i)
            i += 32
        case l > 15 && maxBatchSize >= 16:
            go select16(ctx, channels[i:i+16], agg, rPool.Get().(*r), i)
            i += 16
        case l > 7 && maxBatchSize >= 8:
            go select8(ctx, channels[i:i+8], agg, rPool.Get().(*r), i)
            i += 8
        case l > 3 && maxBatchSize >= 4:
            go select4(ctx, channels[i:i+4], agg, rPool.Get().(*r), i)
            i += 4
        case l > 1 && maxBatchSize >= 2:
            go select2(ctx, channels[i:i+2], agg, rPool.Get().(*r), i)
            i += 2
        case l > 0:
            go select1(ctx, channels[i], agg, rPool.Get().(*r), i)
            i += 1
        }
    }
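
To make the idea easier to try out, here is a much smaller self-contained sketch with batches of only one and two channels. It is not the code from the blog post: the `result` type, the `deliver` and `first` helpers, and the use of a plain `done` channel instead of a context and a pool are all assumptions made for illustration.

```go
package main

import "fmt"

// result is a hypothetical stand-in for the `r` type used above.
type result struct {
	i  int         // index of the source channel
	v  interface{} // received value
	ok bool        // false if the source channel was closed
}

// deliver hands r to the aggregate channel unless done is closed first.
func deliver(done <-chan struct{}, res chan<- result, r result) {
	select {
	case res <- r:
	case <-done:
	}
}

// select1 waits on a batch of exactly one channel.
func select1(done <-chan struct{}, chs []chan interface{}, res chan<- result, base int) {
	var r result
	select {
	case r.v, r.ok = <-chs[0]:
		r.i = base
	case <-done:
		return
	}
	deliver(done, res, r)
}

// select2 waits on a batch of exactly two channels.
func select2(done <-chan struct{}, chs []chan interface{}, res chan<- result, base int) {
	var r result
	select {
	case r.v, r.ok = <-chs[0]:
		r.i = base
	case r.v, r.ok = <-chs[1]:
		r.i = base + 1
	case <-done:
		return
	}
	deliver(done, res, r)
}

// first returns the first result available on any of chs.
// It blocks forever if chs is empty.
func first(chs []chan interface{}) result {
	done := make(chan struct{})
	defer close(done) // releases the batches that are still waiting
	agg := make(chan result)
	for i := 0; i < len(chs); {
		if len(chs)-i >= 2 {
			go select2(done, chs[i:i+2], agg, i)
			i += 2
		} else {
			go select1(done, chs[i:i+1], agg, i)
			i++
		}
	}
	return <-agg
}

func main() {
	chs := make([]chan interface{}, 5)
	for i := range chs {
		chs[i] = make(chan interface{}, 1)
	}
	chs[4] <- "ping"
	r := first(chs)
	fmt.Println(r.i, r.v, r.ok) // prints: 4 ping true
}
```

Note that a batch which has already received a value when `done` is closed discards that value, so this sketch only suits cases where losing such a value is acceptable.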
avivklas
  • How does `maxBatchSize` get set? (Also, you have a typo: `maxBachSize`. I'd fix it, but the edit queue is full, and it'd still be wrong on the blog post.) – JakeRobb Feb 16 '22 at 21:26
  • 2
    `maxBatchSize` can be set as a config if you want to optimize according to some knowledge you have on the nature of your batches. I came up with this refinement when I wanted to test which maximum batch size is the most optimal. I hope you understood why from the blog post I shared. You can remove it in your implementation and decide on the max batch you prefer. We finally went with 64 because it performed best for thousands of channels which is our case. btw, thanks for bringing out the typo! I fixed it and will do it in the blog post as well. – avivklas Feb 20 '22 at 20:54
3

Possibly simpler option:

Instead of having an array of channels, why not pass just one channel as a parameter to the functions being run on separate goroutines, and then listen to the channel in a consumer goroutine?

This lets you select on just one channel in your listener, which makes for a simple select and avoids creating new goroutines to aggregate messages from multiple channels.
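
A minimal sketch of that arrangement, assuming the workers can be changed to accept a shared channel. The `result` type, the lower-case `doStuff`, and `run` are hypothetical names; the loop is bounded by `total` only so that the example terminates, whereas the question's loop runs forever.

```go
package main

import "fmt"

// result carries the worker's id so the listener knows who reported.
type result struct {
	worker int
	msg    string
}

// doStuff is a stand-in for the question's DoStuff: it reports once
// on the shared channel and exits.
func doStuff(worker int, out chan<- result) {
	out <- result{worker: worker, msg: fmt.Sprintf("done by worker %d", worker)}
}

// run starts numWorkers goroutines that share one channel, restarts a
// worker each time it reports, and returns after total messages.
func run(numWorkers, total int) []result {
	// Buffered so the workers still running when we stop reading can
	// finish their send instead of blocking forever.
	out := make(chan result, numWorkers)
	for i := 0; i < numWorkers; i++ {
		go doStuff(i, out)
	}

	results := make([]result, 0, total)
	for len(results) < total {
		r := <-out // a single receive replaces the N-way select
		results = append(results, r)
		go doStuff(r.worker, out) // restart the worker that just reported
	}
	return results
}

func main() {
	for _, r := range run(3, 6) {
		fmt.Println("received", r.msg)
	}
}
```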

0

Based on James Henstridge's answer, I wrote this generic (Go >= 1.18) Select function that takes a context and a slice of channels and returns the index of the selected channel along with the received value:

func Select[T any](ctx context.Context, chs []chan T) (int, T, error) {
    var zeroT T
    cases := make([]reflect.SelectCase, len(chs)+1)
    for i, ch := range chs {
        cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
    }
    cases[len(chs)] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ctx.Done())}
    // ok will be true if the channel has not been closed.
    chosen, value, ok := reflect.Select(cases)
    if !ok {
        if ctx.Err() != nil {
            return -1, zeroT, ctx.Err()
        }
        return chosen, zeroT, errors.New("channel closed")
    }
    if ret, ok := value.Interface().(T); ok {
        return chosen, ret, nil
    }
    return chosen, zeroT, errors.New("failed to cast value")
}

Here is an example of how to use it:

func TestSelect(t *testing.T) {
    c1 := make(chan int)
    c2 := make(chan int)
    c3 := make(chan int)
    chs := []chan int{c1, c2, c3}
    go func() {
        time.Sleep(time.Second)
        //close(c2)
        c2 <- 42
    }()
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel() // release the context's resources when the test returns

    chosen, val, err := Select(ctx, chs)

    assert.Equal(t, 1, chosen)
    assert.Equal(t, 42, val)
    assert.NoError(t, err)
}
Alain Gilbert
-1

Why wouldn't this approach work, assuming that somebody is sending events?

func main() {
    numChans := 2
    var chans = []chan string{}

    for i := 0; i < numChans; i++ {
        tmp := make(chan string)
        chans = append(chans, tmp)
    }

    for {
        for i, c := range chans {
            select {
            case x := <-c:
                fmt.Printf("received %d \n", i)
                go DoShit(x, i)
            default: continue
            }
        }
    }
}
noonex
  • 10
    This is a spin-loop. While waiting for an input channel to have a value this consumes all the CPU available. The whole point of `select` on multiple channels (without a `default` clause) is that it efficiently waits until at least one is ready without spinning. – Dave C Jun 29 '18 at 14:18