I'm new to Go and to concurrency in Go. I'm trying to use a Go context to cancel a set of goroutines once I find a member with a given ID.

A Group stores a list of Clients, and each Client has a list of Members. I want to search all the Clients and all their Members in parallel to find a Member with a given ID. Once this Member is found, I want to cancel all the other goroutines and return the discovered Member.

I've tried the following implementation, using a context.WithCancel and a WaitGroup.

However, this doesn't work: it hangs indefinitely and never gets past the line waitGroup.Wait(), and I'm not sure exactly why.

func (group *Group) MemberWithID(ID string) (*models.Member, error) {
    found := make(chan *models.Member)
    ctx := context.Background()
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    var waitGroup sync.WaitGroup

    for _, client := range group.Clients {
        waitGroup.Add(1)

        go func(clientToQuery Client) {
            defer waitGroup.Done()

            select {
            case <-ctx.Done():
                return
            default:
            }

            member, _ := client.ClientMemberWithID(ID)
            if member != nil {
                found <- member
                cancel()
                return
            }

        } (client)

    }

    waitGroup.Wait()

    if len(found) > 0 {
        return <-found, nil
    }

    return nil, fmt.Errorf("no member found with given id")
}
aralk

2 Answers

found is an unbuffered channel, so sending on it blocks until there is someone ready to receive from it.

Your main goroutine (the one running MemberWithID()) would be the one to receive from it, but only after waitGroup.Wait() returns. Wait() blocks until all launched goroutines have called waitGroup.Done(), which doesn't happen until they return, and a goroutine that found the member can't return until its send on found completes. It's a deadlock.

If you change found to be buffered, goroutines can send values on it even if the main goroutine is not ready to receive from it (up to as many values as the buffer can hold).

But you should still receive from found before waitGroup.Wait() returns, so that no sender is left blocked once the buffer is full.
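
As a rough sketch of that idea: the Member and Client types, the MemberWithID method and the findMember function below are simplified stand-ins I made up for illustration, not the asker's real models. The buffer is sized to the number of goroutines so no send can block, and the wait happens in a helper goroutine so the caller is already receiving while the workers run.

```go
package main

import (
	"fmt"
	"sync"
)

// Member and Client are hypothetical stand-ins for the types in the question.
type Member struct{ ID string }

type Client struct{ Members []Member }

// MemberWithID scans one client's members and returns nil if none matches.
func (c Client) MemberWithID(id string) *Member {
	for i := range c.Members {
		if c.Members[i].ID == id {
			return &c.Members[i]
		}
	}
	return nil
}

// findMember searches all clients in parallel and returns the first match.
func findMember(clients []Client, id string) (*Member, error) {
	// One buffer slot per sender: a send can never block.
	found := make(chan *Member, len(clients))
	var wg sync.WaitGroup

	for _, c := range clients {
		wg.Add(1)
		go func(c Client) {
			defer wg.Done()
			if m := c.MemberWithID(id); m != nil {
				found <- m
			}
		}(c)
	}

	// Wait in a separate goroutine and close the channel once every
	// sender has finished, so the receive below always terminates.
	go func() {
		wg.Wait()
		close(found)
	}()

	if m, ok := <-found; ok {
		return m, nil
	}
	return nil, fmt.Errorf("no member found with id %q", id)
}

func main() {
	clients := []Client{
		{Members: []Member{{ID: "a"}, {ID: "b"}}},
		{Members: []Member{{ID: "c"}}},
	}
	if m, err := findMember(clients, "c"); err == nil {
		fmt.Println("found", m.ID)
	}
}
```

If no goroutine finds the member, the channel is closed while still empty, the receive reports ok == false, and the function returns the error.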

Another solution is to use a buffer of 1 for found, and use non-blocking send on found. That way the first (fastest) goroutine will be able to send the result, and the rest (given we're using non-blocking send) will simply skip sending.
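
A minimal sketch of the buffer-of-1 variant, again with made-up stand-in types (Member, Client, findMember are assumptions for illustration only). The select with a default case is what makes the send non-blocking.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Member and Client are hypothetical stand-ins for the types in the question.
type Member struct{ ID string }

type Client struct{ Members []Member }

// MemberWithID scans one client's members and returns nil if none matches.
func (c Client) MemberWithID(id string) *Member {
	for i := range c.Members {
		if c.Members[i].ID == id {
			return &c.Members[i]
		}
	}
	return nil
}

func findMember(clients []Client, id string) (*Member, error) {
	found := make(chan *Member, 1) // room for exactly one result
	var wg sync.WaitGroup

	for _, c := range clients {
		wg.Add(1)
		go func(c Client) {
			defer wg.Done()
			m := c.MemberWithID(id)
			if m == nil {
				return
			}
			// Non-blocking send: only the first goroutine fills the
			// buffer, later ones fall through to default and return.
			select {
			case found <- m:
			default:
			}
		}(c)
	}

	// Safe to wait first: no goroutine can block on the send.
	wg.Wait()

	// Non-blocking receive: the buffer holds either one result or nothing.
	select {
	case m := <-found:
		return m, nil
	default:
		return nil, errors.New("no member found with given id")
	}
}

func main() {
	clients := []Client{
		{Members: []Member{{ID: "a"}, {ID: "b"}}},
		{Members: []Member{{ID: "c"}}},
	}
	if m, err := findMember(clients, "b"); err == nil {
		fmt.Println("found", m.ID)
	}
}
```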

Also note that it should be the main goroutine that calls cancel(), not each launched goroutine individually.

icza
  • Oh I see, how would I know to call cancel from main though when a member is found? – aralk Sep 02 '21 at 13:54
  • In this case I wouldn't even use `Context`: you don't really use it. `client.ClientMemberWithID` doesn't use it, so it's rather useless here and only complicates things. It would be useful if you'd pass / use it when executing the database query. – icza Sep 02 '21 at 14:21

For this kind of use case I think a sync.Once is probably a better fit than a channel. When you find the first non-nil member, you want to do two different things:

  1. Record the member you found.
  2. Cancel the remaining goroutines.

A buffered channel can easily do (1), but makes (2) a bit more complicated. But a sync.Once is perfect for doing two different things the first time something interesting happens!


I would also suggest aggregating non-trivial errors, so that you can report something more useful than "no member found" if, say, your database connection fails. You can use a sync.Once for that, too!


Putting it all together, I would want to see something like this (https://play.golang.org/p/QZXUUnbxOv5):

func (group *Group) MemberWithID(ctx context.Context, id string) (*Member, error) {
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()

    var (
        wg sync.WaitGroup

        member    *Member
        foundOnce sync.Once

        firstNontrivialErr error
        errOnce            sync.Once
    )

    for _, client := range group.Clients {
        wg.Add(1)
        client := client // https://golang.org/doc/faq#closures_and_goroutines
        go func() {
            defer wg.Done()

            m, err := client.ClientMemberWithID(ctx, id)
            if m != nil {
                foundOnce.Do(func() {
                    member = m
                    cancel()
                })
            } else if nf := (*MemberNotFoundError)(nil); !errors.As(err, &nf) {
                errOnce.Do(func() {
                    firstNontrivialErr = err
                })
            }
        }()
    }
    wg.Wait()

    if member == nil {
        if firstNontrivialErr != nil {
            return nil, firstNontrivialErr
        }
        return nil, &MemberNotFoundError{ID: id}
    }
    return member, nil
}
bcmills
  • The equivalent using buffered channels instead of `sync.Once` (https://play.golang.org/p/blsYV_eMgN_X) also works, but seems a bit more verbose to me. – bcmills Sep 02 '21 at 14:10
  • Thank you so much for this solution. `sync.Once` seems really useful; I hadn't heard of it before. – aralk Sep 03 '21 at 09:30