1

I have a singleton class:

// Singleton from the question: sets up and connects a CocoaMQTT client in
// its initializer and keeps two arrays of device topics.
// NOTE(review): `clientID`, `mqtt`, `isConnecting` and `registerBackgroundTask()`
// are used below but not declared in this snippet — presumably omitted for brevity.
class DeviceController:NSObject, CocoaMQTTDelegate {
     // The single shared instance; `init` is private, so no other instance
     // can be created from outside the class.
     static let sharedInstance = DeviceController()
     // Both arrays are plain mutable properties with no synchronization around them.
     var deviceOnArray:[String] = []
     var deviceOffArray:[String] = []
     private override init() {

        // Client identifier: fixed prefix plus the current process identifier.
        clientID = "xyz-" + String(ProcessInfo().processIdentifier)
        mqtt = CocoaMQTT(clientID: clientID, host: "device_controller.xyz.net", port: 1883)
        mqtt.username = "username"
        mqtt.password = "password"
        mqtt.willMessage = CocoaMQTTWill(topic: "/will", message: "dieout")
        mqtt.keepAlive = 30
        mqtt.cleanSession = true
        DeviceController.isConnecting = true
        super.init()
        // `self` is only passed out after `super.init()`.
        mqtt.delegate = self
        mqtt.connect()
        self.registerBackgroundTask()
    }
   func sendArm(topic:String){
      // Placeholder: add the device to deviceOnArray.
   }
   func sendDisarm(topic:String){
      // Placeholder: remove the device from deviceOnArray if it is present.
      // Observed problem: checking in code whether the device is in the array
      // returns false, although printing the array on the console shows it
      // contains the device. It only happens when sendArm and sendDisarm are
      // called within a second of each other.
      // NOTE(review): the queue below is created anew on every call, so two
      // calls never share a queue and cannot be serialized against each other
      // by it.
     let lockQueue = DispatchQueue(label: "com.test.LockQueue")
        lockQueue.sync() {
           // All of the method's code was placed inside this block.
         }
     // The queue-based attempt above was also tried, but it did not help.
   }


}

As the code shows, I am having trouble reading the correct value from deviceOnArray/deviceOffArray. I am not sure how best to explain the problem, but I think what I need is the equivalent of an Objective-C atomic (thread-safe) property. Any idea how to create one in Swift?

Varun Naharia
  • 5,318
  • 10
  • 50
  • 84

3 Answers

6

Details

  • Swift 5.1, Xcode 11.3.1

Solution

import Foundation
/// A reference-type box that owns a value together with a private dispatch
/// queue used to serialize access to it.
///
/// The class itself only holds the storage and the queue; the accessors that
/// read and write the value are declared in extensions.
class AtomicValue<T> {

    /// The wrapped value. It should only be touched from work submitted to
    /// `accessQueue`.
    private var _value: T

    /// Queue every accessor funnels through. It is created without the
    /// `.concurrent` attribute, and it is immutable so it can never be
    /// swapped out or found `nil` after initialization.
    private let accessQueue: DispatchQueue

    /// Creates a box holding `value`.
    ///
    /// - Parameter value: The initial wrapped value.
    init (value: T) {
        _value = value
        // The label is diagnostic only. It is built without referring to
        // `self` (a fresh UUID keeps it unique per instance), which lets the
        // queue be a non-optional `let` instead of an implicitly unwrapped
        // `var` that has to be filled in after `self` becomes available.
        accessQueue = DispatchQueue(label: "accessQueue.\(AtomicValue<T>.self).\(UUID().uuidString)")
    }
}

// MARK: Get/Set with synchronization in current queue

extension AtomicValue {
    /// Replaces the stored value with the result of `closure`, blocking the
    /// caller until the closure has run on the access queue.
    ///
    /// - Parameter closure: Receives the current value and returns the new
    ///   one. When `nil`, the stored value is left unchanged.
    func waitSet(lockValueAccessWhile closure: ((_ currentValue: T) -> T)?) {
        // `sync` runs its block before returning, so the block cannot outlive
        // this call and a weak capture of `self` is unnecessary.
        accessQueue.sync {
            guard let newValue = closure?(_value) else { return }
            _value = newValue
        }
    }

    /// Hands the current value to `closure`, blocking the caller until the
    /// closure has run on the access queue.
    ///
    /// - Parameter closure: Receives the current value. When `nil`, nothing
    ///   is read.
    func waitGet(lockValueAccessWhile closure: ((_ currentValue: T) -> Void)?) {
        accessQueue.sync {
            guard let closure = closure else { return }
            closure(_value)
        }
    }

    /// Lets `closure` mutate the stored value in place, blocking the caller
    /// until the closure has run on the access queue.
    ///
    /// - Parameter closure: Receives the stored value as `inout`. When `nil`,
    ///   the stored value is left unchanged.
    func waitUpdate(lockValueAccessWhile closure: ((_ currentValue: inout T) -> Void)?) {
        accessQueue.sync {
            guard let closure = closure else { return }
            closure(&_value)
        }
    }

    /// Computes a derived value from the stored value on the access queue and
    /// returns it wrapped in `NotQueueSafeValue`.
    ///
    /// - Parameter closure: Maps the current value to the result. It must not
    ///   be `nil`: there would be nothing to return.
    /// - Returns: The mapped result, computed while on the access queue.
    func waitMap<V>(lockValueAccessWhile closure: ((_ currentValue: T) -> V)?) -> NotQueueSafeValue<V> {
        guard let closure = closure else {
            // A nil closure previously crashed on an implicit unwrap with no
            // explanation; fail with an explicit message instead.
            preconditionFailure("waitMap(lockValueAccessWhile:) requires a non-nil closure")
        }
        return NotQueueSafeValue(notQueueSafeValue: accessQueue.sync { closure(_value) })
    }

    // Be careful with `func waitGet() -> NotQueueSafeValue<T>`: it returns a
    // snapshot, so only use it to copy or store the value.
    // BAD CODE: atomicValue.waitGet().notQueueSafeValue.doSomething()
    //      atomicValue._value may be changed while doSomething() is running.
    // GOOD CODE: atomicValue.waitGet { $0.doSomething() }
    //      doSomething() runs on the access queue, so other accessors of
    //      atomicValue wait until it has finished.

    /// Wrapper whose property name reminds the caller that the contained
    /// value is no longer protected by the access queue.
    ///
    /// The generic parameter is deliberately not called `T`, so it does not
    /// shadow the generic parameter of the enclosing `AtomicValue`.
    struct NotQueueSafeValue<Wrapped> { let notQueueSafeValue: Wrapped }

    /// Returns a snapshot of the stored value, read on the access queue.
    func waitGet() -> NotQueueSafeValue<T> {
        return NotQueueSafeValue(notQueueSafeValue: accessQueue.sync { _value })
    }

    /// Overwrites the stored value with `value`, regardless of the current one.
    func waitSet(value: T) { waitSet(lockValueAccessWhile: { _ in value }) }
}

// MARK: Get/Set asynchronously on a given queue

extension AtomicValue {
    /// Schedules `waitSet(lockValueAccessWhile:)` on `queue` and returns
    /// without waiting for it.
    ///
    /// - Parameters:
    ///   - queue: Queue on which the blocking `waitSet` call is made.
    ///   - closure: Forwarded unchanged to `waitSet(lockValueAccessWhile:)`.
    func set(waitAccessIn queue: DispatchQueue, closure: ((_ currentValue: T) -> T)?) {
        queue.async { [weak self] in
            // The box is captured weakly: if it is gone by the time the work
            // item runs, the request is dropped.
            guard let box = self else { return }
            box.waitSet(lockValueAccessWhile: closure)
        }
    }

    /// Schedules `waitGet(lockValueAccessWhile:)` on `queue` and returns
    /// without waiting for it.
    ///
    /// - Parameters:
    ///   - queue: Queue on which the blocking `waitGet` call is made.
    ///   - closure: Forwarded unchanged to `waitGet(lockValueAccessWhile:)`.
    func get(waitAccessIn queue: DispatchQueue, closure: ((_ currentValue: T) -> Void)?) {
        queue.async { [weak self] in
            guard let box = self else { return }
            box.waitGet(lockValueAccessWhile: closure)
        }
    }

    /// Schedules `waitUpdate(lockValueAccessWhile:)` on `queue` and returns
    /// without waiting for it.
    ///
    /// - Parameters:
    ///   - queue: Queue on which the blocking `waitUpdate` call is made.
    ///   - closure: Forwarded unchanged to `waitUpdate(lockValueAccessWhile:)`.
    func update(waitAccessIn queue: DispatchQueue, closure: ((_ currentValue: inout T) -> Void)?) {
        queue.async { [weak self] in
            guard let box = self else { return }
            box.waitUpdate(lockValueAccessWhile: closure)
        }
    }
}

Usage

// Usage
// Walks through every accessor of AtomicValue: first the blocking `wait…`
// variants, then the variants that dispatch to another queue, then the
// snapshot helpers.
let atomicValue = AtomicValue(value: 0)

// Blocking write: the closure's return value becomes the new stored value.
atomicValue.waitSet { currentValue -> Int in
    // Multiple sequential (sync) actions
    return currentValue + 1
}

// Blocking read: the closure receives the current value.
atomicValue.waitGet { currentValue in
    // Multiple sequential (sync) actions
    print("\(currentValue)")
}

// Blocking in-place update: the closure mutates the value through `inout`.
atomicValue.waitUpdate { currentValue in
    // Multiple sequential (sync) actions
    currentValue += 1
}

// The next three calls hand the work to a global queue via `async`, so they
// return before their closures have necessarily run.
atomicValue.set(waitAccessIn: .global(qos: .default)) { currentValue -> Int in
    // Multiple sequential (sync) actions
    return currentValue + 1
}

atomicValue.get(waitAccessIn: .global(qos: .default)) { currentValue in
    // Multiple sequential (sync) actions
    print("\(currentValue)")
}

atomicValue.update(waitAccessIn: .global(qos: .background)) { currentValue in
    // Multiple sequential (sync) actions
    currentValue += 1
}

// Snapshot read. NOTE(review): the printed number presumably depends on
// whether the asynchronous closures above have already run — timing-dependent.
print(atomicValue.waitGet().notQueueSafeValue)
// Unconditional blocking overwrite.
atomicValue.waitSet(value: 2)

// Derive a value (here a String) from the stored value and take it out as a snapshot.
print("\(atomicValue.waitMap { "value: \($0)" }.notQueueSafeValue )")

Sample

Do not forget to paste the solution code here.

/// Exercises `AtomicValue` from several queues at once and counts how many
/// write closures have run.
class Test {
    private let atomicValue = AtomicValue(value: 0)

    /// Number of times `_set` has run. `_set` is only ever called from
    /// closures handed to `atomicValue`'s write accessors.
    private(set) var count = 0

    /// Artificial delay, in microseconds, added to every read and write.
    private let usecDelay: UInt32 = 1000

    /// One element per loop iteration of each test (100 iterations).
    private let range = (0..<100)

    /// Simulates a slow write: bumps `count`, sleeps, balances one `enter()`
    /// on `dispatchGroup` (if given) and returns `currentValue + 1`.
    private func _set(currentValue: Int, dispatchGroup: DispatchGroup? = nil) -> Int {
        let newValue = currentValue + 1
        count += 1
        usleep(usecDelay)
        dispatchGroup?.leave()
        return newValue
    }

    /// Simulates a slow read: logs the value, sleeps and balances one
    /// `enter()` on `dispatchGroup` (if given).
    private func _get(value: Int, queueLabel: String, dispatchGroup: DispatchGroup? = nil) {
        print("  get \(queueLabel) queue: \(value)")
        usleep(usecDelay)
        dispatchGroup?.leave()
    }

    /// Blocking set + get, `range.count` times, on `queue`.
    ///
    /// - Parameters:
    ///   - queue: Queue the loop runs on.
    ///   - completion: Called on `queue` after the last iteration.
    func test1(queue: DispatchQueue, completion: (() -> Void)?) {
        // `self` is captured strongly on purpose: the work must run to the
        // end so that `completion` is always called.
        queue.async {
            for _ in self.range {
                self.atomicValue.waitSet { self._set(currentValue: $0) }
                self.atomicValue.waitGet { self._get(value: $0, queueLabel: queue.label) }
            }
            print("test1.work at \(queue.label) queue completed")
            completion?()
        }
    }

    /// Blocking update + get, `range.count` times, on `queue`; completion is
    /// driven by a dispatch group instead of the end of the loop.
    ///
    /// - Parameters:
    ///   - queue: Queue the loop runs on and the group notifies on.
    ///   - completion: Called once every entered unit of work has left the group.
    func test2(queue: DispatchQueue, completion: (() -> Void)?) {
        let dispatchGroup = DispatchGroup()
        // Two units of work per iteration: one update and one get.
        for _ in range {
            dispatchGroup.enter()
            dispatchGroup.enter()
        }
        // Strong capture of `self`: every `enter()` above must be matched by
        // a `leave()` in `_set`/`_get`, otherwise the group never notifies.
        queue.async {
            for _ in self.range {
                self.atomicValue.waitUpdate {
                    $0 = self._set(currentValue: $0, dispatchGroup: dispatchGroup)
                }
                self.atomicValue.waitGet {
                    self._get(value: $0, queueLabel: queue.label, dispatchGroup: dispatchGroup)
                }
            }
        }
        dispatchGroup.notify(queue: queue) {
            print("test2.work at \(queue.label) queue completed")
            completion?()
        }
    }


    /// Asynchronous set + get + update, `range.count` times: the requests are
    /// issued from `queue` and executed on `waitQueue`.
    ///
    /// - Parameters:
    ///   - queue: Queue the requests are issued from and the group notifies on.
    ///   - waitQueue: Queue passed to the asynchronous accessors.
    ///   - completion: Called once every entered unit of work has left the group.
    func test3(queue: DispatchQueue, waitIn waitQueue: DispatchQueue, completion: (() -> Void)?) {
        let dispatchGroup = DispatchGroup()
        // Three units of work per iteration: set, get and update.
        for _ in range {
            dispatchGroup.enter()
            dispatchGroup.enter()
            dispatchGroup.enter()
        }
        // Strong capture of `self`, for the same reason as in `test2`.
        queue.async {
            for _ in self.range {
                self.atomicValue.set(waitAccessIn: waitQueue) {
                    self._set(currentValue: $0, dispatchGroup: dispatchGroup)
                }
                self.atomicValue.get(waitAccessIn: waitQueue) {
                    self._get(value: $0, queueLabel: queue.label, dispatchGroup: dispatchGroup)
                }
                self.atomicValue.update(waitAccessIn: waitQueue) {
                    $0 = self._set(currentValue: $0, dispatchGroup: dispatchGroup)
                }
            }
        }
        dispatchGroup.notify(queue: queue) {
            print("test3.work at \(queue.label) queue completed")
            completion?()
        }
    }

    /// Starts all six test runs and prints the final `count` on the main
    /// queue once every run has called its completion handler.
    func run() {
        let allTests = DispatchGroup()

        // test1 and test2 each call `_set` once per iteration:
        // count += range.count (100) per run.
        allTests.enter()
        test1(queue: .global(qos: .utility)) { allTests.leave() }
        allTests.enter()
        test1(queue: .global(qos: .unspecified)) { allTests.leave() }

        allTests.enter()
        test2(queue: .main) { allTests.leave() }
        allTests.enter()
        test2(queue: .global(qos: .unspecified)) { allTests.leave() }

        // test3 calls `_set` twice per iteration (set and update):
        // count += 2 * range.count (200) per run.
        allTests.enter()
        test3(queue: .global(qos: .default), waitIn: .global(qos: .background)) { allTests.leave() }
        allTests.enter()
        test3(queue: .main, waitIn: .global(qos: .userInteractive)) { allTests.leave() }

        allTests.notify(queue: .main) { print("End. Count: \(self.count)") }
    }
}

Test().run()

Result (log)

  get com.apple.root.default-qos queue: 3
  get com.apple.root.default-qos queue: 4
  get com.apple.root.default-qos queue: 4
  get com.apple.root.utility-qos queue: 5
  get com.apple.root.default-qos queue: 6
  get com.apple.root.default-qos queue: 7
  get com.apple.root.default-qos queue: 8
  get com.apple.root.default-qos queue: 9
  get com.apple.root.default-qos queue: 10
  get com.apple.root.default-qos queue: 11
  get com.apple.root.default-qos queue: 12
  get com.apple.root.default-qos queue: 13
  get com.apple.root.default-qos queue: 14
test3.work at com.apple.root.default-qos queue completed
  get com.apple.main-thread queue: 15
  get com.apple.root.default-qos queue: 17
  get com.apple.root.default-qos queue: 18
  get com.apple.root.utility-qos queue: 18
  get com.apple.main-thread queue: 19
  get com.apple.root.default-qos queue: 21
  get com.apple.root.default-qos queue: 22
  get com.apple.root.utility-qos queue: 22
  get com.apple.main-thread queue: 23
  get com.apple.root.default-qos queue: 25
  get com.apple.root.default-qos queue: 26
  get com.apple.root.utility-qos queue: 26
  get com.apple.main-thread queue: 27
  get com.apple.root.default-qos queue: 29
  get com.apple.root.default-qos queue: 30
  get com.apple.root.utility-qos queue: 30
  get com.apple.main-thread queue: 31
  get com.apple.root.default-qos queue: 33
  get com.apple.root.default-qos queue: 34
  get com.apple.root.utility-qos queue: 34
  get com.apple.main-thread queue: 35
  get com.apple.root.default-qos queue: 37
  get com.apple.root.default-qos queue: 38
  get com.apple.root.utility-qos queue: 38
  get com.apple.main-thread queue: 39
  get com.apple.root.default-qos queue: 41
  get com.apple.root.default-qos queue: 42
  get com.apple.root.utility-qos queue: 42
  get com.apple.main-thread queue: 43
  get com.apple.root.default-qos queue: 45
  get com.apple.root.default-qos queue: 46
  get com.apple.root.utility-qos queue: 46
  get com.apple.main-thread queue: 47
  get com.apple.root.default-qos queue: 49
test1.work at com.apple.root.default-qos queue completed
  get com.apple.root.default-qos queue: 50
test2.work at com.apple.root.default-qos queue completed
  get com.apple.root.utility-qos queue: 50
  get com.apple.main-thread queue: 50
test1.work at com.apple.root.utility-qos queue completed
  get com.apple.main-thread queue: 51
test2.work at com.apple.main-thread queue completed
  get com.apple.main-thread queue: 52
  get com.apple.main-thread queue: 53
  get com.apple.main-thread queue: 54
  get com.apple.main-thread queue: 55
  get com.apple.main-thread queue: 56
  get com.apple.main-thread queue: 57
  get com.apple.main-thread queue: 58
  get com.apple.main-thread queue: 59
  get com.apple.main-thread queue: 60
test3.work at com.apple.main-thread queue completed
End. Count: 60
Vasily Bodnarchuk
  • 24,482
  • 9
  • 132
  • 127
  • This is perfect! I'm using it as a struct (add `mutating` to `func set()` -- works like a charm. Thank you. – coping Jun 26 '19 at 03:22
  • Will there be an issue when `atomicValue.value += 1` (a read and then a write operation) is happening and another thread attempts to set `atomicValue.value`? If so, I think we should remove the `value`'s `set` and use `set(closure:)` instead. – CyberMew Sep 18 '19 at 10:12
  • @CyberMew yes, you right. I will update this class little bit later. I have several new ideas. – Vasily Bodnarchuk Sep 23 '19 at 19:19
  • If this atomic value is called on the main thread, it will cause a deadlock, so you have to use it on other queues such as a global one. I suggest using a private serial queue inside the AtomicValue class and handling the setter and getter serially through that queue. @VasilyBodnarchuk – Mehrdad Jan 24 '20 at 12:48
  • @Mehrdad yes, this is possible to catch dead lock. More info here: https://stackoverflow.com/a/54104259 – Vasily Bodnarchuk Jan 24 '20 at 12:56
3

You can use a serial dispatch queue to ensure that the array is only updated in a thread safe manner.

It is also best to change your deviceOnArray property to private to ensure that it cannot be accessed by some other object. If you need to expose this array to other objects, do so via a computed property. e.g.

/// Singleton that connects a CocoaMQTT client in its initializer and tracks
/// armed device topics, routing every access to `deviceOnArray` through one
/// shared dispatch queue.
///
/// NOTE(review): `clientID`, `mqtt`, `isConnecting` and
/// `registerBackgroundTask()` are used below but not declared in this
/// snippet — presumably omitted for brevity.
class DeviceController:NSObject, CocoaMQTTDelegate {
    static let sharedInstance = DeviceController()

    /// Backing storage for the armed devices. Private so that every access
    /// goes through `dispatchQueue`.
    private var deviceOnArray:[String] = []

    /// Snapshot of the armed devices.
    ///
    /// The read goes through `dispatchQueue` as well; reading the array
    /// directly would bypass the queue that `sendArm`/`sendDisarm` use and
    /// could overlap with one of their mutations.
    var deviceOn: [String] {
        return dispatchQueue.sync { deviceOnArray }
    }

    // NOTE(review): this array is still publicly mutable and is not routed
    // through `dispatchQueue` anywhere in this snippet.
    var deviceOffArray:[String] = []

    /// Queue shared by all accesses to `deviceOnArray`. It is created without
    /// the `.concurrent` attribute and lives as long as the instance, so all
    /// callers submit to the same queue.
    private let dispatchQueue = DispatchQueue(label:"DeviceControllerQueue")

    private override init() {

        // Client identifier: fixed prefix plus the current process identifier.
        clientID = "xyz-" + String(ProcessInfo().processIdentifier)
        mqtt = CocoaMQTT(clientID: clientID, host: "device_controller.xyz.net", port: 1883)
        mqtt.username = "username"
        mqtt.password = "password"
        mqtt.willMessage = CocoaMQTTWill(topic: "/will", message: "dieout")
        mqtt.keepAlive = 30
        mqtt.cleanSession = true
        DeviceController.isConnecting = true
        super.init()
        // `self` is only passed out after `super.init()`.
        mqtt.delegate = self
        mqtt.connect()
        self.registerBackgroundTask()
    }

    /// Adds `topic` to the armed devices.
    ///
    /// - Parameter topic: Device topic to record as armed.
    func sendArm(topic:String){
        self.dispatchQueue.sync {
            deviceOnArray.append(topic)
        }
    }

    /// Removes the first occurrence of `topic` from the armed devices, if any.
    ///
    /// - Parameter topic: Device topic to remove.
    func sendDisarm(topic:String){
        // Lookup and removal happen inside one block so that no other block
        // submitted to the queue can run between them.
        self.dispatchQueue.sync {
            if let index = self.deviceOnArray.index(of: topic) {
                self.deviceOnArray.remove(at: index)
            }
        }
    }
}
Paulw11
  • 108,386
  • 14
  • 159
  • 186
  • It is showing error `Value of type '(NSObject) -> () -> DeviceController' has no member 'deviceOnArray'` on line `return self.deviceOnArray` – Varun Naharia Mar 15 '17 at 12:21
  • There was an error in my original code; but I have edited now. – Paulw11 Mar 15 '17 at 12:26
  • Will it prevent if sendDisarm() is called two different thread? – Ankush Mar 09 '18 at 10:42
  • This code won't prevent `sendDisarm` being called, but it ensures that the code in `sendDisarm` is not executed concurrently if it is called by two different threads since it is dispatched synchronously onto a serial dispatch queue. – Paulw11 Mar 09 '18 at 20:08
2

Atomic properties won't help you here. They are intended to sync on assignment of the property as a whole and not of the internals (e.g. they do not sync insertion/removal of elements to the array). They almost only ensure correct retain/release/autorelease calls in order to keep your program from crashing / leaking.

What you need is a DispatchSemaphore or something similar (or a lower-level primitive such as a POSIX pthread_mutex) to ensure mutually exclusive access.

Andreas Oetjen
  • 9,889
  • 1
  • 24
  • 34