I am working on a recommendation engine with Apache PredictionIO. In front of the event server I have a Go API that listens for events from the customer and from an importer. In one particular case, when the customer uses the importer, I collect the imported identities and send them as JSON from the importer API to the Go API. For example, if the user imports a CSV containing 45000 rows, I send those 45000 identities to the Go API in a JSON body like {"barcodes":[...]}. The PredictionIO event server expects the data in a particular shape.
type ItemEvent struct {
    Event      string              `json:"event"`
    EntityType string              `json:"entityType"`
    EntityId   string              `json:"entityId"`
    Properties map[string][]string `json:"properties"`
    EventTime  time.Time           `json:"eventTime"`
}
type ItemBulkEvent struct {
    Event     string    `json:"event"`
    Barcodes  []string  `json:"barcodes"`
    EventTime time.Time `json:"eventTime"`
}
ItemEvent is the final shape I send to the event server from the Go API; ItemBulkEvent is the shape I receive from the importer API.
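For clarity, this is roughly what ends up in the batch request body (a sketch only; the event name, barcode, and properties are made-up values, and the field names come from the struct tags above):

payload := []model.ItemEvent{
    {
        Event:      "buy", // hypothetical event name
        EntityType: "item",
        EntityId:   "8690000000001", // hypothetical barcode
        Properties: map[string][]string{},
        EventTime:  time.Now(),
    },
}
body, _ := json.Marshal(payload)
fmt.Println(string(body))
// prints a JSON array of event objects, e.g.
// [{"event":"buy","entityType":"item","entityId":"8690000000001","properties":{},"eventTime":"..."}]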
func HandleItemBulkEvent(w http.ResponseWriter, r *http.Request) {
    var itemBulk model.ItemBulkEvent
    err := decode(r, &itemBulk)
    if err != nil {
        log.Fatalln("handleitembulkevent -> ", err)
        util.RespondWithError(w, 400, err.Error())
    } else {
        var item model.ItemEvent
        item.EventTime = itemBulk.EventTime
        item.EntityType = "item"
        item.Event = itemBulk.Event

        itemList := make([]model.ItemEvent, 0, 50)
        for index, barcode := range itemBulk.Barcodes {
            item.EntityId = barcode
            if index > 0 && (index%49) == 0 {
                itemList = append(itemList, item)
                go sendBulkItemToEventServer(w, r, itemList)
                itemList = itemList[:0]
            } else if index == len(itemBulk.Barcodes)-1 {
                itemList = append(itemList, item)
                itemList = itemList[:((len(itemBulk.Barcodes) - 1) % 49)]
                go sendBulkItemToEventServer(w, r, itemList) // line 116
                itemList = itemList[:0]
            } else {
                itemList = append(itemList, item)
            }
        }
        util.RespondWithJSON(w, 200, "OK")
    }
}
HandleItemBulkEvent is the handler for bulk updates. At this point I should mention PredictionIO's batch uploads: via the REST API, the event server accepts at most 50 events per request. So I created a list with a capacity of 50 and a single item value; in each iteration I reuse the same item, change only the identity part (the barcode), and append it to the list. On every 50th item I call a function that sends the list to the event server and then clear the list, and so on (see the chunking sketch below).
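In other words, the intent is the following chunking pattern (a simplified sketch of the idea, not the handler above; chunkBarcodes is a hypothetical helper, and this version builds fresh sub-slices instead of reusing one list):

const batchSize = 50 // PredictionIO's per-request limit for /batch/events.json

// chunkBarcodes splits the imported barcodes into groups of at most `size`.
func chunkBarcodes(barcodes []string, size int) [][]string {
    var chunks [][]string
    for start := 0; start < len(barcodes); start += size {
        end := start + size
        if end > len(barcodes) {
            end = len(barcodes)
        }
        chunks = append(chunks, barcodes[start:end])
    }
    return chunks
}

Each chunk would then be turned into a []model.ItemEvent and sent to the event server in one POST.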
func sendBulkItemToEventServer(w http.ResponseWriter, r *http.Request, itemList []model.ItemEvent) {
    jsonedItem, err := json.Marshal(itemList)
    if err != nil {
        log.Fatalln("err marshalling -> ", err.Error())
    }
    // todo: change url to event server url
    resp, err2 := http.Post(fmt.Sprintf("http://localhost:7070/batch/events.json?accessKey=%s",
        r.Header.Get("Authorization")),
        "application/json",
        bytes.NewBuffer(jsonedItem))
    if err2 != nil {
        log.Fatalln("err http -> ", err.Error()) // line 141
    }
    defer resp.Body.Close()
}
sendBulkItemToEventServer marshals the incoming item list and makes a POST request to PredictionIO's event server. With around 5000 items this works fine, but with 45000 items the application crashes with the error below.
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x0 pc=0xc05938]
goroutine 620 [running]:
api-test/service.sendBulkItemToEventServer(0x1187860, 0xc00028e0e0, 0xc00029c200, 0xc00011c000, 0x31, 0x32)
/home/kadirakinkorkunc/Desktop/playground/recommendation-engine/pio_api/service/CollectorService.go:141 +0x468
created by api-test/service.HandleItemBulkEvent
/home/kadirakinkorkunc/Desktop/playground/recommendation-engine/pio_api/service/CollectorService.go:116 +0x681
Debugger finished with exit code 0
Any idea how I can solve this problem?
Edit: as Burak Serdar mentioned in the answers, I fixed the err/err2 confusion and the data race by marshalling the batch before sending it. Now I get what I believe is the real error (from resp, err2):
2020/08/03 15:11:55 err http -> Post "http://localhost:7070/batch/events.json?accessKey=FJbGODbGzxD-CoLTdwTN2vwsuEEBJEZc4efrSUc6ekV1qUYAWPu5anDTyMGDoNq1": read tcp 127.0.0.1:54476->127.0.0.1:7070: read: connection reset by peer
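For reference, the change looks roughly like this (a simplified sketch of my current code, not a verbatim copy): the batch is marshalled on the request goroutine before the sender goroutine starts, so the goroutine no longer reads the reused slice, and the sender logs err2 instead of the shadowed err.

// Inside the loop in HandleItemBulkEvent, when a batch is ready:
jsonedBatch, merr := json.Marshal(itemList)
if merr != nil {
    log.Println("err marshalling -> ", merr.Error())
} else {
    go sendBulkItemToEventServer(w, r, jsonedBatch)
}
itemList = itemList[:0]

// sendBulkItemToEventServer now receives the finished JSON body:
func sendBulkItemToEventServer(w http.ResponseWriter, r *http.Request, body []byte) {
    resp, err2 := http.Post(fmt.Sprintf("http://localhost:7070/batch/events.json?accessKey=%s",
        r.Header.Get("Authorization")),
        "application/json",
        bytes.NewBuffer(body))
    if err2 != nil {
        log.Println("err http -> ", err2.Error())
        return
    }
    defer resp.Body.Close()
}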
Any idea on this?