
I am a golang newbie who is trying to understand the correct design pattern for this problem. My current solution seems very verbose, and I'm not sure what the better approach would be.

I am trying to design a system that:

  1. executes N goroutines
  2. returns the result of each goroutine as soon as it is available
  3. if a goroutine returns a particular value, cancels the other goroutines.

The goal: I want to kick off a number of goroutines, but I want to cancel the routines if one routine returns a particular result.

I'm trying to understand if my code is super "smelly" or if this is the prescribed way of doing things. I still don't have a great feeling for go, so any help would be appreciated.

Here is what I've written:

package main

import (
    "context"
    "fmt"
    "time"
)

func main() {

    ctx := context.Background()
    ctx, cancel := context.WithCancel(ctx)

    fooCheck := make(chan bool)
    barCheck := make(chan bool)

    go foo(ctx, 3000, fooCheck)
    go bar(ctx, 5000, barCheck)

    for fooCheck != nil ||
        barCheck != nil {
        select {
        case res, ok := <-fooCheck:
            if !ok {
                fooCheck = nil
                continue
            }
            if res == false {
                cancel()
            }
            fmt.Printf("result of foocheck: %t\n", res)
        case res, ok := <-barCheck:
            if !ok {
                barCheck = nil
                continue
            }
            fmt.Printf("result of barcheck: %t\n", res)
        }
    }
    fmt.Printf("here we are at the end of the loop, ready to do some more processing...")
}

func foo(ctx context.Context, pretendWorkTime int, in chan<- bool) {
    fmt.Printf("simulate doing foo work and pass ctx down to cancel down the calltree\n")
    time.Sleep(time.Millisecond * time.Duration(pretendWorkTime))

    select {
    case <-ctx.Done():
        fmt.Printf("\n\nWe cancelled this operation!\n\n")
        break
    default:
        fmt.Printf("we have done some foo work!\n")
        in <- false
    }
    close(in)
}

func bar(ctx context.Context, pretendWorkTime int, in chan<- bool) {
    fmt.Printf("simulate doing bar work and pass ctx down to cancel down the calltree\n")
    time.Sleep(time.Millisecond * time.Duration(pretendWorkTime))

    select {
    case <-ctx.Done():
        fmt.Printf("\n\nWe cancelled the bar operation!\n\n")
        break
    default:
        fmt.Printf("we have done some bar work!\n")
        in <- true
    }
    close(in)
}

(play with the code here: https://play.golang.org/p/HAA-LIxWNt0)

The output is as expected, but I'm afraid I'm making some design decision that will let me shoot myself in the foot later.

clo_jur
  • Your code does not seem to reflect your stated intent (it only vaguely resembles your description); it's too simplistic, if I may say so. The for loop does not exit when the value is found, so it does not look like it returns ASAP to me. It is also odd that you have two channels of the same type to select on; one is enough. I sketched a rewrite of your current code (https://play.golang.org/p/Y1sKPcqqXPH) that removes all the unnecessary artifacts, but I don't think it meets your goal anymore. Overall, though: feed a channel from your jobs, read the channel, and once the value is found, cancel and quit early. –  May 18 '19 at 08:25
  • There is more to say, too. For example, if you create long-running routines, the cancel might never occur, because once you find a suitable value you might simply send it to another channel, or to an async function call, for further processing without ever quitting the for loop that reads the foo/bar channels. –  May 18 '19 at 08:27
  • @mh-cbon Maybe I'm missing something, but your solution doesn't seem to work either. I need multiple channels, because if I don't cancel, I need to know which value came from which channel, no? You are correct that I don't terminate the for loop, which was an oversight. But the code does return each result as soon as it's available (within the for loop, not from the function). I think "returning as soon as possible" was not the best way to phrase it. I just mean a non-blocking operation that returns results as they become available, so that I can eventually cancel out of the loop. – clo_jur May 18 '19 at 15:16
  • @mh-cbon As long as you take care of closing the channels, your `done` flag can simply be replaced by setting all the channels to `nil` and terminating the loop. The for loop should only exit when all values are found or one of the channels returns a boolean I don't want. I feel like I'm still missing something given your responses, so please let me know if I still have some oversights. – clo_jur May 18 '19 at 15:26
  • True, my example was not correct. You can try something like this: https://play.golang.org/p/MAhatd8a5IE. I think I missed things like the time.Sleep before the context check, and I thought the WaitGroup was dispensable. Fixed now! –  May 18 '19 at 21:30

2 Answers


I would use a single channel to communicate results, so it's much easier to gather the results and it "scales" automatically by its nature. If you need to identify the source of a result, simply use a wrapper which includes the source. Something like this:

type Result struct {
    ID     string
    Result bool
}
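To illustrate how the ID field lets a single channel carry results from any number of workers, here is a minimal sketch; the worker names, the `collect` helper and the per-worker values are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
)

// Result tags each value with the worker that produced it.
type Result struct {
	ID     string
	Result bool
}

// collect starts one goroutine per id, all sending on the same channel,
// and gathers exactly len(ids) results keyed by ID.
func collect(ids []string) map[string]bool {
	resch := make(chan Result, len(ids)) // one slot per worker, so sends never block
	for i, id := range ids {
		// Even-indexed workers report true, odd ones false (arbitrary demo values).
		go func(id string, even bool) {
			resch <- Result{ID: id, Result: even}
		}(id, i%2 == 0)
	}

	got := make(map[string]bool, len(ids))
	for range ids {
		r := <-resch
		got[r.ID] = r.Result // the ID tells us where the value came from
	}
	return got
}

func main() {
	got := collect([]string{"foo", "bar", "baz"})
	keys := make([]string, 0, len(got))
	for k := range got {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, k := range keys {
		fmt.Printf("%s: %t\n", k, got[k])
	}
}
```

Adding a worker means adding an ID, not another channel and another `select` case.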

To simulate "real" work, the workers should use a loop doing their work in an iterative manner, and in each iteration they should check the cancellation signal. Something like this:

func foo(ctx context.Context, pretendWorkMs int, resch chan<- Result) {
    log.Printf("foo started...")
    for i := 0; i < pretendWorkMs; i++ {
        time.Sleep(time.Millisecond)
        select {
        case <-ctx.Done():
            log.Printf("foo terminated.")
            return
        default:
        }
    }
    log.Printf("foo finished")
    resch <- Result{ID: "foo", Result: false}
}

In our example, bar() is the same; just replace every occurrence of foo with bar.

And now, executing the jobs and terminating the rest early if one of them returns the result we are watching for (false here) looks like this:

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

resch := make(chan Result, 2)

log.Println("Kicking off workers...")
go foo(ctx, 3000, resch)
go bar(ctx, 5000, resch)

for i := 0; i < cap(resch); i++ {
    result := <-resch
    log.Printf("Result of %s: %v", result.ID, result.Result)
    if !result.Result {
        cancel()
        break
    }
}
log.Println("Done.")

Running this app will output (try it on the Go Playground):

2009/11/10 23:00:00 Kicking off workers...
2009/11/10 23:00:00 bar started...
2009/11/10 23:00:00 foo started...
2009/11/10 23:00:03 foo finished
2009/11/10 23:00:03 Result of foo: false
2009/11/10 23:00:03 Done.

Some things to note. If we terminate early due to an unexpected result, the cancel() function is called and we break out of the loop. The remaining workers may also complete their work concurrently and send their results, which is not a problem because we used a buffered channel: their sends will not block, and they will end properly. If they don't complete concurrently, they check ctx.Done() in their loop and terminate early, so the goroutines are cleaned up nicely.
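The buffered-channel argument can be checked in isolation. In this minimal sketch (the `sendAll` helper is hypothetical), the receiver takes only the first value, like the early `break`, and `wg.Wait()` returns only because no sender is stuck.

```go
package main

import (
	"fmt"
	"sync"
)

// sendAll starts n goroutines that each send one value on a channel with
// capacity bufSize, receives only the first value (like the early break),
// then waits for every sender to return. It reports how many values were
// left unread in the buffer.
//
// With bufSize >= n-1 every send completes even though nobody reads the
// rest; with a smaller buffer some senders would block forever and
// wg.Wait() would never return.
func sendAll(n, bufSize int) int {
	ch := make(chan int, bufSize)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			ch <- i // completes as long as the buffer has room
		}(i)
	}
	<-ch      // take the first result and stop reading
	wg.Wait() // returns only because no sender is stuck
	return len(ch)
}

func main() {
	// Two workers and a buffer of two, as in make(chan Result, 2).
	fmt.Println("unread results:", sendAll(2, 2))
}
```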

Also note that the output of the above code does not print bar terminated. This is because the main() function terminates right after the loop, and once main() ends, the program does not wait for other non-main goroutines to complete. For details, see No output from goroutine in Go. If the app did not terminate immediately, we would see that line printed too, for example if we add a time.Sleep() at the end of main():

log.Println("Done.")
time.Sleep(3 * time.Millisecond)

Output will be (try it on the Go Playground):

2009/11/10 23:00:00 Kicking off workers...
2009/11/10 23:00:00 bar started...
2009/11/10 23:00:00 foo started...
2009/11/10 23:00:03 foo finished
2009/11/10 23:00:03 Result of foo: false
2009/11/10 23:00:03 Done.
2009/11/10 23:00:03 bar terminated.

Now if you must wait for all workers to end either "normally" or "early" before moving on, you can achieve that in many ways.

One way is to use a sync.WaitGroup. For an example, see Prevent the main() function from terminating before goroutines finish in Golang. Another way would be to have each worker send a Result no matter how it ends, with Result containing the termination condition, e.g. normal or aborted. The main() goroutine could then continue the receive loop until it receives n values from resch. If you choose this solution, you must ensure each worker sends a value (even if a panic occurs) so that main() is not blocked in such cases (e.g. by using defer).
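Here is a minimal sketch of the second approach, where every worker sends exactly one Result from a deferred function. It assumes a hypothetical `Status` field and uses `recover()` so a panicking worker still reports instead of crashing the program; `worker`, `runAll` and the three example workers are illustrative names, not from the answer.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Status records how a worker ended; the names are hypothetical.
type Status string

const (
	Normal   Status = "normal"
	Aborted  Status = "aborted"
	Panicked Status = "panicked"
)

type Result struct {
	ID     string
	Result bool
	Status Status
}

// worker waits d, then calls work. Because the send happens in a deferred
// function, it sends exactly one Result however it ends: normally, by
// cancellation, or by a panic (which recover() turns into Panicked).
func worker(ctx context.Context, id string, d time.Duration, work func() bool, resch chan<- Result) {
	res := Result{ID: id, Status: Aborted}
	defer func() {
		if recover() != nil {
			res.Status = Panicked
		}
		resch <- res
	}()
	select {
	case <-ctx.Done():
		return // res.Status stays Aborted
	case <-time.After(d):
	}
	res.Result = work()
	res.Status = Normal
}

// runAll starts three example workers, cancels the others as soon as one
// reports false, but keeps receiving until it has all three Results.
func runAll() []Result {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	resch := make(chan Result) // all n values are received, so no buffer is needed
	go worker(ctx, "foo", 10*time.Millisecond, func() bool { return false }, resch)
	go worker(ctx, "bar", time.Second, func() bool { return true }, resch)
	go worker(ctx, "baz", 0, func() bool { panic("simulated failure") }, resch)

	var all []Result
	for i := 0; i < 3; i++ {
		r := <-resch
		all = append(all, r)
		if r.Status == Normal && !r.Result {
			cancel() // stop the rest early, but keep receiving
		}
	}
	return all
}

func main() {
	for _, r := range runAll() {
		fmt.Printf("%s: result=%t status=%s\n", r.ID, r.Result, r.Status)
	}
}
```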

icza

I'm going to share the simplest pattern for what you're talking about. You can extend it for more complicated scenarios.

func doStuff() {
    // This can be a chan of anything.
    msgCh := make(chan string)

    // This is how you tell your goroutine(s) to stop: by closing this chan.
    quitCh := make(chan struct{})
    defer close(quitCh)

    // Start all goroutines. whileStart(), whileWait() and stopMe are
    // placeholders for your own loop conditions and target value.
    for whileStart() {
        go func() {
            // Do whatever you need inside your goroutine.

            // Write back the result.
            select {
            case msgCh <- "my message":
                // If we got here, the receiver took our message.
            case <-quitCh:
                // If we got here, the quit chan was closed.
            }
        }()
    }

    // Wait for all goroutines.
    for whileWait() {
        // Block until a msg comes back.
        msg := <-msgCh
        // If you found what you want...
        if msg == stopMe {
            // ...it's safe to return: the deferred close(quitCh)
            // unblocks any goroutines still trying to send.
            return
        }
    }
}
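A runnable version of this pattern follows, as a sketch. The goroutine count `n`, the `"msg i"` format and the `stopMe` parameter are hypothetical stand-ins for the `whileStart()`/`whileWait()`/`stopMe` placeholders.

```go
package main

import (
	"fmt"
	"time"
)

// doStuff starts n goroutines and reports whether one of them sent stopMe.
// n, stopMe and the "msg i" format are hypothetical stand-ins for the
// whileStart()/whileWait()/stopMe placeholders.
func doStuff(n int, stopMe string) bool {
	msgCh := make(chan string) // unbuffered: a send completes only if someone receives

	quitCh := make(chan struct{})
	defer close(quitCh) // on return, releases every goroutine still trying to send

	for i := 0; i < n; i++ {
		go func(i int) {
			time.Sleep(time.Duration(i) * time.Millisecond) // pretend work
			select {
			case msgCh <- fmt.Sprintf("msg %d", i):
				// The receiver took our message.
			case <-quitCh:
				// doStuff has returned; give up instead of blocking forever.
			}
		}(i)
	}

	for i := 0; i < n; i++ {
		if msg := <-msgCh; msg == stopMe {
			return true // safe: the deferred close(quitCh) unblocks the others
		}
	}
	return false
}

func main() {
	fmt.Println(doStuff(5, "msg 2"))
}
```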
Max Kerper
    Unfortunately, I do not have a high enough reputation to comment on the accepted solution above, but it has a memory leak, @icza. You must write back the result at the same time as you check the quit channel, using a select block. If you first check the quit channel and afterwards try writing to the result channel, you risk the result channel being closed in between those operations, which would block your goroutine forever and cause a memory leak. – Max Kerper Aug 16 '19 at 21:42