I want to speed up a process, so I wrote a Swift CLI script that processes thousands of files in parallel and writes the result for each file into a single output file. (The order of the results does not really matter.)

So I wrote the code below, and it works in Xcode unit tests (even with a list of approximately 1,200 files). However, when I run the program from the command line, without Xcode and with the same list of files, it never finishes. It looks like it gets stuck near the end.

I read that spawning too many threads can sometimes cause a program to stall because it runs out of resources, but I thought DispatchQueue.concurrentPerform would take care of that. I have no clue why this works in XCTest and does not work in the terminal.

I have tried both the DispatchGroup approach and the semaphore approach, and both have the same problem.

Any help is highly appreciated.

let filePaths: [String] = [/* thousands of file paths to process */]

let group = DispatchGroup()
let concurrentQueue = DispatchQueue(label: "my.concurrent.queue", qos: .userInitiated, attributes: .concurrent)
let serialQueue = DispatchQueue(label: "my.serial.queue", qos: .userInitiated)

group.enter()
concurrentQueue.async {
    DispatchQueue.concurrentPerform(iterations: filePaths.count) { (fileIndex) in
        let filePath = filePaths[fileIndex]
        let result = self.processFile(path: filePath)
        group.enter()
        serialQueue.async {
            self.writeResult(result)
            group.leave()
        }
    }
    group.leave()
}
group.wait()
nacho4d

Related: https://stackoverflow.com/questions/15150308/workaround-on-the-threads-limit-in-grand-central-dispatch – Cristik Oct 27 '21 at 17:40

Try batching your work and only run 6 or 8 concurrent processes at a time. When working with files, those threads are likely to sit around waiting on disk I/O for a long time. Be gentle with the poor I/O system; it's "slow". – Scott Thompson Oct 27 '21 at 18:16

1 Answer

First, a few simplifications:

  1. You have code that is

    group.enter()
    serialQueue.async {
        self.writeResult(result)
        group.leave()
    }
    

    That can be simplified to:

    serialQueue.async(group: group) {
        self.writeResult(result)
    }
    
  2. Consider:

    group.enter()
    concurrentQueue.async {
        DispatchQueue.concurrentPerform(iterations: filePaths.count) { (fileIndex) in
            ...
        }
        group.leave()
    }
    

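    That concurrentQueue is redundant: concurrentPerform is synchronous and does not return until every iteration has finished, so there is no need to dispatch it elsewhere and then wait for it with the group. This can be simplified to: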

    DispatchQueue.concurrentPerform(iterations: filePaths.count) { (fileIndex) in
        ...
    }
    

That reduces your code to:

let group = DispatchGroup()
let writeQueue = DispatchQueue(label: "serial.write.queue", qos: .userInitiated)

DispatchQueue.concurrentPerform(iterations: filePaths.count) { [self] index in
    let result = processFile(path: filePaths[index])
    writeQueue.async(group: group) {
        writeResult(result)
    }
}

group.wait()
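
As a self-contained illustration of the reduced pattern, here is a minimal sketch that you can run on its own. The names `ResultStore` and `runGroupedWrites` are hypothetical, and `index * 2` and the array append are only stand-ins for `processFile(path:)` and `writeResult(_:)`, which the question does not show.

```swift
import Dispatch

// Hypothetical holder for the "written" results; it is only mutated on the
// serial write queue, which is why it is marked @unchecked Sendable.
final class ResultStore: @unchecked Sendable {
    var values: [Int] = []
}

func runGroupedWrites(count: Int) -> [Int] {
    let group = DispatchGroup()
    let writeQueue = DispatchQueue(label: "serial.write.queue")
    let store = ResultStore()

    DispatchQueue.concurrentPerform(iterations: count) { index in
        let result = index * 2               // stand-in for processFile(path:)
        // async(group:) associates the block with the group when it is
        // submitted and releases it when the block finishes, so no manual
        // enter()/leave() calls are needed.
        writeQueue.async(group: group) {
            store.values.append(result)      // stand-in for writeResult(_:)
        }
    }

    group.wait()                             // blocks until all queued writes have run
    return store.values
}

print(runGroupedWrites(count: 100).count)
```

Because the writes are asynchronous, their order in the array is not guaranteed to match the index order, which matches the question's statement that order does not matter.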

That raises the question of why you are dispatching the write operations asynchronously to a serial queue. That can introduce problems (e.g. if the queue gets backlogged, you will be holding all of the unwritten result values in memory at the same time).

One option is to write synchronously (you have to wait for the write operations in the end, anyway):

let writeQueue = DispatchQueue(label: "serial.write.queue", qos: .userInitiated)

DispatchQueue.concurrentPerform(iterations: filePaths.count) { [self] index in
    let result = processFile(path: filePaths[index])
    writeQueue.sync {
        writeResult(result)
    }
}
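
The synchronous variant can be sketched as a standalone function as follows. This is only an illustration under assumptions: `LineStore` and `processAll` are hypothetical names, and `uppercased()` stands in for the real `processFile(path:)`. The point it shows is that no `DispatchGroup` is needed, since each iteration blocks until its own write has completed and `concurrentPerform` itself returns only after the last iteration.

```swift
import Dispatch

// Hypothetical in-memory stand-in for the single output file. It is only
// mutated inside writeQueue.sync, so access is serialized.
final class LineStore: @unchecked Sendable {
    var lines: [String] = []
}

func processAll(_ filePaths: [String]) -> [String] {
    let store = LineStore()
    let writeQueue = DispatchQueue(label: "serial.write.queue")

    DispatchQueue.concurrentPerform(iterations: filePaths.count) { index in
        let result = filePaths[index].uppercased()   // stand-in for processFile(path:)
        // sync blocks this worker until the write is done, so at most one
        // pending result per worker thread is held in memory.
        writeQueue.sync {
            store.lines.append(result)               // stand-in for writeResult(_:)
        }
    }

    // Every write has finished by the time concurrentPerform returns.
    return store.lines
}

print(processAll(["a.txt", "b.txt", "c.txt"]).count)
```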

Or, if writeResult is itself thread-safe (i.e. it does its own synchronization around the shared file), you can probably just write from the various concurrent threads themselves, with no write queue at all:

DispatchQueue.concurrentPerform(iterations: filePaths.count) { [self] index in
    let result = processFile(path: filePaths[index])
    writeResult(result)
}
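
Writing directly from the worker threads is only safe if the write itself is synchronized. One possible way to do that, sketched here with a hypothetical `LockedWriter` class and an in-memory array in place of the real output file, is to guard the shared state with an `NSLock`:

```swift
import Foundation

// Hypothetical thread-safe writer: every access to `lines` goes through the lock.
final class LockedWriter: @unchecked Sendable {
    private let lock = NSLock()
    private var lines: [String] = []

    func writeResult(_ result: String) {
        lock.lock()
        defer { lock.unlock() }
        lines.append(result)        // stand-in for appending to the output file
    }

    var count: Int {
        lock.lock()
        defer { lock.unlock() }
        return lines.count
    }
}

func writeAll(_ filePaths: [String]) -> Int {
    let writer = LockedWriter()
    DispatchQueue.concurrentPerform(iterations: filePaths.count) { index in
        // "processed " + path is a stand-in for processFile(path:).
        writer.writeResult("processed " + filePaths[index])
    }
    return writer.count
}

print(writeAll((0..<500).map { "file\($0).txt" }))
```

Functionally this is close to the `writeQueue.sync` version; the lock simply moves the serialization inside the writer.
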
Rob