
I have written an API handler that makes DB calls and runs some business logic. It invokes a goroutine that must perform some operation in the background. Since the API call should not wait for this background task to finish, I return 200 OK immediately after starting the goroutine (assume the background task never produces an error).

I read that a goroutine is terminated once it has completed its task. Is this fire-and-forget approach safe from a goroutine leak? Are goroutines terminated and cleaned up once they finish their work?

func DefaultHandler(w http.ResponseWriter, r *http.Request) {
    // Some DB calls
    // Some business logics
    go func() {
        // some Task taking 5 sec
    }()
    w.WriteHeader(http.StatusOK)
}
tecs-x
Anish
    "Is this fire and forget way safe from goroutine leak?" -- It is if you know your goroutine will exit (or you don't want it to). A "leak" would only happen if you have a goroutine that gets stuck in an unexpected infinite loop, for example. – Jonathan Hall Jul 13 '21 at 12:21

4 Answers

5

There is no "goroutine cleaning" you have to handle: you just launch goroutines, and they'll be cleaned up when the function launched as a goroutine returns. Quoting from Spec: Go statements:

When the function terminates, its goroutine also terminates. If the function has any return values, they are discarded when the function completes.

So what you do is fine. Note however that your launched goroutine cannot use or assume anything about the request (r) and response writer (w); you may only use them before you return from the handler.

Also note that you don't have to write http.StatusOK: if you return from the handler without writing anything, that's assumed to be a success, and HTTP 200 OK will be sent back automatically.

See related / possible duplicate: Webhook process run on another goroutine

icza
  • When you say "you just launch goroutines and they'll be cleaned when the function launched as a goroutine returns", does "return" mean finished execution? Asking because my function does not return anything; it just performs some task. – Anish Jul 14 '21 at 18:43
  • @Anish It doesn't have to return anything. Quoting from [Spec: Go statements:](https://golang.org/ref/spec#Go_statements) _"When the function terminates, its goroutine also terminates. If the function has any return values, they are discarded when the function completes."_ – icza Jul 14 '21 at 19:07
  • Okay, but maybe I am missing something, because, like I mentioned, mine is a method that does not return anything. When load testing my API, CPU usage goes up by 20% for every 10 rps increase, (rps, cpu) -> (10,23), (20,46), ... and it crashes. – Anish Jul 15 '21 at 10:10
  • 1
    @Anish If the goroutine has a lengthy task, calling continuously your handler you'll end up with a ton of running goroutines. That obviously requires memory and CPU (managing, scheduling and executing the goroutines, cleaning up stacks etc). – icza Jul 15 '21 at 10:42
5

I would recommend always keeping your goroutines under control to avoid memory and system exhaustion. If you receive a spike of requests and start spawning goroutines without control, the system will probably go down sooner or later.

In those cases where you need to return an immediate 200 OK, the best approach is to create a message queue: the server only needs to put a job in the queue, return the OK, and forget about it. The rest will be handled by a consumer asynchronously.

Producer (HTTP server) >>> Queue >>> Consumer

Normally, the queue is an external resource (RabbitMQ, AWS SQS...) but for teaching purposes, you can achieve the same effect using a channel as a message queue.

In the example you'll see how we create a channel to connect the two processes. Then we start the worker process that will read continuously from the channel, and finally the server with a handler that writes to it.

Try to play with the buffer size and job time while sending curl requests.

package main

import (
    "fmt"
    "log"
    "net/http"
    "time"
)

/*
$ go run .

curl "http://localhost:8080?user_id=1"
curl "http://localhost:8080?user_id=2"
curl "http://localhost:8080?user_id=3"
curl "http://localhost:8080?user_id=....."

*/

func main() {

    queueSize := 10
    // This is our queue, a channel to communicate processes. Queue size is the number of items that can be stored in the channel
    myJobQueue := make(chan string, queueSize) // Search for 'buffered channels'

    // Starts a worker that will read continuously from our queue
    go myBackgroundWorker(myJobQueue)

    // We start our server with a handler that is receiving the queue to write to it
    if err := http.ListenAndServe("localhost:8080", myAsyncHandler(myJobQueue)); err != nil {
        panic(err)
    }
}

func myAsyncHandler(myJobQueue chan<- string) http.HandlerFunc {
    return func(rw http.ResponseWriter, r *http.Request) {
        // We check that in the query string we have a 'user_id' query param
        if userID := r.URL.Query().Get("user_id"); userID != "" {
            select {
            case myJobQueue <- userID: // We try to put the item into the queue ...
                rw.WriteHeader(http.StatusOK)
                rw.Write([]byte(fmt.Sprintf("queuing user process: %s", userID)))
            default: // If we cannot write to the queue, it's because it is full!
                rw.WriteHeader(http.StatusInternalServerError)
                rw.Write([]byte(`our internal queue is full, try it later`))
            }
            return
        }
        rw.WriteHeader(http.StatusBadRequest)
        rw.Write([]byte(`missing 'user_id' in query params`))
    }
}

func myBackgroundWorker(myJobQueue <-chan string) {
    const (
        jobDuration = 10 * time.Second // simulation of a heavy background process
    )

    // We continuously read from our queue and process jobs one by one.
    // In this loop we could spawn more goroutines in a controlled way to parallelize work and increase read throughput, but I don't want to overcomplicate the example.
    for userID := range myJobQueue {
        // rate limiter here ...
        // go func(u string){
        log.Printf("processing user: %s, started", userID)
        time.Sleep(jobDuration)
        log.Printf("processing user: %s, finished", userID)
        // }(userID)
    }
}

wakumaku
  • But the sends and receives are blocked until the other side is ready, so I think it's not going to be an exact fire and forget. – Anish Jul 14 '21 at 18:35
  • 1
    @Anish as long as there is a default case it wont block. the example is really good. –  Jul 14 '21 at 22:07
  • Sorry for asking so many follow-up questions, but when the default case is executed, the background task for that request is lost, because we fall through to default when the channel is not ready to receive. Am I correct? – Anish Jul 15 '21 at 10:05
  • 1
    the idea is to protect your system by managing the backpressure on the channel and if the channel is full, the server returns a 500. Then the client should implement a retry policy, so no messages are lost. Also, this example is bad for production environments, because if the server crashes, all messages in the queue will be lost, same if you spawn a lot of groutines and there is a crash ... My recommendation would be 2 separate processes: a server and a worker. The server receives messages and puts them in a queue and return. Worker read messages from the queue at his own pace to process them – wakumaku Jul 15 '21 at 10:32
2

@icza is absolutely right: there is no "goroutine cleaning". You can use a webhook or a background job library like gocraft. The only way I can think of to use your solution is with the sync package, for learning purposes.

func DefaultHandler(w http.ResponseWriter, r *http.Request) {
    // Some DB calls
    // Some business logic
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer wg.Done()
        // some task taking 5 sec
    }()
    w.WriteHeader(http.StatusOK)
    wg.Wait() // note: this blocks the handler until the goroutine finishes
}

1

You can wait for a goroutine to finish using a sync.WaitGroup:

// BusyTask
func BusyTask(t interface{}) error {
    var wg = &sync.WaitGroup{}

    wg.Add(1)
    go func() {
        // busy doing stuff
        time.Sleep(5 * time.Second)
        wg.Done()
    }()
    wg.Wait() // wait for goroutine

    return nil
}

// this will wait 5 seconds until the goroutine finishes
func main() {
    fmt.Println("hello")

    BusyTask("some task...")

    fmt.Println("done")
}

Another way is to attach a context.Context to the goroutine and time it out.

//
func BusyTaskContext(ctx context.Context, t string) error {
    done := make(chan struct{}, 1)
    //
    go func() {
        // time sleep 5 second
        time.Sleep(5 * time.Second)
        // do the task and signal done
        done <- struct{}{}
        close(done)
    }()
    //
    select {
    case <-ctx.Done():
        return errors.New("timeout")
    case <-done:
        return nil
    }
}

//
func main() {
    fmt.Println("hello")

    ctx, cancel := context.WithTimeout(context.TODO(), 2*time.Second)
    defer cancel()

    if err := BusyTaskContext(ctx, "some task..."); err != nil {
        fmt.Println(err)
        return
    }

    fmt.Println("done")
}
twiny
  • I believe the point is that they wanted to "fire and forget" and specifically **not** wait for the goroutine to finish, but were worried that might cause goroutine leaks. – Dean Jul 13 '21 at 17:54
  • That is correct! Setting a context is better than waiting for the goroutine to finish. – twiny Jul 13 '21 at 19:30