How is one supposed to aggregate data when using Grand Central Dispatch's concurrentPerform()?

I am doing what is in the code below, but resultDictionary seems to lose all its data when the notify() block ends. Thus all I get is an empty dictionary that is returned from the function.

I am not sure why this is happening, because when I print or set a breakpoint I can see there is something in the resultDictionary before the block ends.

    let getCVPDispatchQueue = DispatchQueue(label: "blarg",
                                        qos: .userInitiated,
                                        attributes: .concurrent)
    let getCVPDispatchGroup = DispatchGroup()

    var resultDictionary = dataIDToSRLParticleDictionary()

    getCVPDispatchQueue.async { [weak self] in
        guard let self = self else { return }
        DispatchQueue.concurrentPerform(iterations: self.dataArray.count) { [weak self] (index) in
            guard let self = self else { return }
            let data = self.dataArray[index]
            getCVPDispatchGroup.enter()
            let theResult = data.runPartcleFilterForClosestParticleAndMaybeStopAudio()
            switch theResult {
                case .success(let CVParticle):
                    // If there was a CVP found, add it to the set.
                    if let theCVParticle = CVParticle {
                        self.dataIDsToCVPDictionary.addTodataIDToCVPDict(key: data.ID,
                                                                            value: theCVParticle)
                    }
                case .failure(let error):
                    os_log(.error, log: self.logger, "rundatasProcessing error: %s", error.localizedDescription)
                    self._isActive = false
            }
            getCVPDispatchGroup.leave()
        }

        getCVPDispatchGroup.notify(queue: .main) { [weak self] in
            guard let self = self else { return }
            print("DONE with \(self.dataIDsToCVPDictionary.getDictionary.count)")
            resultDictionary = self.dataIDsToCVPDictionary.getDictionary
            print("resultDictionary has \(self.dataIDsToCVPDictionary.getDictionary.count)")
        }

    }

    print("Before Return  with \(resultDictionary.count)")

    return resultDictionary

}

Not sure if this will help, but this is a simple class I made to make accessing the dictionary thread safe.

class DATASynchronizedIDToParticleDictionary {
    var unsafeDictionary: DATAIDToDATAParticleDictionary = DATAIDToDATAParticleDictionary()
    let accessQueue = DispatchQueue(label: "blarg2",
                                    qos: .userInitiated,
                                    attributes: .concurrent)
    var getDictionary: DATAIDToDATAParticleDictionary {
        get {
            var dictionaryCopy: DATAIDToDATAParticleDictionary!
            accessQueue.sync {
                dictionaryCopy = unsafeDictionary
            }
            return dictionaryCopy
        }
    }
    func addToDATAIDToCVPDict(key: String, value: DATAParticle) {
        accessQueue.async(flags: .barrier) { [weak self] in
            guard let self = self else { return }
            self.unsafeDictionary[key] = value
        }
    }
    func clearDictionary() {
        accessQueue.async(flags: .barrier) { [weak self] in
            guard let self = self else { return }
            self.unsafeDictionary.removeAll()
        }
    }
}
Stan

2 Answers

You said:

I am doing what is in the code below, but resultDictionary seems to lose all its data when the notify() block ends. Thus all I get is an empty dictionary that is returned from the function.

The issue is that you’re trying to return a value that is calculated asynchronously. You likely want to shift to a completion block pattern.
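
As a rough illustration of that pattern, here is a minimal sketch. The function `squares(of:deliverOn:completion:)` and its squaring "work" are hypothetical stand-ins, not your particle filter; the `deliverOn` parameter is only there so the queue receiving the result can be swapped out.

```swift
import Foundation

// Hypothetical stand-in for the expensive work: squares each value on a
// background queue, then hands the finished dictionary to `completion`.
func squares(of values: [Int],
             deliverOn queue: DispatchQueue = .main,
             completion: @escaping @Sendable ([Int: Int]) -> Void) {
    DispatchQueue.global(qos: .userInitiated).async {
        // Build the result locally; nothing outside this closure can observe it half-built.
        let result = values.reduce(into: [Int: Int]()) { $0[$1] = $1 * $1 }

        // Deliver the finished value instead of trying to return it.
        queue.async { completion(result) }
    }
}

// Usage (in an app, where the main queue is being serviced):
//
//     squares(of: [1, 2, 3]) { result in
//         // `result` is fully populated here
//     }
//     // code here runs before the closure above is called
```

The function itself returns immediately with no value; the only place the result exists is inside the closure.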

As an aside, the dispatch group is not necessary. Somewhat ironically, the concurrentPerform is synchronous (i.e. it doesn’t proceed until the parallelized for loop is finished). So there’s no point in using notify if you know that you won’t get to the line after the concurrentPerform until all the iterations are done.
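
If you want to convince yourself of the synchronous behavior, a small sketch like the following should do it. The `countIterations` helper is hypothetical and only counts how many iterations ran; the point is that the `return` is not reached until every iteration has finished.

```swift
import Foundation

// Hypothetical helper: runs `iterations` concurrent iterations and reports
// how many had completed by the time concurrentPerform returned.
func countIterations(_ iterations: Int) -> Int {
    let lock = NSLock()
    var finished = 0

    // Blocks the calling thread until all iterations have returned.
    DispatchQueue.concurrentPerform(iterations: iterations) { _ in
        Thread.sleep(forTimeInterval: 0.01)   // stand-in for real work
        lock.lock()
        finished += 1
        lock.unlock()
    }

    // No dispatch group or notify is needed: every iteration is done by now.
    return finished
}

print(countIterations(8))
```

Because the call blocks, you would still wrap it in an `async` dispatch to a background queue (as you did) so that the main thread is not tied up.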

I’d also discourage having the concurrentPerform loop update properties. It exposes you to a variety of problems. E.g. what if the main thread was interacting with that object at the same time? Sure, you can synchronize your access, but it may be incomplete. It’s probably safer to have it update local variables only, and have the caller do the property update in its completion handler block. Obviously, you can go ahead and update properties (especially if you want to update your UI to reflect the in-flight progress), but it adds an additional wrinkle to the code that might not be necessary. Below, I’ve assumed it wasn’t necessary.

Also, while I appreciate the intent behind all of these [weak self] references, they’re really not needed, especially in your synchronization class DATASynchronizedIDToParticleDictionary. We often use weak references to avoid strong reference cycles. But if there is no cycle to break, they just add overhead unless you have some other compelling need.

OK, so let’s dive into the code.

  • First, I’d replace the specialized DATASynchronizedIDToParticleDictionary with a general-purpose generic:

    class SynchronizedDictionary<Key: Hashable, Value> {
        private var _dictionary: [Key: Value]
        private let queue = DispatchQueue(label: Bundle.main.bundleIdentifier! + ".dictionary", qos: .userInitiated, attributes: .concurrent)
    
        init(_ dictionary: [Key: Value] = [:]) {
            _dictionary = dictionary
        }
    
        var dictionary: [Key: Value] {
            queue.sync { _dictionary }
        }
    
        subscript(key: Key) -> Value? {
            get { queue.sync                   { _dictionary[key] } }
            set { queue.async(flags: .barrier) { self._dictionary[key] = newValue } }
        }
    
        func removeAll() {
            queue.async(flags: .barrier) {
                self._dictionary.removeAll()
            }
        }
    }
    

    Note, I’ve removed the unnecessary weak references. I’ve also replaced addToDATAIDToCVPDict and clearDictionary with a subscript operator and a removeAll method, which more closely mirror the interface of the underlying Dictionary type. It results in more natural looking code. (And because this is a generic, we can use it for any dictionary that needs this sort of low level synchronization.)

    Anyway, you can now declare a synchronized rendition of the dictionary like so:

    let particles = SynchronizedDictionary(dataIDToSRLParticleDictionary())
    

    And when you want to update the dictionary with some value, you can do:

    particles[data.ID] = theCVParticle
    

    And when you want to retrieve the actual underlying, wrapped dictionary, you can do:

    let finalResult = particles.dictionary
    
  • While we’re at it, since we might want to keep track of an array of errors that needs to be synchronized, I might add an array equivalent type:

    class SynchronizedArray<Value> {
        private var _array: [Value]
        private let queue = DispatchQueue(label: Bundle.main.bundleIdentifier! + ".array", qos: .userInitiated, attributes: .concurrent)
    
        init(_ array: [Value] = []) {
            _array = array
        }
    
        var array: [Value] {
            queue.sync { _array }
        }
    
        subscript(index: Int) -> Value {
            get { queue.sync                   { _array[index] } }
            set { queue.async(flags: .barrier) { self._array[index] = newValue } }
        }
    
        func append(_ value: Value) {
            queue.async(flags: .barrier) {
                self._array.append(value)
            }
        }
    
        func removeAll() {
            queue.async(flags: .barrier) {
                self._array.removeAll()
            }
        }
    }
    
  • We can now turn our attention to the main routine. So rather than returning a value, we’ll instead give it an @escaping completion handler. And, as discussed above, we’d retire the unnecessary dispatch group:

    func calculateAllClosestParticles(completion: @escaping ([String: CVParticle], [Error]) -> Void) {
        let queue = DispatchQueue(label: "blarg", qos: .userInitiated, attributes: .concurrent)
        let particles = SynchronizedDictionary(dataIDToSRLParticleDictionary())
        let errors = SynchronizedArray<Error>()
    
        queue.async {
            DispatchQueue.concurrentPerform(iterations: self.dataArray.count) { index in
                let data = self.dataArray[index]
                let result = data.runPartcleFilterForClosestParticleAndMaybeStopAudio()
    
                switch result {
                case .success(let cvParticle):
                    // If there was a CVP found, add it to the set.
                    if let cvParticle = cvParticle {
                        particles[data.ID] = cvParticle
                    }
    
                case .failure(let error):
                    errors.append(error)
                }
            }
    
            DispatchQueue.main.async {
                completion(particles.dictionary, errors.array)
            }
        }
    }
    

    Now, I don’t know what the right types were for the dictionary, so you might need to adjust the parameters of the completion. And you didn’t provide the rest of the routines, so I may have some details wrong here. But don’t get lost in the details; just note the scrupulous avoidance of properties within the concurrentPerform and the passing of the results back in the completion handler.

    You’d call it like so:

    calculateAllClosestParticles { dictionary, errors in
        guard errors.isEmpty else { return }
    
        // you can access the dictionary and update the model and UI here
        self.someProperty = dictionary
        self.tableView.reloadData()
    }
    
    // but don't try to access the dictionary here, because the asynchronous code hasn't finished yet
    
  • FWIW, while I used the reader-writer pattern you did in your example, in my experience, NSLock is actually more performant for quick synchronizations, especially when you are using concurrentPerform that might tie up all of the cores on your CPU, e.g.

    class SynchronizedDictionary<Key: Hashable, Value> {
        private var _dictionary: [Key: Value]
        private let lock = NSLock()
    
        init(_ dictionary: [Key: Value] = [:]) {
            _dictionary = dictionary
        }
    
        var dictionary: [Key: Value] {
            lock.synchronized { _dictionary }
        }
    
        subscript(key: Key) -> Value? {
            get { lock.synchronized { _dictionary[key] } }
            set { lock.synchronized { _dictionary[key] = newValue } }
        }
    
        func removeAll() {
            lock.synchronized {
                _dictionary.removeAll()
            }
        }
    }
    

    Where

    extension NSLocking {
        func synchronized<T>(_ closure: () throws -> T) rethrows -> T {
            lock()
            defer { unlock() }
            return try closure()
        }
    }
    

    Bottom line, you don’t want to force context switches for synchronization if you don’t have to.

  • When doing concurrentPerform, if you have many data points and the time required by each call to runPartcleFilterForClosestParticleAndMaybeStopAudio is modest, you might want to consider “striding”, doing several data points in each iteration. It’s beyond the scope of this question, but just an FYI.
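
For what it’s worth, the striding idea from the last bullet might look something like this rough sketch. The function name `chunkedConcurrentMap`, the `chunkSize` parameter, and the `Int` element type are all assumptions for illustration; in your case the transform would be the particle filter call, and a good chunk size is something you would have to measure.

```swift
import Foundation

// Hypothetical helper: applies `transform` to every element, but gives each
// concurrentPerform iteration `chunkSize` consecutive elements rather than one.
func chunkedConcurrentMap(_ input: [Int],
                          chunkSize: Int,
                          transform: (Int) -> Int) -> [Int] {
    precondition(chunkSize > 0, "chunkSize must be positive")

    let count = input.count
    let iterations = (count + chunkSize - 1) / chunkSize   // round up
    let lock = NSLock()
    var output = [Int](repeating: 0, count: count)

    DispatchQueue.concurrentPerform(iterations: iterations) { chunk in
        let start = chunk * chunkSize
        let end = min(start + chunkSize, count)

        // Do the work for the whole chunk without holding the lock.
        let partial = input[start..<end].map(transform)

        // Synchronize once per chunk rather than once per element.
        lock.lock()
        output.replaceSubrange(start..<end, with: partial)
        lock.unlock()
    }

    return output
}

let doubled = chunkedConcurrentMap(Array(0..<10), chunkSize: 4) { $0 * 2 }
print(doubled)
```

With fewer, larger iterations there is less scheduling and synchronization overhead per element, at the cost of coarser load balancing across cores.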

Rob
  • found this while trying to figure out another multithreading matter, has a bounty if you have any thoughts to share: https://stackoverflow.com/questions/75459511/writing-to-different-swift-array-indexes-from-different-threads/75460107 – johnbakers Feb 21 '23 at 17:57

Not exactly sure what I did, but I moved the

resultDictionary = self.dataIDsToCVPDictionary.getDictionary

outside the first async block, and that seems to have allowed the data to be retained for the function's return.

Stan