I have an existing flow implemented using Operation
subclasses and an OperationQueue
. I'm trying reimplement in Combine as a learning exercise.
The current version, simplified, looks like this:
func testFunction(completionHandler: (Result) -> Void) {
let op = LongRunningOperation(object: self.stateObject)
op.completionHandler = { [unowned op] in
let result = op.result
self.stateObject.update(with: result)
completionHandler(result)
}
localSerialQueue.addOperation(op)
}
Some important aspects. LongRunningOperation
is asynchronous. It may need to call out to the main queue to complete the login process, depending on stateObject
. stateObject
is only ever accessed on the localSerialQueue
. Also, just to be really explicit, there could be other concurrent calls to runOperation
, so the serialization provided by the queue+operation is essential.
The combination of mutable state and potential concurrent invocations is absolutely critical to the problem.
To illustrate the issue, I've produced an example which proves that Combine pipelines are not executed atomically. Any other result would have surprised me, but I just wanted to verify. Multiple threads can be executing various stages of the pipeline simultaneously, violating our precondition checks.
class MyThreadUnsafeObject {
private var cancellables = Set<AnyCancellable>()
private var inProcess = false
private func testFunction() {
Result<Bool, Error> {
// begin our pipeline, checking our invariant
// and mutating the state to start our process
precondition(self.inProcess == false)
self.inProcess = true
// just pass some dummy data through
return true
}
.publisher
.flatMap { (result: Bool) -> AnyPublisher<Bool, Error> in
Future { (promise) in
precondition(self.inProcess)
// simulate a very simple long-running task here,
// which is not even asynchronous, just to make things
// really clear.
// critically, change some mutable state only
// when done
precondition(self.inProcess)
sleep(2)
precondition(self.inProcess)
promise(.success(true))
}.eraseToAnyPublisher()
}
.sink(receiveCompletion: { completion in
if case .failure = completion {
fatalError("this isn't allowed to happen")
}
}, receiveValue: { result in
precondition(result == true)
precondition(self.inProcess)
// back to the initial state
self.inProcess = false
})
.store(in: &self.cancellables)
}
func runTest() {
let group = DispatchGroup()
for _ in 0..<20 {
DispatchQueue.global().async(group: group, qos: .default, flags: .inheritQoS) {
self.testFunction()
}
}
group.wait()
}
}
Just to be extremely clear, my question is not about how to build a Combine pipeline. It is about the semantics of Combine's synchronization primitives, and how I can protect mutable state that is accesses within a pipeline. And, it could be the solution is to use Combine with another form of synchronization (like a lock) to handle this kind of access pattern. That's totally fine! I'm just curious if others have run into this kind of issue, if there is a more idiomatic way, or if I can reframe the problem somehow.