Following up on old post here.
I am iterating over flatProduct.Catalogs
slice and populating my productCatalog
concurrent map in golang. I am using upsert method so that I can add only unique productID's
into my productCatalog
map.
Below code is called by multiple go routines
in parallel that is why I am using concurrent map here to populate data into it. This code runs in background to populate data in the concurrent map every 30 seconds.
var productRows []ClientProduct
err = json.Unmarshal(byteSlice, &productRows)
if err != nil {
return err
}
for i := range productRows {
flatProduct, err := r.Convert(spn, productRows[i])
if err != nil {
return err
}
if flatProduct.StatusCode == definitions.DONE {
continue
}
r.products.Set(strconv.Itoa(flatProduct.ProductId, 10), flatProduct)
for _, catalogId := range flatProduct.Catalogs {
catalogValue := strconv.FormatInt(int64(catalogId), 10)
r.productCatalog.Upsert(catalogValue, flatProduct.ProductId, func(exists bool, valueInMap interface{}, newValue interface{}) interface{} {
productID := newValue.(int64)
if valueInMap == nil {
return map[int64]struct{}{productID: {}}
}
oldIDs := valueInMap.(map[int64]struct{})
// value is irrelevant, no need to check if key exists
// I think problem is here
oldIDs[productID] = struct{}{}
return oldIDs
})
}
}
And below are my getters in the same class where above code is there. These getters are used by main application threads to get data from the map or get the whole map.
func (r *clientRepository) GetProductMap() *cmap.ConcurrentMap {
return r.products
}
func (r *clientRepository) GetProductCatalogMap() *cmap.ConcurrentMap {
return r.productCatalog
}
func (r *clientRepository) GetProductData(pid string) *definitions.FlatProduct {
pd, ok := r.products.Get(pid)
if ok {
return pd.(*definitions.FlatProduct)
}
return nil
}
This is how I am reading data from this productCatalog
cmap but my system is crashing on the below range statement -
// get productCatalog map which was populated above
catalogProductMap := clientRepo.GetProductCatalogMap()
productIds, ok := catalogProductMap.Get("211")
data, _ := productIds.(map[int64]struct{})
// I get panic here after sometime
for _, pid := range data {
...
}
Error I am getting as - fatal error: concurrent map iteration and map write
.
I think issue is r.productCatalog
is a concurrentmap, but oldIDs[productID]
is a normal map which is causing issues while I am iterating in the for loop above.
How can I fix this race issue I am seeing? One way I can think of is making oldIDs[productID]
as concurrent map but if I do that approach then my memory increase by a lot and eventually goes OOM. Below is what I have tried which works and it solves the race condition but it increases the memory by a lot which is not what I want -
r.productCatalog.Upsert(catalogValue, flatProduct.ProductId, func(exists bool, valueInMap interface{}, newValue interface{}) interface{} {
productID := newValue.(int64)
if valueInMap == nil {
// return map[int64]struct{}{productID: {}}
return cmap.New()
}
// oldIDs := valueInMap.(map[int64]struct{})
oldIDs := valueInMap.(cmap.ConcurrentMap)
// value is irrelevant, no need to check if key exists
// oldIDs[productID] = struct{}{}
oldIDs.Set(strconv.FormatInt(productID, 10), struct{}{})
return oldIDs
})
Any other approach I can do which doesn't increase memory and also fixes the race condition I am seeing?
Note I am still using v1 version of cmap without generics and it deals with strings as keys.