9

Let say I have this code

class Duck{
    
    func walk() async {
        //do something
        print("walk start")
        try? await Task.sleep(nanoseconds: UInt64(2e9))
        print("walk end")
    }
    
    func quack() async {
        //do something...
        print("quack start")
        try? await Task.sleep(nanoseconds: UInt64(2e9))
        print("quack end")
    }
    
    func fly() async{
        //do something
        print("fly start")
        try? await Task.sleep(nanoseconds: UInt64(2e9))
        print("fly end")
    }
    
}

let duck = Duck()

Task{
    await duck.walk()
}

Task{
    await duck.quack()
}

Task{
    await duck.fly()
}

This would print

walk start
quack start
fly start
walk end
quack end
fly end

which I understand and expected. But what if I want those 3 Tasks run sequentially? Let say each Task is created by user pressing a button. I want the tasks to queue up in the background and run one by one. Is there any thing like you can queue up DispatchWorkItem in a DispatchQueue, but a Task version?


Edit:

I came up with a solution, but I am not sure if this is a good way to implement it. As this implementation potentially create many layer of cascaded Task, I wonder if there would be risk of stack overflow or memory leaks?

class TaskQueue{
    private var currentTask : Task<Void,Never> = Task{}
    
    func dispatch(block:@escaping () async ->Void){
        
        let oldTask = currentTask
        currentTask = Task{
            _ = await oldTask.value
            await block()
        }
    }
}

taskQueue.dispatch {
    await duck.walk()
}
taskQueue.dispatch {
    await duck.quack()
}
taskQueue.dispatch {
    await duck.fly()
}
Ricky Mo
  • 6,285
  • 1
  • 14
  • 30
  • Just put the 3 await in the same task – Ptit Xav Mar 23 '22 at 12:00
  • @PtitXav This is a simplified example. Please assume those 3 tasks are created separately in different part of the program, e.g. By user pressing buttons. – Ricky Mo Mar 23 '22 at 12:02
  • If you put tasks in same dispatch queue these task should process in order (cf [apple](https://developer.apple.com/documentation/dispatch/dispatchqueue) – Ptit Xav Mar 23 '22 at 13:10
  • @PtitXav Tasks do not serialize on DispatchQueues the same way blocks do. They can be interrupted any time they `await` and other Tasks may be scheduled on the same queue. – Rob Napier Mar 23 '22 at 13:57
  • 1
    @RobNapier - I get why we need reentrancy in our actors, but I really hope they proceed with the [non-reentrancy “future direction”](https://github.com/apple/swift-evolution/blob/main/proposals/0306-actors.md#non-reentrancy) (or some other control over the degree of concurrency other than just the cooperative thread pool ceiling) at some point. – Rob Mar 26 '22 at 05:46
  • 2
    @Rob Absolutely. I'm just saying they don't. I'm currently contemplating whether the new asyncsequence algorithms will help here. We really do need a something like TaskQueue below (a traditional "actor mailbox" which is what most people who are used to actors expect; Swift's take on actors is very interesting and I think really innovative, but it's also deeply unlike every other use of the word "actor"). I think a data structure like TaskQueue is better than more magical annotations, though. It's hard to reason about whether code is correct when it could be annotated far away. – Rob Napier Mar 26 '22 at 15:30

4 Answers4

10

I once was a proponent of the unstructured task approach, where each would await the prior one. In retrospect, this feels a bit brittle to me. Increasingly (with credit to Rob Napier for nudging me in this direction), I now use asynchronous sequences, specifically AsyncChannel from Apple’s swift-async-algorithms. I think it is a more robust behavior and is more consistent with the asynchronous sequences of modern Swift concurrency.

Before we come to your example, consider this serial downloader, where we have one process (the user button clicking) send URL objects to another process monitoring the channel for URLs in a for-await-in loop:

struct DownloadView: View {
    @StateObject var viewModel = DownloadViewModel()

    var body: some View {
        VStack {
            Button("1") { Task { await viewModel.appendDownload(1) } }
            Button("2") { Task { await viewModel.appendDownload(2) } }
            Button("3") { Task { await viewModel.appendDownload(3) } }
        }
        .task {
            await viewModel.monitorDownloadRequests()
        }
    }
}

@MainActor
class DownloadViewModel: ObservableObject {
    private let session: URLSession = …
    private let baseUrl: URL = …
    private let folder: URL = …
    private let channel = AsyncChannel<URL>()   // note, we're sending URLs on this channel

    func monitorDownloadRequests() async {
        for await url in channel {
            await download(url)
        }
    }

    func appendDownload(_ index: Int) async {
        let url = baseUrl.appending(component: "\(index).jpg")
        await channel.send(url)
    }

    func download(_ url: URL) async {
        do {
            let (location, _) = try await session.download(from: url)
            let fileUrl = folder.appending(component: url.lastPathComponent)
            try? FileManager.default.removeItem(at: fileUrl)
            try FileManager.default.moveItem(at: location, to: fileUrl)
        } catch {
            print(error)
        }
    }
}

We start monitorDownloadRequests and then append download requests to the channel.

This performs the requests serially (because monitorDownloadRequests has a for-await loop). E.g., in Instruments’ “Points of Interest” tool, I have added some Ⓢ signposts where I clicked these buttons, and show intervals where the requests happen, and you can see that these three requests happen sequentially.

enter image description here

But the wonderful thing about channels is that they offer serial behaviors without introducing the problems of unstructured concurrency. They also handle cancelation automatically (if you want that behavior). If you cancel the for-await-in loop (which the .task {…} view modifier does for us automatically in SwiftUI when the view is dismissed). If you have a bunch of unstructured concurrency, with one Task awaiting the prior one, handling cancelation gets messy quickly.


Now, in your case, you are asking about a more general queue, where you can await tasks. Well, you can have an AsyncChannel of closures:

typealias AsyncClosure = () async -> Void

let channel = AsyncChannel<AsyncClosure>()

E.g.:

typealias AsyncClosure = () async -> Void

struct ExperimentView: View {
    @StateObject var viewModel = ExperimentViewModel()

    var body: some View {
        VStack {
            Button("Red")   { Task { await viewModel.addRed() } }
            Button("Green") { Task { await viewModel.addGreen() } }
            Button("Blue")  { Task { await viewModel.addBlue() } }
        }
        .task {
            await viewModel.monitorChannel()
        }
    }
}

@MainActor
class ExperimentViewModel: ObservableObject {
    let channel = AsyncChannel<AsyncClosure>()

    func monitorChannel() async {
        for await task in channel {
            await task()
        }
    }

    func addRed() async {
        await channel.send { await self.red() }
    }

    func addGreen() async {
        await channel.send { await self.green() }
    }

    func addBlue() async {
        await channel.send { await self.blue() }
    }

    func red() async { … }

    func green() async { … }

    func blue() async { … }
}

That yields:

enter image description here

Here again, I am using Instruments to visualize what is going on. I clicked the “red”, “green”, and “blue” buttons quickly, in succession, twice. I then watched the six corresponding intervals for these three second tasks. I then repeated that six-click process a second time, but this time I dismissed the view in question before they finished, mid-way through the green task of the second series of button taps, illustrating the seamless cancelation capabilities of AsyncChannel (and asynchronous sequences in general).

Now, I hope you forgive me, as I omitted the code to create all of these “Points of Interest” signposts and intervals, as it adds a lot of kruft that really is not relevant to the question at hand (but see this if you are interested). But hopefully these visualizations help illustrate what is going on.

The take-home message is that AsyncChannel (and its sibling AsyncThrowingChannel) is a great way to remain within structured concurrency, but get serial (or constrained behavior, like shown at the end of this answer) that we used to get with queues, but with asynchronous tasks.

I must confess that this latter AsyncClosure example, while it hopefully answers your question, feels a little forced to my eye. I have been using AsyncChannel for a few months now, and I personally always have a more concrete object being handled by the channel (e.g., URLs, GPS locations, image identifiers, etc.). This example with closures feels like it is trying just a little too hard to reproduce old fashioned dispatch/operation queue behaviors.

Rob
  • 415,655
  • 72
  • 787
  • 1,044
  • Thank you for suggesting the swift-async-alogrithm pacakge! I have been doing similar implementation with kotlin coroutine when working on android. I have also noticed that swift got the `AsyncSequence` and `AsyncStream` API and started translating my kotlin code to swift. But swift lack the very convenient `Channel` API like kotlin does so I wrote it with `AsyncStream`. `AsyncChannel` seems like the missing piece I have been looking for! I felt like I was almost going to write my own `AsyncChannel` from scratch. No idea that it resides in a package that need to be manually added. Thank you. – Ricky Mo Mar 14 '23 at 08:23
  • What if you had to return something from the closure? (for example `let result = await viewModel.addRed()`?) Would `AsyncChannel` still work in this case? – aheze Jul 14 '23 at 17:04
  • Sadly, no. As the [documentation](https://github.com/apple/swift-async-algorithms/blob/main/Sources/AsyncAlgorithms/AsyncAlgorithms.docc/Guides/Channel.md#proposed-solution) says, “Each value sent by the channel will await the *consumption* of that value by iteration” (emphasis added). So awaits the “consumption” of that value, but not whatever the routine subsequently does with that consumed value. In short, `send` cannot (and does not) `return` anything. If you're returning stuff, you would need some other mechanism for that, e.g., another asynchronous sequence (!) of emitted values. – Rob Jul 14 '23 at 17:40
  • I confess that when I first stumbled across this, I had a knee-jerk “what were they thinking” reaction, but after noodling on what the implementation of a bidirectional channel might look like (esp on the consumer side), I can see why they did it that way. It’s not immediately obvious how a bidirectional channel would fit nicely into the `AsyncSequence` pattern. Sure, we could make something with some completion handler closure rendition of `next`, but that gets pretty ugly, pretty quickly. – Rob Jul 14 '23 at 17:52
4

Update:

For future people who find this post useful, I have created a swift package with better implementation and added support for queuing up AsyncThrowingStream too.

https://github.com/rickymohk/SwiftTaskQueue


Here is my updated implementation which I think is safer than the one I posted in the question. The TaskQueueActor part does all of the job, I wrap it with an outer class just to make it cleaner when calling from a non-async context.

class TaskQueue{
    
    private actor TaskQueueActor{
        private var blocks : [() async -> Void] = []
        private var currentTask : Task<Void,Never>? = nil
        
        func addBlock(block:@escaping () async -> Void){
            blocks.append(block)
            next()
        }
        
        func next()
        {
            if(currentTask != nil) {
                return
            }
            if(!blocks.isEmpty)
            {
                let block = blocks.removeFirst()
                currentTask = Task{
                    await block()
                    currentTask = nil
                    next()
                }
            }
        }
    }
    private let taskQueueActor = TaskQueueActor()
    
    func dispatch(block:@escaping () async ->Void){
        Task{
            await taskQueueActor.addBlock(block: block)
        }
    }
}
Ricky Mo
  • 6,285
  • 1
  • 14
  • 30
4

I found this one on Github: https://github.com/gshahbazian/playgrounds/blob/main/AsyncAwait.playground/Sources/TaskQueue.swift

via

https://forums.swift.org/t/how-do-you-use-asyncstream-to-make-task-execution-deterministic/57968/18

import Foundation

public actor TaskQueue {
    private let concurrency: Int
    private var running: Int = 0
    private var queue = [CheckedContinuation<Void, Error>]()

    public init(concurrency: Int) {
        self.concurrency = concurrency
    }

    deinit {
        for continuation in queue {
            continuation.resume(throwing: CancellationError())
        }
    }

    public func enqueue<T>(operation: @escaping @Sendable () async throws -> T) async throws -> T {
        try Task.checkCancellation()

        try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) in
            queue.append(continuation)
            tryRunEnqueued()
        }

        defer {
            running -= 1
            tryRunEnqueued()
        }
        try Task.checkCancellation()
        return try await operation()
    }

    private func tryRunEnqueued() {
        guard !queue.isEmpty else { return }
        guard running < concurrency else { return }

        running += 1
        let continuation = queue.removeFirst()
        continuation.resume()
    }
}

Seems to work

@StateObject var taskQueue = TaskQueue(concurrency: 1)

            .task {
                try? await taskQueue.enqueue {
                //Task{
                    try? await Task.sleep(for: .seconds(1))
                    print("Done 1")
                }
                try? await taskQueue.enqueue {
                //Task{
                    try? await Task.sleep(for: .seconds(1))
                    print("Done 2")
                }
                try? await taskQueue.enqueue {
                //Task{
                    try? await Task.sleep(for: .seconds(1))
                    print("Done 3")
                }
malhal
  • 26,330
  • 7
  • 115
  • 133
0

one more way is to use SwiftPM package AsyncFifo https://github.com/sugarbaron/async-fifo . it allows you to enqueue your tasks like with DispatchQueue, but in async/await way:

let background: AsyncFifo = .init()

background.enqueue { await someFunc1() }
background.enqueue { someSynchFunc() }
background.enqueue { await whatever() }

// executes sequentally
// someFunc1() -> then someSycnhFunc() -> then whatever()

or you can just add AsyncFifo source code in your project: (wall of code incoming)



import Foundation

// MARK: constructor
public extension Async {
    
    final class Fifo {
        
        private var queue: [Scheduled]
        private let access: NSRecursiveLock
        private var executing: Bool
        
        public init() {
            self.queue = [ ]
            self.access = NSRecursiveLock()
            self.executing = false
        }
        
    }
    
}

// MARK: interface
public extension Async.Fifo {
    
    func enqueue(_ coroutine: @Sendable @escaping () async throws -> Void,
                 catch: @escaping (Error) -> Void = { print("[x][Async.Fifo] coroutine throws: \($0)") }) {
        schedule(coroutine, `catch`)
        inBackground { [weak self] in await self?.executeSequentally() }
    }
    
    var isBusy: Bool {
        access.lock()
        let isBusy: Bool = executing || !(queue.isEmpty)
        access.unlock()
        return isBusy
    }
    
    var queueSize: Int {
        access.lock()
        let size: Int = queue.count + (executing ? 1 : 0)
        access.unlock()
        return size
    }
    
    func cancelAll() {
        access.lock()
        queue = [ ]
        access.unlock()
    }
    
}

// MARK: tools
private extension Async.Fifo {
    
    func schedule(_ coroutine: @Sendable @escaping () async throws -> Void, _ catch: @escaping (Error) -> Void) {
        access.lock()
        queue.append((coroutine, `catch`))
        access.unlock()
    }
    
    func executeSequentally() async {
        if alreadyExecuting { return }
        while let next: Scheduled {
            do    { try await next.coroutine() }
            catch { next.catch(error) }
        }
    }
    
    var next: Scheduled? {
        access.lock()
        if queue.isEmpty { executing = false; access.unlock(); return nil }
        let next: Scheduled = queue.removeFirst()
        access.unlock()
        return next
    }
    
    var alreadyExecuting: Bool {
        access.lock()
        let executing = self.executing
        if executing == false { self.executing = true }
        access.unlock()
        return executing
    }
    
    typealias Scheduled = (coroutine: () async throws -> Void, catch: (Error) -> Void)
    
}

/// namespace class
public final class Async { }

public extension Async {  typealias Task = _Concurrency.Task }

@inlinable public func concurrent<T>(function: String = #function, _ callback: (CheckedContinuation<T, Error>) -> Void)
async throws -> T {
    try await withCheckedThrowingContinuation(function: function, callback)
}

@discardableResult
public func inBackground<T:Sendable>(_ coroutine: @Sendable @escaping () async throws -> T) -> Async.Task<T, Error> {
    Async.Task.detached(priority: .low, operation: coroutine)
}

@discardableResult
public func onMain<T:Sendable>(_ coroutine: @MainActor @Sendable @escaping () throws -> T) -> Async.Task<T, Error> {
    Async.Task.detached { try await MainActor.run { try coroutine() } }
}

@discardableResult
public func onMain<T:Sendable>(after delay: TimeInterval, _ coroutine: @MainActor @Sendable @escaping () throws -> T)
-> Async.Task<T, Error> {
    Async.Task.detached { await idle(delay); return try await MainActor.run { try coroutine() } }
}

public func idle(_ duration: TimeInterval) async {
    do    { try await Task.sleep(nanoseconds: UInt64(duration * 1e9)) }
    catch { print("[x][Async] sleep interrupted: \(error)") }
}
sugar baron
  • 165
  • 8