2

I am new to golang and have a use case where operations on a value of a type have to run in a sequential manner where as operation on value of other type can be run concurrently.

  1. Imagine data is coming from a streaming connection (In-order)
    key_name_1, value_1 
    key_name_2, value_2
    key_name_1, value_1
    
  2. Now, key_name_1, and key_name_2 can be operated by goroutine concurrently.
  3. But as next streamed value (3rd row) is key_name_1 again, so this operation should only be processed by goroutine if the earlier operation (1st row) has finished otherwise it should wait for the 1st operation to finish before it can apply the operation. For the sake of discussion we can assume that operation is simply adding the new value to previous value.

What would be the right way to achieve this in golang with highest possible performance ?


The exact use case is database changes are streamed on a queue, now if a value is getting changed its important that onto another database that operation is applied on the same sequence otherwise consistency will get impacted. Conflicts are rare, but can happen.

rustyx
  • 80,671
  • 25
  • 200
  • 267
Shakun
  • 57
  • 2
  • 3
    If the work is simply adding a new value to a previous value, then the synchronization costs will swamp the actual work. In this case, executing everything sequentially is the simplest and highest performance design. – Charlie Tumahai Dec 12 '21 at 20:34
  • That was just an example, the work is complex, and also the volume of events is huge so there is no reason to execute all events sequential, only of same type should be executed sequentially. Otherwise the lag of processing will become huge. – Shakun Dec 12 '21 at 20:37
  • the simplest way I can think, without having full context, is creating a channel per operation, however, if the number of keys/ops are not limited this is not a scalable solution, and the best way would be the comment above. – cperez08 Dec 12 '21 at 20:37
  • The no of keys are unlimited, hence executing them concurrently makes sense to process events fast. Only events of same type needs to be processed sequentially. – Shakun Dec 12 '21 at 20:39
  • 1
    For "*highest possible performance*" we'd need a lot more context. How often does a conflict occur, how many requests/s, how long does a request take to process, etc etc. And even then, *highest performance* is usually achieved empirically. – rustyx Dec 12 '21 at 20:50
  • `The no of keys are unlimited, hence executing them concurrently makes sense to process events fast` I agree it makes sense to execute concurrently, however, the condition about executing the same keys together complicates everything. My best choice would be a map -> reduce approach, parallelize everything, then care about joining results (you can create a Sync map to store/append results by key), or do nothing and execute lineally. – cperez08 Dec 12 '21 at 21:00
  • The exact use case is database changes are streamed on a queue, now if a value is getting changed its important that onto another database that operation is applied on the same sequence otherwise consistency will get impacted. Conflicts are rare, but can happen. @rustyx – Shakun Dec 12 '21 at 21:01
  • 1
    https://go.dev/doc/faq#go_or_golang –  Dec 12 '21 at 22:56

1 Answers1

1

As a simple solution for mutual exclusivity for a given key you can just use a locked map of ref-counted locks. It's not the most optimal for high loads, but might just suffice in your case.

type processLock struct {
    mtx      sync.Mutex
    refcount int
}

type locker struct {
    mtx   sync.Mutex
    locks map[string]*processLock
}

func (l *locker) acquire(key string) {
    l.mtx.Lock()
    lk, found := l.locks[key]
    if !found {
        lk = &processLock{}
        l.locks[key] = lk
    }
    lk.refcount++
    l.mtx.Unlock()
    lk.mtx.Lock()
}

func (l *locker) release(key string) {
    l.mtx.Lock()
    lk := l.locks[key]
    lk.refcount--
    if lk.refcount == 0 {
        delete(l.locks, key)
    }
    l.mtx.Unlock()
    lk.mtx.Unlock()
}

Just call acquire(key) before processing a key and release(key) when done with it.

Live demo.

Warning! The code above guarantees exclusivity, but not sequence. To sequentialize the unlocking you need a FIFO mutex.

rustyx
  • 80,671
  • 25
  • 200
  • 267