7

I've been manipulating byte arrays in Swift 2.1 lately, and I often find myself writing code like this:

// code to add functions to a [UInt8] object
extension CollectionType where Generator.Element == UInt8 {

    func xor(with byte: UInt8) -> [UInt8] {
        return map { $0 ^ byte }
    }
}

// example usage: [67, 108].xor(with: 0) == [67, 108]

Is there an easy way to parallelize this map call, so that multiple threads can operate on non-overlapping areas of the array at the same time?

I could write code to manually divide the array into sub-arrays and call map on each sub-array in distinct threads. But I wonder if some framework exists in Swift to do the division automatically, since map is a functional call that can work in a thread-safe environment without side-effects.

Clarifying notes:

  1. The code only needs to work on a [UInt8] object, not necessarily every CollectionType.
m81
  • 2,217
  • 5
  • 31
  • 47

3 Answers3

8

The easiest way to perform a loop of calculations in parallel is concurrentPerform (previously called dispatch_apply; see Performing Loop Iterations Concurrently in the Concurrency Programming Guide). But, no, there is no map rendition that will do this for you. You have to do this yourself.

For example, you could write an extension to perform the concurrent tasks:

extension Array {
    public func concurrentMap<T>(_ transform: (Element) -> T) -> [T] {
        var results = [Int: T](minimumCapacity: count)

        let lock = NSLock()

        DispatchQueue.concurrentPerform(iterations: count) { index in
            let result = transform(self[index])
            lock.synchronized {
                results[index] = result
            }
        }

        return (0 ..< results.count).compactMap { results[$0] }
    }
}

Where

extension NSLocking {
    func synchronized<T>(block: () throws -> T) rethrows -> T {
        lock()
        defer { unlock() }
        return try block()
    }
}

You can use whatever synchronization mechanism you want (locks, serial queues, reader-writer), but the idea is to perform transform concurrently and then synchronize the update of the collection.

Note:

  • This will block the thread you call it from (just like the non-concurrent map will), so make sure to dispatch this to a background queue.

  • One needs to ensure that there is enough work on each thread to justify the inherent overhead of managing all of these threads. (E.g. a simple xor call per loop is not sufficient, and you'll find that it's actually slower than the non-concurrent rendition.) In these cases, make sure you stride (see Improving Loop Code that balances the amount of work per concurrent block). For example, rather than doing 5000 iterations of one extremely simple operation, do 10 iterations of 500 operations per loop. You may have to experiment with suitable striding values.


While I suspect you don't need this discussion, for readers unfamiliar with concurrentPerform (formerly known as dispatch_apply), I'll illustrate its use below. For a more complete discussion on the topic, refer to the links above.

For example, let's consider something far more complicated than a simple xor (because with something that simple, the overhead outweighs any performance gained), such as a naive Fibonacci implementation:

func fibonacci(_ n: Int) -> Int {
    if n == 0 || n == 1 {
        return n
    }
    return fibonacci(n - 1) + fibonacci(n - 2)
}

If you had an array of Int values for which you wanted to calculate, rather than:

let results = array.map { fibonacci($0) }

You could:

var results = [Int](count: array.count, repeatedValue: 0)
DispatchQueue.concurrentPerform(iterations: array.count) { index in
    let result = self.fibonacci(array[index])
    synchronize.update { results[index] = result }      // use whatever synchronization mechanism you want
}

Or, if you want a functional rendition, you can use that extension I defined above:

let results = array.concurrentMap { fibonacci($0) }

For Swift 2 rendition, see previous revision of this answer.

Rob
  • 415,655
  • 72
  • 787
  • 1,044
  • Like I said in the now deleted answer, I've been doing some benchmarking. It is slightly faster if you split the array in x slices where x = `NSProcessInfo().activeProcessorCount * 4`. Each slice gets it's own `dispatch_apply` – R Menke Dec 09 '15 at 21:54
  • This implementation traps unwrapping a nil optional when I plug it into [my test framework](https://gist.github.com/dabrahams/ea5495b4cccc2970cd56e8cfc72ca761), and I think even if that were fixed it would be very inefficient, because it queues an async operation for each element. – Dave Abrahams Mar 13 '20 at 22:38
  • Yep, you want to stride if your calculations are not material enough for each iteration of `concurrentPerform`. See https://stackoverflow.com/a/39949292/1271826 for example of striding. – Rob Mar 13 '20 at 22:44
3

My implementation seems to be correct and performs well by comparison with all the others I've seen. Tests and benchmarks are here

extension RandomAccessCollection {
    /// Returns `self.map(transform)`, computed in parallel.
    ///
    /// - Requires: `transform` is safe to call from multiple threads.
    func concurrentMap<B>(_ transform: (Element) -> B) -> [B] {
        let batchSize = 4096 // Tune this
        let n = self.count
        let batchCount = (n + batchSize - 1) / batchSize
        if batchCount < 2 { return self.map(transform) }

        return Array(unsafeUninitializedCapacity: n) {
            uninitializedMemory, resultCount in
            resultCount = n
            let baseAddress = uninitializedMemory.baseAddress!

            DispatchQueue.concurrentPerform(iterations: batchCount) { b in
                let startOffset = b * n / batchCount
                let endOffset = (b + 1) * n / batchCount
                var sourceIndex = index(self.startIndex, offsetBy: startOffset)
                for p in baseAddress+startOffset..<baseAddress+endOffset {
                    p.initialize(to: transform(self[sourceIndex]))
                    formIndex(after: &sourceIndex)
                }
            }
        }
    }
}

Hope this helps,

-Dave

Dave Abrahams
  • 7,416
  • 5
  • 31
  • 19
-1

You can use parMap(), which is parrallel map. You can use activity monitor to check if it's parrallel map.

func map<T: Collection, U>( _ transform: (T.Iterator.Element) -> U, _ xs: T) -> [U] {
    return xs.reduce([U](), {$0 + [transform($1)]})
}

public func parMap<T,U>(_ transform: @escaping (T)->U, _ xs: [T]) -> [U] {
    let len     = xs.count
    var results = [U?](repeating: nil, count: len)
    let process = { (i: Int) -> Void in results[i] = transform(xs[i]) }
    DispatchQueue.concurrentPerform(iterations: len, execute: process)
    return map({$0!}, results)
}

func test() {
    parMap({_ in Array(1...10000000).reduce(0,+)}, Array(1...10))
}

enter image description here

webcpu
  • 3,174
  • 2
  • 24
  • 17