
I have an existing flow implemented using Operation subclasses and an OperationQueue. I'm trying to reimplement it in Combine as a learning exercise.

The current version, simplified, looks like this:

func testFunction(completionHandler: @escaping (Result) -> Void) {
    let op = LongRunningOperation(object: self.stateObject)

    op.completionHandler = { [unowned op] in
        let result = op.result
        self.stateObject.update(with: result)

        completionHandler(result)
    }

    localSerialQueue.addOperation(op)
}

Some important aspects: LongRunningOperation is asynchronous. It may need to call out to the main queue to complete the login process, depending on stateObject. stateObject is only ever accessed on localSerialQueue. Also, to be really explicit, there can be other concurrent calls to testFunction, so the serialization provided by the queue+operation is essential.

The combination of mutable state and potential concurrent invocations is absolutely critical to the problem.

To illustrate the issue, I've produced an example which proves that Combine pipelines are not executed atomically. Any other result would have surprised me, but I just wanted to verify. Multiple threads can be executing various stages of the pipeline simultaneously, violating our precondition checks.

import Combine
import Foundation

class MyThreadUnsafeObject {
    private var cancellables = Set<AnyCancellable>()
    private var inProcess = false

    private func testFunction() {
        Result<Bool, Error> {
            // begin our pipeline, checking our invariant
            // and mutating the state to start our process
            precondition(self.inProcess == false)

            self.inProcess = true

            // just pass some dummy data through
            return true
        }
        .publisher
        .flatMap { (result: Bool) -> AnyPublisher<Bool, Error> in
            Future { (promise) in
                precondition(self.inProcess)

                // simulate a very simple long-running task here,
                // which is not even asynchronous, just to make things
                // really clear.

                // critically, change some mutable state only
                // when done

                precondition(self.inProcess)
                sleep(2)
                precondition(self.inProcess)

                promise(.success(true))
            }.eraseToAnyPublisher()
        }
        .sink(receiveCompletion: { completion in
            if case .failure = completion {
                fatalError("this isn't allowed to happen")
            }
        }, receiveValue: { result in
            precondition(result == true)
            precondition(self.inProcess)

            // back to the initial state
            self.inProcess = false
        })
        .store(in: &self.cancellables)
    }

    func runTest() {
        let group = DispatchGroup()

        for _ in 0..<20 {
            DispatchQueue.global().async(group: group, qos: .default, flags: .inheritQoS) {
                self.testFunction()
            }
        }

        group.wait()
    }
}

Just to be extremely clear, my question is not about how to build a Combine pipeline. It is about the semantics of Combine's synchronization primitives, and how I can protect mutable state that is accessed within a pipeline. And it could be that the solution is to use Combine together with another form of synchronization (like a lock) to handle this kind of access pattern. That's totally fine! I'm just curious whether others have run into this kind of issue, whether there is a more idiomatic way, or whether I can reframe the problem somehow.

Mattie
    The technique is pretty well illustrated by my answer here: https://stackoverflow.com/a/60418000/341994 – matt Oct 28 '20 at 23:40
  • Also for learning and understanding Combine you might want to read my online tutorial, called, uh, Understanding Combine https://www.apeth.com/UnderstandingCombine/toc.html – matt Oct 29 '20 at 00:01
  • Thank you for these two links. Both were really helpful. I think I can use your example to clean up this code quite a bit. However, I don't *think* I can use the approach in that answer to also achieve the atomic behavior I'm interested in. Am I misunderstanding? – Mattie Oct 29 '20 at 00:13
  • I don’t know what atomic means here. – matt Oct 29 '20 at 00:20
  • I'd like the same semantics I get with Operation. Once a call to authenticationCodePublisher begins, should another call happen, it must wait in the queue until the first finishes. I want the entire process to be serialized. I'm using the queue as a lock, to protect the shared state. – Mattie Oct 29 '20 at 00:25
  • You have misunderstood Combine, perhaps? I think you need to slap yourself on the side of the head and knock the whole OperationQueue idea away. There is no "queue". The pipeline is itself sequential. That is the whole point. Data flows down the pipeline, even if steps are asynchronous. The link I pointed you to earlier shows a pipeline that does exactly what you just said: one thing after another. Only after we check for status do we proceed (if necessary) to request authorization. Only if we get authorization (asynchronously!) do we proceed to fetch info from a contact. – matt Oct 29 '20 at 01:41
  • You might be helped also by my answers at https://stackoverflow.com/a/59889993/341994, https://stackoverflow.com/a/62469820/341994, https://stackoverflow.com/a/61855132/341994, https://stackoverflow.com/a/59430629/341994 And please do read my tutorial, long though it may be. Look at the examples. Try them out. Think about them. – matt Oct 29 '20 at 01:46
  • Also I've now posted the code from the first link as a downloadable project: https://github.com/mattneub/CombineAuthorization (because you don't seem to be getting the idea by reading my answer at link). Try it. Download the project. Run the project on your simulator, and tap the button. Look at how, if authorization has never been granted or denied, the whole thing "pauses" while the authorization dialog comes up, and doesn't "resume" until after you either grant or deny authorization (and behaves correctly either way). It is "atomic" in the sense you are using that word. – matt Oct 29 '20 at 02:34

2 Answers


You can still use synchronization primitives that you already know well, like dispatch queues and run loops, together with Combine code.

Combine (like RxSwift and other async frameworks) uses a scheduler to control the execution of each operator's code; in other words, running all the chained closures isn't a free-for-all, there is a "controller" that schedules and controls the execution. You choose which scheduler to use by inserting an operator that sets the scheduler. In that sense the model is exactly the same as with GCD, operations, etc.: you'd like to do the heavy work on a background queue and perform the side effects on the main queue to update your UI.

Luckily, RunLoop and DispatchQueue are schedulers themselves (both conform to the Scheduler protocol), so you can define the concurrent or serial queues you'd like to use for the code execution and switch the pipeline onto whichever one you want.

Here's an example that starts every subscription on a concurrent background queue and updates the UI on the main thread. I think this should be enough to help you find your way in your own code:

import Combine
import Foundation

let myBgQueue = DispatchQueue(
    label: "concurrent",
    qos: .default,
    attributes: .concurrent
)

let sub = (0...5).publisher
    .subscribe(on: myBgQueue)
    .flatMap({ el -> AnyPublisher<Int, Never> in
        Future { (promise) in
            precondition(!Thread.isMainThread)
            sleep(2)

            print("Future #\(el)")
            promise(.success(el))
        }.eraseToAnyPublisher()
    })
    .receive(on: DispatchQueue.main)
    .sink(receiveCompletion: { _ in
        print("Done.")
    }, receiveValue: { el in
        precondition(Thread.isMainThread)
        print("Got #\(el)")
    })

The console output is as follows (no precondition crashes, which verifies the code runs on the expected queues):

Future #0
Got #0
Future #1
Got #1
Future #2
Got #2
Future #3
Got #3
Future #4
Got #4
Future #5
Got #5
Done.
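
To tie this back to the mutable-state concern in the question: one option is to keep the serial-queue idea from the Operation version and do every read and write of the shared object on a private serial queue inside the pipeline. The following is only a minimal sketch under assumed names (stateQueue, SharedState, and longRunningWork are placeholders, not code from the question):

import Combine
import Foundation

final class StateOwner {
    // A serial queue (the default) acts as the lock around stateObject.
    private let stateQueue = DispatchQueue(label: "state.serial")
    private var stateObject = SharedState()
    private var cancellables = Set<AnyCancellable>()

    func run() {
        Deferred {
            Future<Bool, Error> { promise in
                // Every access to stateObject happens on stateQueue, so
                // concurrent runs of this pipeline cannot interleave here.
                self.stateQueue.async {
                    let result = self.longRunningWork(with: self.stateObject)
                    self.stateObject.update(with: result)
                    promise(.success(result))
                }
            }
        }
        .receive(on: DispatchQueue.main) // hop to the main queue for side effects
        .sink(receiveCompletion: { _ in }, receiveValue: { result in
            print("finished with \(result)")
        })
        .store(in: &cancellables)
    }

    // Stand-in so the sketch is self-contained.
    private func longRunningWork(with object: SharedState) -> Bool { true }
}

struct SharedState {
    mutating func update(with result: Bool) {}
}

Note this only serializes the work that is actually dispatched onto stateQueue; if other stages of the pipeline also touch the state, they need to hop to the same queue (for example with receive(on: stateQueue)).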

Some time ago I wrote a blog post about receive(on:) and subscribe(on:) if you want to read more: https://trycombine.com/posts/subscribe-on-receive-on/

Marin Todorov

The whole point of Combine is that it lines up steps one after another. One step cannot execute until the previous step has signalled that it has executed (by passing a value down the pipeline). Thus merely using Combine makes things "atomic" in your sense; that is the entire point.

This has nothing to do with queues. You can specify a dispatch queue, or switch dispatch queues in the course of the pipeline, but that doesn't even really matter; switching queues is just another step on the way down the pipeline.

The code you have shown is pretty much nonsense and most of it is unnecessary. You don't call one publisher in the middle of another. You don't call store in the middle of a pipeline. You don't usually need to call subscribe. You don't call receive(on:) unless you are switching threads (dispatch queues actually).

So, you construct one pipeline. It starts with a publisher and after that there is a series of operators. Then, at the very end, there is one subscriber (sink or assign) and one store(in:) to keep it all alive, and that's it.
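
A rough skeleton of that shape, just to show where each piece goes (the publisher and operators here are placeholders):

import Combine

final class PipelineSkeleton {
    private var cancellables = Set<AnyCancellable>()

    func start() {
        Just(1)                         // one publisher at the start
            .map { $0 * 2 }             // ...followed by a series of operators...
            .filter { $0 > 0 }
            .sink { value in            // ...one subscriber at the very end...
                print("got \(value)")
            }
            .store(in: &cancellables)   // ...and store(in:) to keep it all alive
    }
}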

There are operators for describing every situation and topology you can think of. In particular (since this is the part you seem most puzzled about), there are various ways of handling asynchronicity in the middle of the pipeline, depending on what your goal is. Here are a few of them:

  • If, in the middle of the pipeline, you need another publisher to start only after the whole upstream pipeline has produced a value, you use the flatMap operator to bring that publisher into existence in response to receipt of that value (a short sketch of this follows after the list).

  • Or, if the idea is that one publisher should start only after another has finished, but you do not need to pass anything from the first to the second, use the append operator.

  • Or, if it's okay for two publishers to operate independently but you cannot proceed until they have both published, use the zip operator.

And so on; there are many, many more like that.
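
For the first case, the flatMap pattern might look roughly like this in a playground; the publishers and the one-second delay are placeholders for illustration only:

import Combine
import Foundation

var cancellables = Set<AnyCancellable>()

Just("step one done")
    .flatMap { upstreamValue -> AnyPublisher<String, Never> in
        // This Future does not even exist until the upstream value arrives,
        // so the second step cannot begin before the first has finished.
        Future<String, Never> { promise in
            DispatchQueue.global().asyncAfter(deadline: .now() + 1) {
                promise(.success("step two done, after: \(upstreamValue)"))
            }
        }
        .eraseToAnyPublisher()
    }
    .sink { print($0) }
    .store(in: &cancellables)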

matt
  • Thank you so much for taking so much time to provide this information. I think really what's happened is my question was poorly worded. I've tried to restate the problem. Your linked example isn't applicable because CNContactStore is safe to access from multiple threads **and** also has no unsafe intermediate states. But, I'd really like to hear how I'm incorrect/misunderstanding! – Mattie Oct 29 '20 at 14:04
  • I actually think this question, which was both asked and answered by you, seems more relevant. But, I'm still investigating. https://stackoverflow.com/questions/59743938/combine-framework-serialize-async-operations – Mattie Oct 29 '20 at 14:09