This is a follow-up to an earlier post.

I am iterating over the flatProduct.Catalogs slice and populating my productCatalog concurrent map in Go. I use the Upsert method so that only unique product IDs are added to each productCatalog entry.

The code below is called by multiple goroutines in parallel, which is why I use a concurrent map. It runs in the background and repopulates the concurrent map every 30 seconds.

var productRows []ClientProduct
err = json.Unmarshal(byteSlice, &productRows)
if err != nil {
    return err
}
for i := range productRows {
    flatProduct, err := r.Convert(spn, productRows[i])
    if err != nil {
        return err
    }
    if flatProduct.StatusCode == definitions.DONE {
        continue
    }
    r.products.Set(strconv.FormatInt(flatProduct.ProductId, 10), flatProduct)
    for _, catalogId := range flatProduct.Catalogs {
        catalogValue := strconv.FormatInt(int64(catalogId), 10)
        r.productCatalog.Upsert(catalogValue, flatProduct.ProductId, func(exists bool, valueInMap interface{}, newValue interface{}) interface{} {
            productID := newValue.(int64)
            if valueInMap == nil {
                return map[int64]struct{}{productID: {}}
            }
            oldIDs := valueInMap.(map[int64]struct{})
            
            // value is irrelevant, no need to check if key exists 
            // I think problem is here
            oldIDs[productID] = struct{}{}
            return oldIDs
        })
    }
}

Below are the getters, defined on the same type as the code above. The main application threads use them to read a single entry or to get a whole map.

func (r *clientRepository) GetProductMap() *cmap.ConcurrentMap {
    return r.products
}

func (r *clientRepository) GetProductCatalogMap() *cmap.ConcurrentMap {
    return r.productCatalog
}

func (r *clientRepository) GetProductData(pid string) *definitions.FlatProduct {
    pd, ok := r.products.Get(pid)
    if ok {
        return pd.(*definitions.FlatProduct)
    }
    return nil
}

This is how I read from the productCatalog cmap, but my system crashes on the range statement below:

// get productCatalog map which was populated above
catalogProductMap := clientRepo.GetProductCatalogMap()
productIds, ok := catalogProductMap.Get("211")
data, _ := productIds.(map[int64]struct{})

// I get a panic here after some time
for pid := range data {
  ...
}

The error I get is: fatal error: concurrent map iteration and map write.

I think the issue is that r.productCatalog is a concurrent map, but oldIDs is a plain map, so the write oldIDs[productID] in the Upsert callback races with the iteration in the for loop above.

How can I fix this race? One option I can think of is making oldIDs a concurrent map as well, but with that approach my memory usage grows a lot and the process eventually goes OOM. Below is what I tried. It fixes the race condition, but it increases memory far more than I can accept:

r.productCatalog.Upsert(catalogValue, flatProduct.ProductId, func(exists bool, valueInMap interface{}, newValue interface{}) interface{} {
    productID := newValue.(int64)
    key := strconv.FormatInt(productID, 10)
    if valueInMap == nil {
        // return map[int64]struct{}{productID: {}}
        // first ID for this catalog: create the inner cmap and store the ID in it
        ids := cmap.New()
        ids.Set(key, struct{}{})
        return ids
    }
    // oldIDs := valueInMap.(map[int64]struct{})
    oldIDs := valueInMap.(cmap.ConcurrentMap)

    // value is irrelevant, no need to check if key exists
    // oldIDs[productID] = struct{}{}
    oldIDs.Set(key, struct{}{})
    return oldIDs
})

Is there another approach that fixes the race condition without increasing memory?

Note that I am still using the v1 version of cmap, which has no generics and only supports string keys.

rosed
  • If you are learning how to write concurrent programs, don't use libraries like `concurrent-map`. If you don't learn to be comfortable with concepts like mutex yourself, then you won't know if you're using the library correctly. Once you do learn the concepts for yourself, you'll have no need for such libraries. – Hymns For Disco Jan 05 '23 at 23:37
  • Issue isn't with `concurrent-map` library but the issue is with `oldIDs[productID]` which is a normal map. As of now `r.productCatalog` is type `cmap.ConcurrentMap` where the values are type `map[int64]struct{}` but if we could update `r.productCatalog` to be a `cmap.ConcurrentMap` where the values are type `cmap.ConcurrentMap` then it should work. So I think if I make it as a cmap then it would work fine too but I am not sure how would I make that as a cmap. @HymnsForDisco – rosed Jan 05 '23 at 23:47
  • I understand that, but my advice was more general, which is that using such libraries is going to help you in neither learning nor productivity. You can replace every map in your program with cmap, and it may stop it from crashing, but that doesn't mean your program does what you want it to do. – Hymns For Disco Jan 06 '23 at 00:14
  • Understood and it made sense to me. I will read up more on this but for now I was trying to figure out on how to make this program to stop crashing. In my above use case, it made sense to use concurrent map as I was writing and reading from multiple go routines in parallel but I got a race condition with `oldIDs[productID]` map which is a traditional map. If you can help me figure out on how to make that as a cmap as well then it will be of great help. @HymnsForDisco – rosed Jan 06 '23 at 00:18
  • Unfortunately it is hard to give specific advice based on the code you have posted right now. We don't know what `r` or `clientRepo` or `GetProductCatalogMap` are, or how the first code fragment is related to the second one. – Hymns For Disco Jan 06 '23 at 00:24
  • First code is just running everything in a background thread which is populating data to my two concurrent maps as already shown. You can just ignore `r`, `clientRepo` and all. But `GetProductCatalogMap` is just a getter which gets the concurrent map. And second code is run by main application threads which uses the data populated in the cmap. I also updated the question little bit. – rosed Jan 06 '23 at 00:27
  • Did you try to use the built-in race detector to identify where the reads and writes happen in the code? See: https://go.dev/doc/articles/race_detector – dev.bmax Jan 12 '23 at 08:09
  • Yes I already tried it and it is pointing to `oldIDs[productID]` map which is causing issues. – rosed Jan 12 '23 at 17:40
  • @rosed not sure if you've seen the followup message in our last chat, but the solution I suggested in the chat was mistaken; it was using the single mutex for all maps and that would, as you've feared, unreasonably slow down the program. The proper solution is to use one lock per each map, to be able to do that you can declare a new type, one that holds both the map and its associated lock, something like the following: https://go.dev/play/p/k6orzds0AfC – mkopriva Jan 13 '23 at 03:20
  • @mkopriva Thank you. I will test this out today and will keep you posted. – rosed Jan 13 '23 at 20:04
  • In this kind of a pattern, you are likely to have higher level races, not just memory races. You should implement a scheme involving critical sections, and access the underlying data structures in a critical section only, for both read and write access. A concurrent map will only protect access to that map, but you are performing multiple operations to the underlying structures. – Burak Serdar Jan 17 '23 at 17:21
  • If you don't want to add additional locks (as previously suggested by others), and assuming the inner maps inside the productCatalog are relatively small, you can avoid the race condition by cloning the oldIDs map before inserting a new value and then returning the clone. – yairhoff Jan 17 '23 at 19:53
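
The cloning idea from the last comment could look roughly like the sketch below. The helper name addCopy is hypothetical and not part of cmap. The sketch assumes that the value returned from the Upsert callback replaces the stored one, and that readers only ever range over the map they got from Get and never write to it.

```go
package main

// addCopy returns a set containing everything in old plus id.
// old itself is never modified, so a goroutine that is still
// ranging over it is unaffected by the update.
func addCopy(old map[int64]struct{}, id int64) map[int64]struct{} {
	if _, ok := old[id]; ok {
		return old // already present: nothing to write, no copy needed
	}
	next := make(map[int64]struct{}, len(old)+1)
	for k := range old {
		next[k] = struct{}{}
	}
	next[id] = struct{}{}
	return next
}
```

Inside the Upsert callback this would replace the in-place write: assert valueInMap to map[int64]struct{} when it is not nil, then return addCopy(oldIDs, productID). The cost is one copy of the inner map for each newly added ID, so it is probably only reasonable while the per-catalog sets stay small.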

1 Answer

Rather than a plain map[int64]struct{} type, you could define a struct which holds the map and a mutex to control the access to the map:

type myMap struct{
   m sync.Mutex
   data map[int64]struct{}
}

func (m *myMap) Add(productID int64) {
   m.m.Lock()
   defer m.m.Unlock()

   m.data[productID] = struct{}{}
}

func (m *myMap) List() []int64 {
   m.m.Lock()
   defer m.m.Unlock()

   var res []int64
   for id := range m.data {
       res = append(res, id)
   }

   // sort slice if you need
   return res
}

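With the sample implementation above, be careful to store *myMap pointers (as opposed to plain myMap structs) in your cmap.ConcurrentMap, because copying the struct would also copy its mutex. The data map must also be initialized (for example with make) before the first call to Add, since writing to a nil map panics.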
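
For reference, here is a slightly fuller sketch of the same idea that can be compiled on its own. The names idSet, newIDSet and fillConcurrently are hypothetical. It swaps sync.Mutex for sync.RWMutex so that several readers can list IDs at once, which is a variation on the code above rather than something the question requires.

```go
package main

import (
	"sort"
	"sync"
)

// idSet is a set of product IDs guarded by its own lock, so every
// catalog entry can be locked independently of the others.
type idSet struct {
	mu   sync.RWMutex
	data map[int64]struct{}
}

// newIDSet allocates the inner map up front; writing to a nil map panics.
func newIDSet(ids ...int64) *idSet {
	s := &idSet{data: make(map[int64]struct{}, len(ids))}
	for _, id := range ids {
		s.data[id] = struct{}{}
	}
	return s
}

// Add inserts productID under the write lock.
func (s *idSet) Add(productID int64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[productID] = struct{}{}
}

// List returns a sorted snapshot, so callers range over a private
// slice instead of the shared map.
func (s *idSet) List() []int64 {
	s.mu.RLock()
	defer s.mu.RUnlock()
	res := make([]int64, 0, len(s.data))
	for id := range s.data {
		res = append(res, id)
	}
	sort.Slice(res, func(i, j int) bool { return res[i] < res[j] })
	return res
}

// fillConcurrently adds the IDs 0..n-1 from n goroutines, each of which
// also reads the set, and returns the final sorted contents.
func fillConcurrently(n int) []int64 {
	s := newIDSet()
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(id int64) {
			defer wg.Done()
			s.Add(id)
			_ = s.List() // concurrent read, safe because of the lock
		}(int64(i))
	}
	wg.Wait()
	return s.List()
}
```

In the Upsert callback from the question, this would mean returning newIDSet(productID) when valueInMap is nil, and otherwise calling Add on valueInMap.(*idSet) and returning the same pointer. The reading side would then assert the value to *idSet and range over List() instead of over the map itself.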

LeGEC