
I have a channel that receives slices of maps with this code:

func submitRecords(records []map[string]string) {
    batch := []map[string]string{}
    ch := make(chan []map[string]string)
    batchCt := 1
    go func() {
        for _, v := range records {
            batch = append(batch, v)
            if len(batch) == 150 {
                ch <- batch
                batch = nil
            }
        }
        close(ch)
    }()
}

The API that I am submitting these records to accepts batches of up to 150. To speed this up, I would like to spin up 4 goroutines to concurrently process the records in the channel. Once the records hit the channel, it doesn't matter what order they get processed in.

Currently I have made the following update to the above code, which processes the channel with a single consumer:

func submitRecords(records []map[string]string) {
    batch := []map[string]string{}
    ch := make(chan []map[string]string)
    batchCt := 1
    go func() {
        for _, v := range records {
            batch = append(batch, v)
            if len(batch) == 150 {
                ch <- batch
                batch = nil
            }
        }
        close(ch)
    }()

    for b := range ch {
        str, _ := json.Marshal(b)
        fmt.Printf("Sending batch at line %d\n", (batchCt * 150))

        payload := strings.NewReader(string(str))
        client := &http.Client{}
        req, err := http.NewRequest(method, url, payload)
        if err != nil {
            fmt.Println(err)
        }

        login, _ := os.LookupEnv("Login")
        password, _ := os.LookupEnv("Password")
        req.Header.Add("user_name", login)
        req.Header.Add("password", password)
        req.Header.Add("Content-Type", "application/json")

        res, err := client.Do(req)
        if err != nil {
            fmt.Println(err)
        }
        batchCt++
    }
}

How would I modify this to have 4 goroutines pull from the channel and send these requests? Or is this even possible/have I misunderstood the capabilities of goroutines?

  • `batch = nil` this line is not going to work. –  Jul 06 '20 at 19:15
  • Why? Based on my research I have found that this is the way to clear a slice for reuse similar to a `batch.clear()` operation in python. And when I check the records after sending them to the API, it all looks like it worked correctly. – DBA108642 Jul 06 '20 at 19:18
  • your async loop is missing a trailing `ch <- batch`, i guess. –  Jul 06 '20 at 19:18
  • your function is written upside down. The loop that is currently async should be synchronous, and there should be N goroutines processing requests; the function should terminate by waiting on those N goroutines to finish. –  Jul 06 '20 at 19:20
  • Basically, I add records to the `batch` slice until the slice has 150 records. once it hits 150, I send it to the channel and clear the slice. I am relatively new to concurrency in Go so it is possible I mixed something up, it just seems like its working – DBA108642 Jul 06 '20 at 19:20
  • I see. Could you help point me in the right direction with some pseudocode or something? – DBA108642 Jul 06 '20 at 19:21
  • *seems like it's working* – yes, it might appear to be working under some circumstances; I would check more thoroughly. –  Jul 06 '20 at 19:21
  • you don't need to create a new instance of the http client at each iteration, in `client := &http.Client{}` –  Jul 06 '20 at 19:22
  • Thank you! I will pull that out. If you have other suggestions I would happily upvote and accept an answer – DBA108642 Jul 06 '20 at 19:24
  • sketched what matters https://play.golang.org/p/K51WoMx1fiX hth –  Jul 06 '20 at 19:31
  • I implemented this and it seems to be running, but my file of 100k records is only showing that it has sent 60-70k depending on the run. I will tinker a little more and see what happens, but thank you for your help, it's been very valuable – DBA108642 Jul 06 '20 at 19:53
  • verify you are not running into a racy program with the `-race` flag of the go build/run command. –  Jul 06 '20 at 19:55
  • doing that now, will let you know the result – DBA108642 Jul 06 '20 at 20:00
  • Getting the error `Post {API URL}: dial tcp: lookup {URL}: no such host` `panic: runtime error: invalid memory address or nil pointer dereference` `[signal SIGSEGV: segmentation violation code=0x1 addr=0x0 pc=0x15bc28d` So I think perhaps the API is limiting the rate of requests causing the requests to drop – DBA108642 Jul 06 '20 at 20:08
  • regarding the networking part, I can't really tell. However, that panic is easy to catch: in the loop, after `fmt.Println(err)`, add a `continue` statement. Although, your API might deserve a little more abstraction in order to write some real tests. Doing CLI tests, as I suspect you do, is unreliable over time, and prone to error in the short term (my humble opinion) –  Jul 06 '20 at 20:20
  • btw, in `res, err := client.Do(req)` res is unused, and I believe you should read and close the response body to benefit from connection re-use. https://stackoverflow.com/a/17948937/4466350 –  Jul 06 '20 at 20:28
  • made those updates and saw great performance boosts, but somehow I'm still not getting every record sent. When I run `go run -race main.go` I don't have any race conditions, but somehow not everything is getting sent. From what I can tell there are no errors on the server side either. Any ideas? – DBA108642 Jul 07 '20 at 14:41
  • maybe you just want to count the number of outgoing requests and the count of errored requests. You might just want to use the expvar package to increment an integer and get access to those results via the http handler, or something fancier like expvarmon. This should give you more info https://matoski.com/article/golang-expvar-metrics/ –  Jul 07 '20 at 14:49
  • Is it possible that goroutines access the same batch from the channel and records are getting dropped that way? – DBA108642 Jul 07 '20 at 14:54
  • unclear, I did not understand. Aren't you supposed to trigger 100K/150 requests? –  Jul 07 '20 at 14:59
  • yes, I have 100k records that I am sending 150 at a time. However, using this concurrent approach it looks like only about 96k actually successfully POST to the API. However, the server is telling me that there are no errors, and Go is telling me there are no data races. The more goroutines I add, the fewer records successfully post. I'm just lost as to what is going on – DBA108642 Jul 07 '20 at 15:02
  • For example, if I use 10 go routines, 63k records are sent successfully – DBA108642 Jul 07 '20 at 15:07
  • are you checking response codes? Are you logging events via expvar (or anything else)? Have you considered writing some tests? If you don't have a clue, that means you are lacking data to analyze the issue you are facing, but tests are better. That being said, I guess your code has significantly changed since your original post and there might be something wrong. –  Jul 07 '20 at 15:38
  • yeah, I want to thank you so much. Your help has been invaluable and I appreciate it so much. By this point, I think it may be appropriate to post a new question if I can't figure it out by logging events and response codes – DBA108642 Jul 07 '20 at 15:51
  • correct! see you there maybe –  Jul 07 '20 at 16:17
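The two concrete fixes from the comments above (skip the batch with `continue` when the request fails, and drain and close the response body so connections are reused) can be sketched as follows. This is a minimal, runnable illustration; the `httptest` server, payloads, and header stand in for the real batch API, which is not shown in the question:

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
)

func main() {
	// A stand-in server so the sketch is runnable; in the real code this
	// would be the batch API endpoint.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		io.Copy(io.Discard, r.Body)
		w.WriteHeader(http.StatusOK)
	}))
	defer srv.Close()

	// One shared client for all requests, created once outside the loop.
	client := &http.Client{}

	batches := []string{`[{"a":"1"}]`, `[{"b":"2"}]`}
	sent := 0
	for _, payload := range batches {
		req, err := http.NewRequest("POST", srv.URL, strings.NewReader(payload))
		if err != nil {
			fmt.Println(err)
			continue // skip this batch instead of using a nil *Request
		}
		req.Header.Add("Content-Type", "application/json")

		res, err := client.Do(req)
		if err != nil {
			fmt.Println(err)
			continue // a failed request leaves res == nil
		}
		// Drain and close the body so the underlying connection is reused.
		io.Copy(io.Discard, res.Body)
		res.Body.Close()
		sent++
	}
	fmt.Println("sent", sent, "batches")
}
```

Without the `continue` statements, a failed `http.NewRequest` or `client.Do` would fall through to a nil-pointer dereference, which matches the `SIGSEGV` panic quoted in the comments.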

1 Answer

func process(ch chan []map[string]string, done chan int) {
    for b := range ch {
        str, _ := json.Marshal(b)

        // This wont work, or has to be included in the payload from the channel
        // fmt.Printf("Sending batch at line %d\n", (batchCt * 150))

        payload := strings.NewReader(string(str))
        client := &http.Client{}
        req, err := http.NewRequest(method, url, payload)
        if err != nil {
            fmt.Println(err)
        }

        login, _ := os.LookupEnv("Login")
        password, _ := os.LookupEnv("Password")
        req.Header.Add("user_name", login)
        req.Header.Add("password", password)
        req.Header.Add("Content-Type", "application/json")

        res, err := client.Do(req)
        if err != nil {
            fmt.Println(err)
            continue
        }
        // Drain and close the body so the connection can be reused.
        io.Copy(io.Discard, res.Body)
        res.Body.Close()
        // batchCt++
    }
    done <- 1
}

func submitRecords(records []map[string]string) {
    batch := []map[string]string{}
    ch := make(chan []map[string]string)
    done := make(chan int)

    go process(ch, done)
    go process(ch, done)
    go process(ch, done)
    go process(ch, done)

    // batchCt := 1
    for _, v := range records {
        batch = append(batch, v)
        if len(batch) == 150 {
            ch <- batch
            batch = []map[string]string{}
        }
    }
    // Send the last, not-full-size batch (if any)
    if len(batch) > 0 {
        ch <- batch
    }
    close(ch)

    // Wait for all four workers to finish before returning.
    <-done
    <-done
    <-done
    <-done
}

Playground example that uses ints and sleep: https://play.golang.org/p/q5bUhXt9aUn

package main

import (
    "fmt"
    "time"
)

func process(processor int, ch chan []int, done chan int) {
    for batch := range ch {
        // Do something.. sleep or http requests will let the other workers work as well
        time.Sleep(time.Duration(len(batch)) * time.Millisecond)
        fmt.Println(processor, batch)
    }
    done <- 1
}

const batchSize = 3

func main() {
    records := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}
    ch := make(chan []int)
    done := make(chan int)

    go process(1, ch, done)
    go process(2, ch, done)
    go process(3, ch, done)
    go process(4, ch, done)

    batch := make([]int, 0, batchSize)
    for _, v := range records {
        batch = append(batch, v)
        if len(batch) == batchSize {
            ch <- batch
            batch = make([]int, 0, batchSize)
        }
    }
    ch <- batch
    close(ch)

    <-done
    <-done
    <-done
    <-done
}
Kent
    isn't it preferable in your last example to use a WaitGroup rather than a channel? –  Jul 07 '20 at 10:37
  • @Kent I employed this approach but not every record is getting sent. out of 100k about 95k are sent. No data races or server side errors. any ideas? – DBA108642 Jul 07 '20 at 14:54
  • @DBA108642 guessing that either the program exits before having time to send all the messages (which is why I use the done channel, but as mh-cbon mentions, WaitGroups should probably be used instead as a much cleaner solution), or the last batch doesn't get sent because it's not a full 150 (but then you should see almost all the records, minus at most 150) – Kent Jul 08 '20 at 07:13
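As the first comment suggests, the four `done` receives can be replaced with a `sync.WaitGroup`. A minimal rework of the playground example, with the per-batch work replaced by an atomic counter so the total can be checked:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

const batchSize = 3

func main() {
	records := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}
	ch := make(chan []int)

	var processed int64
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for batch := range ch {
				// Real work (the HTTP POST) would go here.
				atomic.AddInt64(&processed, int64(len(batch)))
			}
		}()
	}

	batch := make([]int, 0, batchSize)
	for _, v := range records {
		batch = append(batch, v)
		if len(batch) == batchSize {
			ch <- batch
			batch = make([]int, 0, batchSize)
		}
	}
	if len(batch) > 0 {
		ch <- batch // the final, not-full batch
	}
	close(ch)

	wg.Wait() // blocks until every worker has drained the channel
	fmt.Println("processed", atomic.LoadInt64(&processed), "records")
}
```

`wg.Wait()` removes the need to keep the number of `<-done` receives in sync with the number of workers, which is one plausible source of the "program exits before all messages are sent" behavior described above.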