9

I’m currently migrating my app to use the concurrency model in Swift. I want to serialize Tasks to make sure they are executed one after the other (no paralellism). In my use case, I want to listen to notifications posted by the NotificationCenter and execute a Task every time a new notification is posted. But I want to make sure no previous task is running. It's the equivalent of using an OperationQueue with maxConcurrentOperationCount = 1.

For example, I’m using CloudKit with Core Data in my app and I use persistent history tracking to determine what changes have occurred in the store. In this Synchronizing a Local Store to the Cloud Sample Code, Apple uses an operation queue for handling history processing tasks (in CoreDataStack). This OperationQueue has a maximum number of operations set to 1.

private lazy var historyQueue: OperationQueue = {
    let queue = OperationQueue()
    queue.maxConcurrentOperationCount = 1
    return queue
}()

When a Core Data notification is received, a new task is added to this serial operation queue. So if many notifications are received, they will all be performed one after the other one in a serial way.

@objc
func storeRemoteChange(_ notification: Notification) {
    // Process persistent history to merge changes from other coordinators.
    historyQueue.addOperation {
        self.processPersistentHistory()
    }
}

In this Loading and Displaying a Large Data Feed Sample Code, Apple uses Tasks to handle history changes (in QuakesProvider).

// Observe Core Data remote change notifications on the queue where the changes were made.
notificationToken = NotificationCenter.default.addObserver(forName: .NSPersistentStoreRemoteChange, object: nil, queue: nil) { note in
    Task {
        await self.fetchPersistentHistory()
    }
}

I feel something is wrong in the second project as Tasks could happen in any order, and not necessarily in a serial order (contrary to the first project where the OperationQueue as a maxConcurrentOperationCount = 1).

Should we use an actor somewhere to make sure the methods are serially called?

I thought about an implementation like this but I’m not yet really comfortable with that:

actor PersistenceStoreListener {
    let historyTokenManager: PersistenceHistoryTokenManager = .init()
    private let persistentContainer: NSPersistentContainer

    init(persistentContainer: NSPersistentContainer) {
        self.persistentContainer = persistentContainer
    }

    func processRemoteStoreChange() async {
        print("\(#function) called on \(Date.now.formatted(date: .abbreviated, time: .standard)).")
    }
}

where the processRemoteStoreChange method would be called by when a new notification is received (AsyncSequence):

notificationListenerTask = Task {
   let notifications = NotificationCenter.default.notifications(named: .NSPersistentStoreRemoteChange, object: container.persistentStoreCoordinator)
   
   for await _ in notifications {
        print("notificationListenerTask called on \(Date.now.formatted(date: .abbreviated, time: .standard)).")
        await self.storeListener?.processRemoteStoreChange()
    }
}
alpennec
  • 1,864
  • 3
  • 18
  • 25
  • In Swift Concurrency a sequential execution is the normal case unless you use `async let` or a detached task. In a task group the iterations can be performed concurrently, but the result is *awaited* in order. In your case an `AsyncSequence` is the right way. – vadian Dec 31 '21 at 09:11
  • @vadian So you mean that the code in the second project from Apple using Task (https://developer.apple.com/documentation/coredata/loading_and_displaying_a_large_data_feed) is executing Tasks in a serial way? There are no risk of task scheduled later being executed before another task already scheduled? – alpennec Dec 31 '21 at 09:44
  • Yes, as the name `AsyncSequence` implies it's a sequence and a sequence is executed *sequential*ly. But feel free to try it out. – vadian Dec 31 '21 at 09:52
  • Hum Apple is not using the AsyncSequence in their code (it's my proposal). But even if they were, it means the notifications are received sequentially, but when I call the async methods in the Task, will they be executed sequentially too? I'm a bit confused by the ways the different Tasks will handled actually. – alpennec Dec 31 '21 at 10:30
  • "In a task group the iterations can be performed concurrently, but the result is awaited in order" False. The results can arrive in _any_ order. – matt Dec 31 '21 at 12:25
  • "Should we use an actor somewhere to make sure the methods are serially called?" That is certainly one of the main purposes of actors, yes. – matt Dec 31 '21 at 12:26
  • Thanks @matt for your precisions! Do you have any thought regarding my code proposal for this actor? – alpennec Dec 31 '21 at 12:28
  • I have no idea because I don't know what `processRemoteStoreChange()` really does. But if the goal is to serialize operations but not on the main thread, that's an actor. – matt Dec 31 '21 at 12:40
  • 1
    @matt You're right, I apologize. TaskGroup returns the items in order of completion. But AsyncSequence returns the items in sequential order. – vadian Dec 31 '21 at 16:36
  • @Rob No they don't. That's why you can't simply append the results to an array as they arrive. You lose all association with the array you were looping through to start with. – matt Dec 31 '21 at 16:40
  • Correct. We need to do the same silliness to reassemble the results in the original order. – Rob Dec 31 '21 at 17:03
  • So how to make sure that only one async method executes at the same time when added from a AsyncSequence, like notifications, without missing any item from the AsyncSequence ? In my case, it’s possible to have a queue of upcoming Async method calls started from the AsyncSequence, a little bit like a buffer. E.g 10 notifications received in 1 second, they all call the Async method but only one executes. When the first completes the 2nd starts. And so on. Possibly, new calls arrives when all calls are not executed yet but they add to the queue. – alpennec Dec 31 '21 at 20:34
  • “So how to make sure that only one async method executes at the same time?” ... But is it really `async`? Your operation queue example is actually calling `processPersistentHistory`, which obviously is not an asynchronous method. I mention it because the desired behavior is easy to achieve with `actor` if the method in question is synchronous. (FWIW, the same is true for operations, too, which get far more complicated when you attempt wrap an inherently asynchronous process within an `Operation` subclass.) – Rob Jan 05 '22 at 01:41

1 Answers1

12

Below, in my original answer, I answer the general question of how to achieve sequential behavior from independent tasks within Swift concurrency.

But, you are asking a more specific question, namely, how to get serial behavior from an asynchronous sequence of events. If you have an AsyncSequence, such as notifications, then the for-await-in approach you contemplate at the end of your answer is a great solution:

notificationListenerTask = Task {
    let notifications = NotificationCenter.default.notifications(named: .NSPersistentStoreRemoteChange, object: container.persistentStoreCoordinator)
   
    for await _ in notifications {
        await self.storeListener?.processRemoteStoreChange()
    }
}

Because you await within the loop, it will not get to the next iteration of the notifications AsyncSequence until the prior processRemoteStoreChange returns and execution of the loop continues.

Bottom line, AsyncSequence (whether notifications or your own AsyncStream or AsyncChannel) are an excellent way to get serial behavior from an asynchronous series of events. WWDC 2021 video Meet AsyncSequence is a great primer on asynchronous sequences for those unfamiliar with the AsyncSequence protocol.


In my original answer, below, I tackle the more general question of getting serial behavior from a series of independent Swift concurrency tasks:


If you want to get the behavior of an OperationQueue with a maxConcurrentOperationCount of 1 (a ”serial” operation queue), one can achieve that with an actor.

There are two patterns that you will see with a serial OperationQueue:

  1. The operations in the queue are, themselves, synchronous.

    If you are using the standard OperationQueue (i.e., you have not subclassed Operation that does manual KVO for isFinished, etc.), a simple actor achieves what we want. An actor will prevent concurrent execution.

    The key here, though, that this only works with synchronous methods (i.e., those methods that do not have await suspension points).

  2. The operations in the queue are asynchronous.

    One of the more advanced use-cases of operation queues is to handle dependencies between tasks that are, themselves, asynchronous. This is a more complicated scenario in operation queues, requiring a custom Operation subclass in which you manually handle the KVO of isFinished, etc. (See this answer for an example of that pattern.)

    The challenge in doing this with Swift concurrency is that actors are reentrant (see reentrancy discussion in SE-0306. If the actor’s method is asynchronous (i.e., with async-await) that introduces suspension points, i.e., where an await in one call will allow another async method to run on that actor.

    To achieve serial execution between separate async methods, you have a couple of options:


Consider the following (which uses OS signposts so that I can graphically illustrate the behavior in Instruments):

import os.signpost

private let pointsOfInterest = OSLog(subsystem: "log", category: .pointsOfInterest)

class ViewController: UIViewController {

    let example = Example()
    let taskSerializer = SerialTasks<Void>()

    @IBAction func didTapSync(_ sender: Any) {
        os_signpost(.event, log: pointsOfInterest, name: #function)
        startSynchronous()
    }

    @IBAction func didTapAsync(_ sender: Any) {
        os_signpost(.event, log: pointsOfInterest, name: #function)
        Task { try await startAsynchronous() }
    }

    @IBAction func didTapSerializedAsync(_ sender: Any) {
        os_signpost(.event, log: pointsOfInterest, name: #function)
        Task { try await startSerializedAsynchronous() }
    }

    func startSynchronous() {
        Task {
            await example.synchronousExample("1. synchronous")
        }
    }

    func startAsynchronous() async throws {
        try await example.asynchronousExample("2. asynchronous")
    }

    func startSerializedAsynchronous() async throws {
        try await taskSerializer.add {
            try await self.example.asynchronousExample("3. serial async")
        }
    }
}

actor Example {
    func asynchronousExample(_ name: StaticString) async throws {
        let id = OSSignpostID(log: pointsOfInterest)
        os_signpost(.begin, log: pointsOfInterest, name: name, signpostID: id)
        defer { os_signpost(.end, log: pointsOfInterest, name: name, signpostID: id) }

        try await Task.sleep(for: .seconds(2))
    }

    func synchronousExample(_ name: StaticString) {
        let id = OSSignpostID(log: pointsOfInterest)
        os_signpost(.begin, log: pointsOfInterest, name: name, signpostID: id)
        defer { os_signpost(.end, log: pointsOfInterest, name: name, signpostID: id) }

        Thread.sleep(forTimeInterval: 2)
    }
}

actor SerialTasks<Success> {
    private var previousTask: Task<Success, Error>?

    func add(block: @Sendable @escaping () async throws -> Success) async throws -> Success {
        let task = Task { [previousTask] in
            let _ = await previousTask?.result
            return try await block()
        }
        previousTask = task
        return try await task.value
    }
}

With synchronous tasks (scenario 1), startSynchronous, is the simplest. Just call the synchronous method of the actor and you get serial execution.

With asynchronous tasks (scenario 2), startAsynchronous, if you have await suspension points, you lose sequential behaviors due to actor reentrancy.

But you can refine that asynchronous task pattern (scenario 3), by having an actor, SerialTasks in the above code, that keeps track of the previous task, awaiting it before starting the next task. A subtle point is that the add method is, itself, synchronous (although the closure it takes is asynchronous). This avoids subtle races if you add multiple tasks.

Running the above in Instruments, we can graphically see the execution, with signposts where tasks were initiated, and intervals showing when the tasks execute:

enter image description here

In short, if your actor is performing only synchronous tasks (which is your case), then the actor yields maxConcurrentOperationCount = 1 sort of behavior automatically. If the tasks are asynchronous, you simply need to await the prior tasks before starting the next one.

Rob
  • 415,655
  • 72
  • 787
  • 1,044
  • Thanks @Rob for this nice explanation. It consolidate my knowledge about Actors. The method ‘processPersistentHistory’ is asynchronous because it needs to be run on an NSManagedObjectContext thread. So we need to ‘await context.perform { }’ in order to make sure the perform body is executed on the correct thread. This is of course if we want to use the new async/await, because I still ça use the performAndWait method to run the body synchronously. – alpennec Jan 05 '22 at 05:39
  • Do not conflate the fact that you run it on another thread (e.g., on your operation queue) with the question of whether `processPersistentHistory` is, itself, asynchronous. The fact that it doesn’t have a completion handler and that you did a simple `addOperation`, suggests that it is a synchronous method that you’re simply running asynchronously on a background thread. But if the method, itself, was asynchronous, the `addOperation` example you shared would not have worked. – Rob Jan 05 '22 at 06:02
  • If you want more details on the `processPersistentHistory` method, it's more or less what can be found in this article: https://www.avanderlee.com/swift/persistent-history-tracking-core-data/. You'll see that it creates a new Core Data backgroundContext and perform some operations but in a sync way (`.performAndWait`). With the new Swift concurrency model & Core Data, this `performAndWait` is now `await context.perform {`. But if we use `Task { await context.perform { ... sync work ... } }`, it's still not clear to me how to serialize these as a Task can run on an arbitrary thread. – alpennec Jan 05 '22 at 09:15
  • I have revised the above answer, adding scenario three, showing how you can get serial behavior between asynchronous Tasks on the actor. – Rob May 08 '22 at 17:44
  • Hi @Rob, could you please provide an example of using the `SerialTasks.add()` method for adding `async` methods that actually return a value, please? Something where the declaration is like `SerialTasks` – Sagar D Dec 15 '22 at 09:32
  • 1
    @SagarD - I've modified `SerialTasks` to asynchronously return the closure’s result. See https://github.com/robertmryan/SerialTaskDemo.git for example. – Rob Dec 19 '22 at 00:15
  • FWIW, I increasingly use asynchronous sequences (esp [`AsyncChannel`](https://github.com/apple/swift-async-algorithms/blob/main/Sources/AsyncAlgorithms/AsyncAlgorithms.docc/Guides/Channel.md)) to get both serial and constrained actor reentrancy behaviors. E.g., see the download managers in the latter part of [this answer](https://stackoverflow.com/a/73072799/1271826). – Rob Mar 08 '23 at 21:34