I am working on a recommendation engine with Apache PredictionIO. In front of the event server I have a Go API that listens for events from the customer and from an importer. In one particular case, when a customer uses the importer, I collect the imported identifiers and send them as JSON from the importer API to the Go API. For example, if a user imports a CSV that contains 45000 records, I send those 45000 identifiers to the Go API as JSON like {"barcodes":[...]}. The PredictionIO event server expects data in a particular shape.
    type ItemEvent struct {
        Event      string              `json:"event"`
        EntityType string              `json:"entityType"`
        EntityId   string              `json:"entityId"`
        Properties map[string][]string `json:"properties"`
        EventTime  time.Time           `json:"eventTime"`
    }

    type ItemBulkEvent struct {
        Event     string    `json:"event"`
        Barcodes  []string  `json:"barcodes"`
        EventTime time.Time `json:"eventTime"`
    }
ItemEvent is the final data that I send to the event server from the Go API. ItemBulkEvent is the data that I receive from the importer API.
    func HandleItemBulkEvent(w http.ResponseWriter, r *http.Request) {
        var itemBulk model.ItemBulkEvent
        err := decode(r, &itemBulk)
        if err != nil {
            log.Fatalln("handleitembulkevent -> ", err)
            util.RespondWithError(w, 400, err.Error())
        } else {
            var item model.ItemEvent
            item.EventTime = itemBulk.EventTime
            item.EntityType = "item"
            item.Event = itemBulk.Event
            itemList := make([]model.ItemEvent, 0, 50)
            for index, barcode := range itemBulk.Barcodes {
                item.EntityId = barcode
                if index > 0 && (index%49) == 0 {
                    itemList = append(itemList, item)
                    go sendBulkItemToEventServer(w, r, itemList)
                    itemList = itemList[:0]
                } else if index == len(itemBulk.Barcodes)-1 {
                    itemList = append(itemList, item)
                    itemList = itemList[:((len(itemBulk.Barcodes) - 1) % 49)]
                    go sendBulkItemToEventServer(w, r, itemList) // line 116
                    itemList = itemList[:0]
                } else {
                    itemList = append(itemList, item)
                }
            }
            util.RespondWithJSON(w, 200, "OK")
        }
    }
HandleItemBulkEvent is the handler function for bulk updates. Here I should mention PredictionIO's batch uploads: via the REST API, the event server accepts up to 50 events per request. So I created a list with a capacity of 50 and a single item. I reuse the same item, changing only the identifier (barcode) on each iteration, and append it to the list. After every 50th item I call a function that sends the list to the event server, then clear the list, and so on.
    func sendBulkItemToEventServer(w http.ResponseWriter, r *http.Request, itemList []model.ItemEvent) {
        jsonedItem, err := json.Marshal(itemList)
        if err != nil {
            log.Fatalln("err marshalling -> ", err.Error())
        }
        // todo: change url to event server url
        resp, err2 := http.Post(fmt.Sprintf("http://localhost:7070/batch/events.json?accessKey=%s",
            r.Header.Get("Authorization")),
            "application/json",
            bytes.NewBuffer(jsonedItem))
        if err2 != nil {
            log.Fatalln("err http -> ", err.Error()) // line 141
        }
        defer resp.Body.Close()
    }
sendBulkItemToEventServer marshals the incoming item list and makes a POST request to PredictionIO's event server. When I try it with around 5000 items it works fine, but when I try it with 45000 items the application crashes with the error below.
    panic: runtime error: invalid memory address or nil pointer dereference
    [signal SIGSEGV: segmentation violation code=0x1 addr=0x0 pc=0xc05938]

    goroutine 620 [running]:
    api-test/service.sendBulkItemToEventServer(0x1187860, 0xc00028e0e0, 0xc00029c200, 0xc00011c000, 0x31, 0x32)
        /home/kadirakinkorkunc/Desktop/playground/recommendation-engine/pio_api/service/CollectorService.go:141 +0x468
    created by api-test/service.HandleItemBulkEvent
        /home/kadirakinkorkunc/Desktop/playground/recommendation-engine/pio_api/service/CollectorService.go:116 +0x681

    Debugger finished with exit code 0
Any idea how I can solve this problem?
Edit: as Burak Serdar mentioned in the answers, I fixed the err/err2 confusion and the data race by marshalling before sending. Now it gives me the real error (err2), I guess.
2020/08/03 15:11:55 err http -> Post "http://localhost:7070/batch/events.json?accessKey=FJbGODbGzxD-CoLTdwTN2vwsuEEBJEZc4efrSUc6ekV1qUYAWPu5anDTyMGDoNq1": read tcp 127.0.0.1:54476->127.0.0.1:7070: read: connection reset by peer
Any idea on this?
There are several errors in your program. The runtime error occurs because you check whether err2 is not nil, but then you print err, not err2. err is nil, so calling err.Error() causes the nil pointer dereference. This also means err2 is not nil, so you should look at what that error is.
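As a rough sketch of that first fix, the request can live in a helper that checks and reports the same error variable, and returns it instead of calling log.Fatalln (which exits the whole process). The helper name postBatch, its signature, and the status-code check are assumptions for illustration, not part of your code or of PredictionIO's API:

```go
package main

import (
	"bytes"
	"fmt"
	"log"
	"net/http"
)

// postBatch is a hypothetical helper: it posts an already-marshalled payload
// and returns any failure to the caller instead of exiting the process.
func postBatch(url string, payload []byte) error {
	resp, err := http.Post(url, "application/json", bytes.NewReader(payload))
	if err != nil {
		// Report the same variable that was just checked.
		return fmt.Errorf("posting batch: %w", err)
	}
	// resp is only non-nil when err is nil, so the defer belongs after the check.
	defer resp.Body.Close()

	// Assumption: any status of 300 or above is treated as a failure.
	if resp.StatusCode >= 300 {
		return fmt.Errorf("event server returned %s", resp.Status)
	}
	return nil
}

func main() {
	// The URL is deliberately malformed so the call fails without a server.
	if err := postBatch("://not-a-url", []byte(`[]`)); err != nil {
		log.Println("err http ->", err)
	}
}
```

Because the error is returned rather than passed to log.Fatalln, one failed batch does not take down the API, and the caller can decide whether to retry or report it.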
You mentioned that you send messages in batches of 50, but the implementation is wrong. You add elements to itemList, start a goroutine with that itemList, then truncate it and start filling it again. That is a data race: your goroutines will see itemList instances that are being modified by the handler. Instead of truncating, simply create a new itemList each time you hand one to a goroutine, so that each goroutine has its own copy.
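A minimal sketch of the "new itemList per goroutine" approach could look like this. The buildBatches helper, the trimmed-down ItemEvent type, and the "$set" event name are illustrative assumptions; the goroutine only prints instead of posting to the event server:

```go
package main

import (
	"fmt"
	"sync"
)

// ItemEvent is a trimmed-down stand-in for model.ItemEvent.
type ItemEvent struct {
	Event      string `json:"event"`
	EntityType string `json:"entityType"`
	EntityId   string `json:"entityId"`
}

// buildBatches (hypothetical) splits barcodes into batches of at most size
// events. Every batch is allocated separately, so no two batches share a
// backing array.
func buildBatches(event string, barcodes []string, size int) [][]ItemEvent {
	if size <= 0 {
		return nil
	}
	var batches [][]ItemEvent
	for start := 0; start < len(barcodes); start += size {
		end := start + size
		if end > len(barcodes) {
			end = len(barcodes)
		}
		batch := make([]ItemEvent, 0, end-start) // fresh slice for this batch
		for _, barcode := range barcodes[start:end] {
			batch = append(batch, ItemEvent{Event: event, EntityType: "item", EntityId: barcode})
		}
		batches = append(batches, batch)
	}
	return batches
}

func main() {
	barcodes := make([]string, 120)
	for i := range barcodes {
		barcodes[i] = fmt.Sprintf("barcode-%d", i)
	}

	var wg sync.WaitGroup
	for _, batch := range buildBatches("$set", barcodes, 50) {
		wg.Add(1)
		go func(items []ItemEvent) {
			defer wg.Done()
			// A real implementation would marshal and POST items here.
			fmt.Println("would send", len(items), "events")
		}(batch)
	}
	wg.Wait()
}
```

Slicing by start and end offsets also avoids the index % 49 bookkeeping, so the last partial batch needs no special case.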
If you want to keep using the same slice, you can marshal the slice, and then pass the JSON message to the goroutine instead of the slice.
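For that second option, here is a hedged sketch that keeps a single reusable slice but marshals it before anything is handed to a goroutine, so each goroutine only ever sees its own immutable JSON payload. The marshalBatches helper, the trimmed-down ItemEvent type, and the "$set" event name are assumptions for illustration:

```go
package main

import (
	"encoding/json"
	"fmt"
	"sync"
)

// ItemEvent is a trimmed-down stand-in for model.ItemEvent.
type ItemEvent struct {
	Event      string `json:"event"`
	EntityType string `json:"entityType"`
	EntityId   string `json:"entityId"`
}

// marshalBatches (hypothetical) reuses one slice for every batch, but turns
// it into JSON before the slice is truncated and refilled.
func marshalBatches(event string, barcodes []string, size int) ([][]byte, error) {
	if size <= 0 {
		return nil, fmt.Errorf("batch size must be positive, got %d", size)
	}
	var payloads [][]byte
	itemList := make([]ItemEvent, 0, size) // reused across batches
	for i, barcode := range barcodes {
		itemList = append(itemList, ItemEvent{Event: event, EntityType: "item", EntityId: barcode})
		if len(itemList) == size || i == len(barcodes)-1 {
			payload, err := json.Marshal(itemList) // snapshot taken synchronously
			if err != nil {
				return nil, err
			}
			payloads = append(payloads, payload)
			itemList = itemList[:0] // safe: the payload no longer refers to itemList
		}
	}
	return payloads, nil
}

func main() {
	payloads, err := marshalBatches("$set", []string{"a", "b", "c"}, 2)
	if err != nil {
		fmt.Println("err marshalling ->", err)
		return
	}
	var wg sync.WaitGroup
	for _, payload := range payloads {
		wg.Add(1)
		go func(body []byte) {
			defer wg.Done()
			// A real implementation would POST body to the event server.
			fmt.Println(string(body))
		}(payload)
	}
	wg.Wait()
}
```

json.Marshal returns a newly allocated byte slice on each call, which is why truncating itemList afterwards does not affect payloads that are already queued.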