We are writing a proxy function that sends asynchronous requests to a service that processes some data. Processing can take anywhere from 0 to 10 seconds. After a request is sent, what we do depends on how long the service takes:
- If the service responds within 2 seconds, we forward the response to the proxy's caller.
- If the service does not respond within 2 seconds, the proxy returns 202, and any later response from the service is ignored.
Each request is handled in its own goroutine. Here is our code:
```go
func httpHandler(w http.ResponseWriter, r *http.Request) {
	defer r.Body.Close()
	// Forward-To is the host to forward the request to
	forwardTo := strings.TrimSpace(r.Header.Get("Forward-To"))
	// Is there no Forward-To header?
	if forwardTo == "" {
		// If so, return metrics.
		writeProxyMetrics(w, http.StatusOK, kafkaMsgId)
		return
	}
	// Delete the Forward-To header before copying the request to proxy it
	r.Header.Del("Forward-To")
	// Read the body so it can be copied into the proxy request
	body, err := ioutil.ReadAll(r.Body)
	if err != nil {
		atomic.AddInt64(&state.ActiveRequests, -1)
		writeProxyMetrics(w, http.StatusInternalServerError, kafkaMsgId)
		w.WriteHeader(http.StatusInternalServerError)
		return
	}
	// Create the proxy request
	proxyRequest, err := http.NewRequest(r.Method, forwardTo, bytes.NewReader(body))
	if err != nil {
		atomic.AddInt64(&state.ActiveRequests, -1)
		writeProxyMetrics(w, http.StatusInternalServerError, kafkaMsgId)
		w.WriteHeader(http.StatusInternalServerError)
		return
	}
	// Copy the headers
	proxyRequest.Header = r.Header
	// Do the actual request
	asyncRequests(w, proxyRequest)
}

func asyncRequests(w http.ResponseWriter, proxyRequest *http.Request) {
	// Buffered for both senders, so neither goroutine blocks once we stop listening
	timeoutChan := make(chan bool, 2)
	var requestResponse *http.Response
	var requestResponseBody []byte
	var requestError error
	go func() {
		// Do the request
		requestResponse, requestError = httpClient.Do(proxyRequest)
		// Was there no error?
		if requestError == nil {
			defer requestResponse.Body.Close()
			// Read the body
			if body, err := ioutil.ReadAll(requestResponse.Body); err == nil {
				requestResponseBody = body
			} else {
				fmt.Println("failed to read response body.")
			}
		} else {
			fmt.Printf("[!] Request failed %v\n", requestError)
		}
		// We did not time out; the request finished
		timeoutChan <- false
	}()
	go func() {
		// Sleep for the timeout, then notify the timeout channel
		time.Sleep(time.Duration(config.ProxyTimeout) * time.Millisecond)
		timeoutChan <- true
	}()
	if <-timeoutChan {
		// Timed out: return 202
		w.WriteHeader(http.StatusAccepted)
	} else {
		// Did not time out: we got something back within 2 seconds
		if requestError == nil {
			w.WriteHeader(http.StatusOK)
		} else {
			w.WriteHeader(http.StatusInternalServerError)
		}
	}
}
```
The problem: if we send 2000 requests over 10 seconds, many of them fail with `POST "URL" failed EOF` errors, even though the service does actually receive those requests. From what I've read about this EOF error, it seems to happen when a request tries to reuse a keep-alive connection that has already been dropped. I've read the issues here and here.
I've been stuck on this for almost a week and am running out of ideas for how to debug it further. Could anyone take a look? We are on Go 1.13, and we still see the errors after trying Go 1.14.