
I am working on a project where, during startup, I need to read certain files and store their contents in an in-memory map. I then periodically look for new files, and if any are found, I replace the map's previous contents with the new data. Basically, every time a new file arrives representing a full state, I want to refresh my in-memory map to that new state instead of appending to it.

The method loadAtStartupAndProcessNewChanges below is called during server startup; it reads the files and stores their data in memory. It also starts a goroutine, detectNewFiles, which periodically checks whether there are any new files and puts them on the deltaChan channel, which is later consumed by another goroutine, processNewFiles, to read each new file and store its data in the same map. If there is any error, we put it on the err channel. loadFiles is the function that reads the files and stores their data in the map.

type customerConfig struct {
  deltaChan   chan string
  err         chan error
  wg          sync.WaitGroup
  data        *cmap.ConcurrentMap
}

// this is called during server startup.
func (r *customerConfig) loadAtStartupAndProcessNewChanges() error {
  path, err := r.GetPath("...", "....")
  if err != nil {
    return err
  }

  r.wg.Add(1)
  go r.detectNewFiles(path)
  err = r.loadFiles(4, path)
  if err != nil {
    return err
  }
  r.wg.Add(1)
  go r.processNewFiles()
  return nil
}

This method figures out whether there are any new files that need to be consumed, and if so, puts them on the deltaChan channel, which is later consumed by the processNewFiles goroutine to read the files into memory. If there is any error, it adds the error to the error channel.

func (r *customerConfig) detectNewFiles(rootPath string) {

}
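The body was elided from the question, but the behavior described above can be sketched in a self-contained form. Everything here is an assumption: listFiles is a hypothetical stand-in for the real S3 listing call, and the stop channel replaces whatever shutdown signal the real code uses.

```go
package main

import (
	"fmt"
	"time"
)

// listFiles is a hypothetical stand-in for the real S3 listing call;
// it pretends a second file appears on the second poll.
func listFiles(round int) []string {
	if round >= 2 {
		return []string{"a.parquet", "b.parquet"}
	}
	return []string{"a.parquet"}
}

// detectNewFiles polls on a ticker and pushes any path it has not seen
// before onto the delta channel, until the stop channel is closed.
func detectNewFiles(deltaChan chan<- string, stop <-chan struct{}) {
	seen := map[string]bool{}
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	round := 0
	for {
		select {
		case <-stop:
			return
		case <-ticker.C:
			round++
			for _, f := range listFiles(round) {
				if !seen[f] {
					seen[f] = true
					deltaChan <- f
				}
			}
		}
	}
}

func main() {
	deltaChan := make(chan string, 4)
	stop := make(chan struct{})
	go detectNewFiles(deltaChan, stop)
	fmt.Println(<-deltaChan)
	fmt.Println(<-deltaChan)
	close(stop)
}
```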

This reads all the S3 files, stores their data in memory, and returns an error if any. In this method I clear the previous state of my map so that it can hold the fresh state from the new files. It is called during server startup, and also whenever we need to process new files from the processNewFiles goroutine.

func (r *customerConfig) loadFiles(workers int, path string) error {
  var err error
  ...
  var files []string
  files = .....

  // reset the map so that it can have fresh state from new files.
  r.data.Clear()
  g, ctx := errgroup.WithContext(context.Background())
  sem := make(chan struct{}, workers)
  for _, file := range files {
    select {
    case <-ctx.Done():
      break
    case sem <- struct{}{}:
    }
    file := file
    g.Go(func() error {
      defer func() { <-sem }()
      return r.read(file, bucket)
    })
  }

  if err := g.Wait(); err != nil {
    return err
  }
  return nil
}

This method reads the files and adds the data to the concurrent map.

func (r *customerConfig) read(file string, bucket string) error {
  // read file and store it in "data" concurrent map 
  // and if there is any error then return the error
  var err error
  fr, err := pars3.NewS3FileReader(context.Background(), bucket, file, r.s3Client.GetSession().Config)
  if err != nil {
    return errs.Wrap(err)
  }
  defer xio.CloseIgnoringErrors(fr)

  pr, err := reader.NewParquetReader(fr, nil, 8)
  if err != nil {
    return errs.Wrap(err)
  }

  if pr.GetNumRows() == 0 {
    spn.Infof("Skipping %s due to 0 rows", file)
    return nil
  }

  for {
    rows, err := pr.ReadByNumber(r.cfg.RowsToRead)
    if err != nil {
      return errs.Wrap(err)
    }
    if len(rows) <= 0 {
      break
    }

    byteSlice, err := json.Marshal(rows)
    if err != nil {
      return errs.Wrap(err)
    }
    var invMods []CompModel
    err = json.Unmarshal(byteSlice, &invMods)
    if err != nil {
      return errs.Wrap(err)
    }

    for i := range invMods {
      key := strconv.FormatInt(invMods[i].ProductID, 10) + ":" + strconv.Itoa(int(invMods[i].Iaz))
      hasInventory := false
      if invMods[i].Available > 0 {
        hasInventory = true
      }
      r.data.Set(key, hasInventory)
    }
  }
  return nil
}

This method picks up whatever is on the delta channel, and if there are any new files, it starts reading them by calling the loadFiles method. If there is any error, it adds the error to the error channel.

// processNewFiles - load new files found by detectNewFiles
func (r *customerConfig) processNewFiles() {
  // find new files on delta channel
  // and call "loadFiles" method to read it
  // if there is any error, then it will add it to the error channel.
}
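Although the body was elided, the loop described in the comments can be sketched in a self-contained form. The loadFiles below is a hypothetical stand-in for the real loader; the channel wiring mirrors the description, not the actual code.

```go
package main

import (
	"errors"
	"fmt"
)

// loadFiles is a hypothetical stand-in for the real loader, failing on
// one particular path so the error-forwarding can be demonstrated.
func loadFiles(path string) error {
	if path == "bad" {
		return errors.New("load failed: bad")
	}
	return nil
}

// processNewFiles receives paths from the delta channel, reloads for
// each one, and forwards any failure to the error channel.
func processNewFiles(deltaChan <-chan string, errCh chan<- error) {
	for path := range deltaChan {
		if err := loadFiles(path); err != nil {
			errCh <- err
		}
	}
}

func main() {
	deltaChan := make(chan string, 2)
	errCh := make(chan error, 2)
	deltaChan <- "good"
	deltaChan <- "bad"
	close(deltaChan)
	processNewFiles(deltaChan, errCh)
	fmt.Println(len(errCh)) // one error queued, from the "bad" path
}
```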

If there is any error on the error channel, it is logged by the method below:

func (r *customerConfig) handleError() {
  // read error from error channel if there is any
  // then log it
}

Problem Statement

The above logic works for me without any issues, but there is one small bug in my code which I am not able to figure out how to solve. As you can see, I have a concurrent map which I populate in my read method and clear entirely in the loadFiles method. Whenever there is a new file on the delta channel I don't want to keep the previous state in the map, so I remove everything from the map and then add the new state from the new files.

Now if there is any error in the read method, the bug happens, because I have already cleared all the data in my map, leaving it empty, which is not what I want. Basically, if there is any error, I would like to preserve the previous state in the data map. How can I resolve this issue within my current design?

Note: I am using a Go concurrent map.

Chandan
dragons
  • I don't see your detail implementation on `remove` and `update value` on your code. But as you said, why you don't keep the previous state and remove it when you successful read file and set new state? – nguyenhoai890 Apr 19 '22 at 03:19
  • @nguyenhoai890 Inside read method, I just set values into the map after reading the files. And then in `loadFiles` method I am clearing the whole map. It's all there in the code. I removed the other pieces of code which isn't needed. That is the problem I am having where I want to keep previous state and remove only when successful read is done with my above design. How do I change my above design so that it can work with previous state and only delete when successful read is done? – dragons Apr 19 '22 at 03:21
  • There is no need to use 3rd for concurrent map, imo. you can check `https://pkg.go.dev/sync#Map` if you really need concurrent map. There is only one way that you should refactor your code following the below idea: separate loadFile logic and modify map into 2 funcs. E.g `fileData, err := loadFile(...); r.parseMap(fileData)`. In parse map, you can remove the previous state AFTER you successfully read the new state. – nguyenhoai890 Apr 19 '22 at 07:46
  • @nguyenhoai890 How will I change my above design to accomodate your suggestion. I am kinda confuse here. Can you provide an example with my above current design? – dragons Apr 22 '22 at 03:39
  • can you post your code details of assigning and reading? just remove/change name of that config.props if you need. So I can give you a solution – nguyenhoai890 Apr 22 '22 at 08:31
  • @nguyenhoai890 I have updated my `read` method where I populate the concurrent map. Let me know if you need any other details. – dragons Apr 22 '22 at 17:39
  • @dragons let `read` method return data and error move all map related work to `loadFiles` this way you can check if there was any error return by `read` then do nothing else clear and update the map with new data in `loadFiles`. – Chandan Apr 22 '22 at 19:22
  • @dragons you can achieve the same with just 3 functions - `detect`, `read`, `load`, detect will get new files as if currently does and push to delta channel, load will get file to read from delta channel and call read method to get the data and error then checks for error if no error then clear the map and update with new content, so you would have 2 go routines and 1 function which would be called by load routine – Chandan Apr 22 '22 at 19:33
  • @Chandan Can you provide an example on how that will work with my above current design? I am currently confuse on that. Any help will be greatly appreciated. – dragons Apr 22 '22 at 19:56

4 Answers


I think your design is overcomplicated. It can be solved much more simply, in a way that gives you all the benefits you desire:

  • safe for concurrent access
  • detected changes are reloaded
  • accessing the config gives you the most recent, successfully loaded config
  • the most recent config is always, immediately accessible, even if loading a new config due to detected changes takes long
  • if loading the new config fails, the previous "snapshot" is kept and remains current
  • as a bonus, it's much simpler and doesn't even use third-party libs

Let's see how to achieve this:


Have a CustomerConfig struct holding everything you want to cache (this is the "snapshot"):

type CustomerConfig struct {
    Data map[string]bool

    // Add other props if you need:
    LoadedAt time.Time
}

Provide a function that loads the config you wish to cache. Note: this function is stateless, it does not access / operate on package level variables:

func loadConfig() (*CustomerConfig, error) {
    cfg := &CustomerConfig{
        Data:     map[string]bool{},
        LoadedAt: time.Now(),
    }

    // Logic to load files, and populate cfg.Data
    // If an error occurs, return it

    // If loading succeeds, return the config
    return cfg, nil
}

Now let's create our "cache manager". The cache manager stores the actual / current config (the snapshot), and provides access to it. For safe concurrent access (and update), we use a sync.RWMutex. Also has means to stop the manager (to stop the concurrent refreshing):

type ConfigCache struct {
    configMu sync.RWMutex
    config   *CustomerConfig
    closeCh  chan struct{}
}

Creating a cache loads the initial config. Also launches a goroutine that will be responsible to periodically check for changes.

func NewConfigCache() (*ConfigCache, error) {
    cfg, err := loadConfig()
    if err != nil {
        return nil, fmt.Errorf("loading initial config failed: %w", err)
    }

    cc := &ConfigCache{
        config:  cfg,
        closeCh: make(chan struct{}),
    }

    // launch goroutine to periodically check for changes, and load new configs
    go cc.refresher()

    return cc, nil
}

The refresher() periodically checks for changes, and if changes are detected, calls loadConfig() to load new data to be cached, and stores it as the current / actual config (while locking configMu). It also monitors closeCh to stop if that is requested:

func (cc *ConfigCache) refresher() {
    ticker := time.NewTicker(1 * time.Minute) // Every minute
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C:
            // Check if there are changes
            changes := false // logic to detect changes
            if !changes {
                continue // No changes, continue
            }

            // Changes! load new config:
            cfg, err := loadConfig()
            if err != nil {
                log.Printf("Failed to load config: %v", err)
                continue // Keep the previous config
            }

            // Apply / store new config
            cc.configMu.Lock()
            cc.config = cfg
            cc.configMu.Unlock()

        case <-cc.closeCh:
            return
        }
    }
}

Closing the cache manager (the refresher goroutine) is as easy as:

func (cc *ConfigCache) Stop() {
    close(cc.closeCh)
}

The last missing piece is how you access the current config. That's a simple GetConfig() method (that also uses configMu, but in read-only mode):

func (cc *ConfigCache) GetConfig() *CustomerConfig {
    cc.configMu.RLock()
    defer cc.configMu.RUnlock()
    return cc.config
}

This is how you can use this:

cc, err := NewConfigCache()
if err != nil {
    // Decide what to do: retry, terminate etc.
}

// Where ever, whenever you need the actual (most recent) config in your app:

cfg := cc.GetConfig()
// Use cfg

Before you shut down your app (or you want to stop the refreshing), you may call cc.Stop().

icza
  • Although the question is "How can I resolve this issue in my above current design.", I think this is better solution overall. Personally, I would suggest not to use init() and package level vars and use `NewXXX()` convention. So that the app can have better control over the order of initializations. – geliba187 Apr 27 '22 at 11:53
  • @geliba187 Good point. I've rewritten the answer. – icza Apr 27 '22 at 12:25

Add an RWMutex to protect collectedData from concurrent writes by the worker goroutines:

type customerConfig struct {
   ...
   m sync.RWMutex
}

Instead of updating the map in the read method, let read just return the data and an error:

func (r *customerConfig) read(file string, bucket string) ([]CompModel, error) {
  // read file data and return it, with an error if any
  var err error
  fr, err := pars3.NewS3FileReader(context.Background(), bucket, file, r.s3Client.GetSession().Config)
  if err != nil {
    return nil, errs.Wrap(err)
  }
  defer xio.CloseIgnoringErrors(fr)

  pr, err := reader.NewParquetReader(fr, nil, 8)
  if err != nil {
    return nil, errs.Wrap(err)
  }

  if pr.GetNumRows() == 0 {
    spn.Infof("Skipping %s due to 0 rows", file)
    return nil, errors.New("no data")
  }

  var invMods []CompModel
  for {
    rows, err := pr.ReadByNumber(r.cfg.RowsToRead)
    if err != nil {
      return nil, errs.Wrap(err)
    }
    if len(rows) <= 0 {
      break
    }

    byteSlice, err := json.Marshal(rows)
    if err != nil {
      return nil, errs.Wrap(err)
    }
    var jsonData []CompModel
    err = json.Unmarshal(byteSlice, &jsonData)
    if err != nil {
      return nil, errs.Wrap(err)
    }
    invMods = append(invMods, jsonData...)
  }
  return invMods, nil
}

Then in loadFiles you can collect the data returned by the read method, and only if there was no error do you clear and update the map; otherwise the old data is left as it was before:

func (r *customerConfig) loadFiles(workers int, path string) error {
  var err error
  ...
  var files []string
  files = .....

  // reset the map so that it can have fresh state from new files.
  // r.data.Clear() <- remove the clear from here
  g, ctx := errgroup.WithContext(context.Background())
  sem := make(chan struct{}, workers)
  collectedData := []CompModel{}

  for _, file := range files {
    select {
    case <-ctx.Done():
      break
    case sem <- struct{}{}:
    }
    file := file
    g.Go(func() error {
      defer func() { <-sem }()

      data, err := r.read(file, bucket)
      if err != nil {
        return err
      }

      r.m.Lock()
      collectedData = append(collectedData, data...)
      r.m.Unlock()
      return nil
    })
  }

  if err := g.Wait(); err != nil {
    return err
  }

  r.data.Clear()
  for i := range collectedData {
    key := strconv.FormatInt(collectedData[i].ProductID, 10) + ":" + strconv.Itoa(int(collectedData[i].Iaz))
    hasInventory := false
    if collectedData[i].Available > 0 {
      hasInventory = true
    }
    r.data.Set(key, hasInventory)
  }

  return nil
}

Note: Since the code is not runnable, these are just updated methods for reference. The r.m mutex guards the append to collectedData, because multiple worker goroutines write to that shared slice.


The same can be achieved with just 3 functions: detect, read, load. detect checks for new files on an interval and pushes any it finds to the delta channel. load takes a file path from the delta channel and calls read to get the data and an error; if there is no error, it clears the map and updates it with the new content, otherwise it logs the error. So you would have 2 goroutines and 1 function called by the load routine:

package main

import (
  "fmt"

  "time"
  "os"
  "os/signal"
  "math/rand"
)

func main() {
  fmt.Println(">>>", center("STARTED", 30), "<<<")

  c := &Config{
    InitialPath: "Old Path",
    DetectInterval: 3000,
  }
  c.start()
  fmt.Println(">>>", center("ENDED", 30), "<<<")
}

// https://stackoverflow.com/questions/41133006/how-to-fmt-printprint-this-on-the-center
func center(s string, w int) string {
    return fmt.Sprintf("%[1]*s", -w, fmt.Sprintf("%[1]*s", (w + len(s))/2, s))
}

type Config struct {
  deltaCh chan string
  ticker *time.Ticker
  stopSignal chan os.Signal
  InitialPath string
  DetectInterval time.Duration
}

func (c *Config) start() {
  c.stopSignal = make(chan os.Signal, 1)
  signal.Notify(c.stopSignal, os.Interrupt)

  c.ticker = time.NewTicker(c.DetectInterval * time.Millisecond)
  c.deltaCh = make(chan string, 1)
  go c.detect()
  go c.load()
  if c.InitialPath != "" {
    c.deltaCh <- c.InitialPath
  }
  <- c.stopSignal
  c.ticker.Stop()
}

// Detect New Files
func (c *Config) detect() {
  for {
    select {
      case <- c.stopSignal:
        return
      case <- c.ticker.C:
        fmt.Println(">>>", center("DETECT", 30), "<<<")
        c.deltaCh <- fmt.Sprintf("PATH %f", rand.Float64() * 1.5)
    }
  }
}
// Read Files
func read(path string) (map[string]int, error) {
  data := make(map[string]int)
  data[path] = 0
  fmt.Println(">>>", center("READ", 30), "<<<")
  fmt.Println(path)
  return data, nil
}
// Load Files
func (c *Config) load() {
  for {
    select {
      case <- c.stopSignal:
        return
      case path := <- c.deltaCh:
        fmt.Println(">>>", center("LOAD", 30), "<<<")
        data, err := read(path)
        if err != nil {
          fmt.Println("Log Error")
        } else {
          fmt.Println("Success", data)
        }
        fmt.Println()
    }
  }
}

Note: A map is not included in the sample code, but it can easily be added.
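One way the map could be wired into the load routine above is sketched here, as a self-contained fragment rather than a drop-in for the sample: build the new map completely, and only replace the old one (under a lock) when the read succeeded. The store type and its methods are illustrative assumptions.

```go
package main

import (
	"fmt"
	"sync"
)

// store holds the current map behind an RWMutex; swap replaces it
// wholesale, which is what the load routine would do after a
// successful read.
type store struct {
	mu   sync.RWMutex
	data map[string]int
}

func (s *store) swap(newData map[string]int) {
	s.mu.Lock()
	s.data = newData
	s.mu.Unlock()
}

func (s *store) get(k string) int {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.data[k]
}

func main() {
	s := &store{data: map[string]int{"a": 1}}

	// Stand-in for read(path): a fully built replacement map and no error.
	newData, err := map[string]int{"a": 2}, error(nil)
	if err == nil {
		s.swap(newData) // only swap when the read succeeded
	}
	fmt.Println(s.get("a")) // 2
}
```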

Chandan
  • Thanks a lot for your suggestion. I think one problem in your code is you are clearing map for each file right but that is not what I want? I have a group of files for which I need to add data into the map so during server startup I read all those files and add data into the map. Now when the timer runs and it detect there are new set of files then it should read all those data from those new files and if successful then clear data from the map and add it to the same map. That is why earlier I was clearing my map above for loop of file iteration. Let me know if it makes sense? – dragons Apr 23 '22 at 11:44
  • @dragons so suppose on start it finds files A and B whose data will be loaded into the map but later it found C and D so it will remove data of A and B and load data of C and D is it correct ? – Chandan Apr 23 '22 at 13:58
  • @dragons please check the updated answer and let me know still there any problem. – Chandan Apr 23 '22 at 15:38
  • Yes that is correct on files thing you mentioned above. Also forgot to mention earlier, for loop in `read` method is infinite loop `for {...}` so if we return `invMods` immediately then we gonna miss next iteration of it. Right? so there is bug I guess then. That is why I was populating my map there earlier. I forgot to mention earlier sorry about that. Basically I am breaking out of that infinite for loop from that if statement `if len(rows) <= 0 {` with a break so I guess we need to make a list of list then? Any thoughts? How can we deal with that? – dragons Apr 23 '22 at 17:24
  • @dragons has you mentioned we can create a single dimension list which will contain all items and just keep adding to that list in for loop until our condition breaks the loop and then return the list at the end, I have update the answer with the same logic. – Chandan Apr 23 '22 at 18:50
  • Looking into updated suggestion as of now but why do I need mutex lock in my read method? Just kinda confuse. Also since we have single dimension list do we need any change the way we need to iterate in `loadFiles` method? – dragons Apr 23 '22 at 19:41
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/244137/discussion-between-dragons-and-chandan). – dragons Apr 23 '22 at 19:53

Just allocate a new map, like this:

var mu sync.Mutex
before := map[string]string{} // Some map before reading

after := make(map[string]string)

// Read files and fill `after` map

mu.Lock()
before = after
mu.Unlock()
thrownew

Instead of clearing the map in the loadFiles method, do something like this in read:

func (r *customerConfig) read(file string, bucket string) error {
  m := cmap.New() // create a new map
  // ...
  for {
    rows, err := pr.ReadByNumber(r.cfg.RowsToRead)
    if err != nil {
      return errs.Wrap(err)
    }
    if len(rows) <= 0 {
      break
    }

    byteSlice, err := json.Marshal(rows)
    if err != nil {
      return errs.Wrap(err)
    }
    var invMods []CompModel
    err = json.Unmarshal(byteSlice, &invMods)
    if err != nil {
      return errs.Wrap(err)
    }

    for i := range invMods {
      key := strconv.FormatInt(invMods[i].ProductID, 10) + ":" + strconv.Itoa(int(invMods[i].Iaz))
      hasInventory := false
      if invMods[i].Available > 0 {
        hasInventory = true
      }
      m.Set(key, hasInventory)
    }
  }
  r.data = &m // use the new map (cmap.New returns a value; r.data is a pointer)
  return nil
}
geliba187