
Converter is an object that does some work in the background. The UI has a button that lets users stop Converter operations. The object uses up to 5 threads. To increase or decrease the number of threads I'm using an actor, which also holds the isRunning flag. The skeleton looks something like this:

fileprivate let MAX_THREADS = 5
 
actor ConverterData {
    var threads: Int = 0
    var isRunning: Bool = false 

    func incThreads() {
        threads += 1
    }

    func decThreads() {
        threads -= 1
    }

    func setIsRunning(_ flag: Bool) {
        isRunning = flag
    }
}

class Converter {
    private var converterData = ConverterData()

    func start() {
        Task {
            await converterData.setIsRunning(true)
            while await converterData.isRunning { // here always true
                if await converterData.threads < MAX_THREADS {
                    await converterData.incThreads() // threads increase and the new number is seen by this if
                    // Create a new task
                    ...
                }
                try? await Task.sleep(nanoseconds: 1000000000)
            }
        }
    }

    func stop() {
        Task {
            await converterData.setIsRunning(false)
            // This change is seen here, in this task, but not in the previous task, inside the while.
            print(await converterData.isRunning) // correct: false
        }
    }
}

As the comments in the code show, the problem seems to be this: when I change actor properties from one Task, other tasks do not see those changes; each task sees only its own changes.

Maybe I'm missing the real concept behind actors?

Long story: Using NSSpeechSynthesizerDelegate, I add an .aiff URL to the actor's var aiff: [URL] property each time the TTS finishes writing the spoken file. The while loop inside the start() method checks whether the main loop should continue. Inside this loop it checks whether the aiff array contains URLs to convert to MP3. This is also why I'm using the sleep(nanoseconds:) method inside start().

When the converter detects that all .aiff files have been converted to MP3, it should exit the while loop inside start() and end the Task. The task should also stop when a button on the UI is pressed. I'm trying to simplify.

actor ConverterData {
    var threads: Int = 0
    var aiff: [URL] = []
    var isRunning: Bool = false 
    var totalMp3Converted: Int = 0

    // Methods to inc/dec threads
    ...


    // Method to set isRunning, appending
    // URLs to aiff and to inc totalMp3Converted.
    ...
}

class Converter {
    private var converterData = ConverterData()

    func start() { // maybe async
        Task { // if not async
            // while to check if is running, if so
            // and if current number of aiff-to-mp3 threads is
            // less than MAX_THREADS starts a new 
            // conversion thread (if aiff array contains 
            // at least one aiff URL).

            // Maybe it can be removed? I wouldn't stress 
            // the while too much with continuous checking :-)
            try? await Task.sleep(nanoseconds: 1_000_000_000) 
        }
    }

    func stop() { // maybe async
        Task { // if not async
            await converterData.setIsRunning(false)
            // This should terminate the while in the
            // start() method. 
        }
    }
}

And something like this:

struct ContentView: View {
    private let converter = Converter() 

    var body: some View {
        // This if using the @Rob hint about using 
        // async Converter methods
        Button("Start") {
            Task {
                await converter.start()
            }
        }

        Button("Stop") {
            Task {
                await converter.stop() 
            }
        }
    }
}

Hope this is clear enough!

Altair Jones
  • Unrelated, but you should be wary of unstructured concurrency, where `start` and `stop` launch new top level tasks on the current actor (which is what `Task {…}` does). They should just be `async` functions, and the caller can wrap that in a `Task` if it needs/wants (which it can cancel if it wants). But you want to avoid launching orphaned unstructured concurrency. – Rob May 18 '23 at 17:15
  • @Rob thanks for your reply. I edited my question to add more context, hoping it will be clearer. Meanwhile I'm studying your hint about using an async converter inside an external Task. – Altair Jones May 18 '23 at 19:03
  • The `while` loop is not the right solution, IMHO. I'd suggest you stop trying to tell us what you were trying to do (with the `while` and `sleep`), and instead focus on what you need it to do. If you just want it to process them as they come in, use `AsyncChannel`. If you want to constrain the degree of concurrency, use task group with `next`. If you want to process them sequentially, that's even easier. – Rob May 18 '23 at 20:21
  • @Rob thanks again. I'm really new to these "new" APIs. In earlier versions of my app I used dispatch_queue. I will try to study AsyncChannel. It seems to me the right solution (dropping the min threads requirement). May I ask you for a basic example to start with? – Altair Jones May 18 '23 at 20:49
  • See latter part of [this answer](https://stackoverflow.com/a/73072799/1271826) where I’ve got a download manager that will perform a max of four requests at a time, but for which I send requests to the channel, as needed. That is effectively the solution I included in the second half of my [answer](https://stackoverflow.com/a/76283315/1271826), below. – Rob May 19 '23 at 01:22
  • @Rob thank you very much. I missed the second part of your answer last night. Maybe I was a little tired. OK, I will give it a try. It seems to me the right method. – Altair Jones May 19 '23 at 04:32

1 Answer


This code works fine. The property does get updated. See https://gist.github.com/robertmryan/82107541c5ec227ef272da2edc0531f0.

Now, if the omitted code, “create a new task”, is (a) isolated to the current actor; and (b) doing something slow and synchronous, then that could result in the behavior you described. But it works for me.
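The claim that the update is visible across tasks can be checked with a small stand-alone program. This is only a minimal sketch, not the linked gist: `pollUntilStopped` and `runDemo` are hypothetical names, the sleep intervals are arbitrary, and the "create a new task" step from the question is left out entirely.

```swift
// Same shape as the actor in the question, reduced to the flag.
actor ConverterData {
    private(set) var isRunning = false

    func setIsRunning(_ flag: Bool) {
        isRunning = flag
    }
}

/// Hypothetical helper: polls the flag the way the question's `start()` does
/// and returns how many times the loop body ran before it saw `false`.
func pollUntilStopped(_ data: ConverterData) async -> Int {
    var iterations = 0
    while await data.isRunning {
        iterations += 1
        try? await Task.sleep(nanoseconds: 10_000_000) // 10 ms
    }
    return iterations
}

/// Hypothetical driver: one task polls, a second task flips the flag later.
func runDemo() async -> Int {
    let data = ConverterData()
    await data.setIsRunning(true)

    let poller = Task { await pollUntilStopped(data) }
    let stopper = Task {
        try? await Task.sleep(nanoseconds: 100_000_000) // 100 ms
        await data.setIsRunning(false)
    }

    await stopper.value
    return await poller.value
}

let iterations = await runDemo()
print("loop exited after \(iterations) iterations")
```

The program only terminates if the polling task observes the change made by the other task, so a hang here would point at something outside the actor, such as slow synchronous work occupying the same actor.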


That having been said, spinning (even with the pause in the loop) is an anti-pattern. If you find yourself writing code that spins, that is a red flag, an indication that you can optimize this.

If you want to run no more than MAX_THREADS tasks concurrently, I would use a task group, inserting a group.next() call after you reach a certain level. E.g.,

class Converter {
    func process(_ items: [Item]) async {
        await withTaskGroup(of: Void.self) { group in
            for (index, item) in items.enumerated() {
                if index >= MAX_THREADS {
                    await group.next()
                }
                
                group.addTaskUnlessCancelled { await self.convert(item) }
            }
        }
    }
    
    nonisolated func convert(_ item: Item) async { … }
}
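To see the limit in action, the same pattern can be run with a stand-in for the real conversion. In this sketch `ConcurrencyMeter`, the `limit` parameter, and the sleeping `convert` function are all assumptions made for illustration; the items are plain integers rather than the `Item` type above.

```swift
/// Hypothetical helper that records how many conversions overlap.
actor ConcurrencyMeter {
    private var active = 0
    private(set) var peak = 0

    func begin() {
        active += 1
        peak = max(peak, active)
    }

    func end() {
        active -= 1
    }
}

/// Placeholder for the real conversion: it only sleeps briefly.
func convert(_ item: Int, meter: ConcurrencyMeter) async {
    await meter.begin()
    try? await Task.sleep(nanoseconds: 20_000_000) // 20 ms
    await meter.end()
}

/// Same structure as `process(_:)` above, with the limit passed in.
func process(_ items: [Int], limit: Int, meter: ConcurrencyMeter) async {
    await withTaskGroup(of: Void.self) { group in
        for (index, item) in items.enumerated() {
            // Once `limit` children have been added, wait for one of them
            // to finish before adding the next.
            if index >= limit {
                _ = await group.next()
            }
            group.addTask { await convert(item, meter: meter) }
        }
    }
}

let meter = ConcurrencyMeter()
await process(Array(1...20), limit: 5, meter: meter)
let peak = await meter.peak
print("peak concurrency:", peak)
```

Because each new child is added only after an earlier one has completed, the recorded peak should never exceed the limit, and no separate thread counter is needed.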

And, if the concern is that you don’t have the full array of items when you start this process, then use an AsyncChannel, from the Swift Async Algorithms package, and then you can do a for-await-in. E.g.,

import AsyncAlgorithms

actor Converter {
    private let channel = AsyncChannel<Item>()

    /// Monitor channel for new items
    ///
    /// Run no more than `MAX_THREADS` at a time.

    func monitorChannel() async {
        await withTaskGroup(of: Void.self) { group in
            var index = 0
            for await item in channel {
                index += 1
                if index > MAX_THREADS {
                    await group.next()
                }

                group.addTaskUnlessCancelled { await self.convert(item) }
            }
        }
    }

    /// Append additional item to channel

    func append(_ item: Item) async {
        await channel.send(item)
    }
}
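If adding the package is not an option, a similar shape can be sketched with `AsyncStream` from the standard library. This is a substitution, not the `AsyncChannel` approach itself: `AsyncStream` buffers yielded items instead of suspending the sender, so it gives no back-pressure. The sketch assumes Swift 5.9 or later for `makeStream()`; `monitor`, `ConversionLog`, and `streamDemo` are hypothetical names, and the conversion is a short sleep.

```swift
/// Consumes items from `stream`, running at most `limit` conversions at once.
func monitor(
    _ stream: AsyncStream<Int>,
    limit: Int,
    convert: @escaping @Sendable (Int) async -> Void
) async {
    await withTaskGroup(of: Void.self) { group in
        var started = 0
        for await item in stream {
            started += 1
            if started > limit {
                _ = await group.next()   // wait for a free slot
            }
            group.addTask { await convert(item) }
        }
    }
}

/// Hypothetical sink that records which items were converted.
actor ConversionLog {
    private(set) var converted: [Int] = []

    func record(_ item: Int) {
        converted.append(item)
    }
}

func streamDemo() async -> [Int] {
    let (stream, continuation) = AsyncStream<Int>.makeStream()
    let log = ConversionLog()

    // The consumer starts before any item exists.
    let consumer = Task {
        await monitor(stream, limit: 5) { item in
            try? await Task.sleep(nanoseconds: 10_000_000) // stand-in work
            await log.record(item)
        }
    }

    // Items arrive later, e.g. from a delegate callback.
    for item in 1...12 {
        continuation.yield(item)
    }
    continuation.finish()   // no more items: lets the for-await loop end

    await consumer.value
    return await log.converted.sorted()
}

let converted = await streamDemo()
print(converted)
```

Every yielded item should be recorded exactly once, and calling `finish()` is what ends the loop, which takes the place of the `isRunning` flag for the "all files converted" case.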

But if you find yourself spinning or keeping track of a count of active processes, then you may want to refactor/simplify that, as shown above.
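For the Stop button, the comment thread's suggestion (plain `async` functions, with the caller owning the `Task`) means cancellation can take over from a shared flag. A minimal sketch, assuming Swift 5.9 or later and using the standard library's `AsyncStream` as a stand-in source of items; `consume` and `stopDemo` are hypothetical names:

```swift
/// Handles items until the stream finishes or the surrounding task is cancelled.
func consume(_ stream: AsyncStream<Int>) async -> Int {
    var handled = 0
    for await _ in stream {
        handled += 1
    }
    return handled
}

func stopDemo() async -> Int {
    let (stream, continuation) = AsyncStream<Int>.makeStream()

    // "Start": the caller creates and keeps the task.
    let worker = Task { await consume(stream) }

    continuation.yield(1)
    continuation.yield(2)
    try? await Task.sleep(nanoseconds: 50_000_000) // give the worker time to run

    // "Stop": cancelling the task ends the for-await loop,
    // even though finish() was never called on the stream.
    worker.cancel()
    return await worker.value
}

let handled = await stopDemo()
print("handled \(handled) items before stopping")
```

In a SwiftUI view the task handle would typically live in view state so that the Stop button can call `cancel()` on it; that wiring is omitted here.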

Rob