
I'm running into a behavior with AsyncStream I don't quite understand.

When I have an actor with a published variable, I can "subscribe" to it via an AsyncPublisher and it behaves as expected, updating only when there is a change in value. If I create an AsyncStream with a synchronous context (but with a potential task retention problem) it also behaves as expected.

The weirdness happens when I try to wrap that publisher in an AsyncStream with an asynchronous context. It starts spamming the view with an update on every pass through the loop, it seems, NOT only when there is a change.

What am I missing about AsyncStream.init(unfolding:onCancel:) that is causing this behavior?

https://developer.apple.com/documentation/swift/asyncstream/init(unfolding:oncancel:)?

import Foundation
import SwiftUI



actor TestService {
    static let shared = TestService()
    
    @MainActor @Published var counter:Int = 0
    
    @MainActor public func updateCounter(by delta:Int) async {
        counter = counter + delta
    }
    
    public func asyncStream() -> AsyncStream<Int> {
        return AsyncStream.init(unfolding: unfolding, onCancel: onCancel)
        
        //() async -> _?
        func unfolding() async -> Int? {
            for await n in $counter.values {
                //print("\(location)")
                return n
            }
            return nil
        }
        
        //optional
        @Sendable func onCancel() -> Void {
            print("confirm counter got canceled")
        }
    }
    
    public func syncStream() -> AsyncStream<Int> {
        AsyncStream { continuation in
            let streamTask = Task {
                for await n in $counter.values {
                    continuation.yield(n)
                }
            }

            continuation.onTermination = { @Sendable _ in
                streamTask.cancel()
                print("StreamTask Canceled")
            }

        }
    }
    
}

struct ContentView: View {
    var body: some View {
        VStack {
            TestActorButton()
            HStack {
                //TestActorViewA() //<-- uncomment at your own risk. 
                TestActorViewB()
                TestActorViewC()
            }
        }
        .padding()
    }
}


struct TestActorButton:View {
    var counter = TestService.shared
    
    
    var body: some View {
        Button("increment counter") {
            Task { await counter.updateCounter(by: 2) }
        }
    }
}


struct TestActorViewA:View {
    var counter = TestService.shared
    @State var counterVal:Int = 0
    
    var body: some View {
        Text("\(counterVal)")
            .task {
                //Fires constantly.
                for await value in await counter.asyncStream() {
                    print("View A Value: \(value)")
                    counterVal = value
                }
            }
    }
}

struct TestActorViewB:View {
    var counter = TestService.shared
    @State var counterVal:Int = 0
    
    var body: some View {
        Text("\(counterVal)")
            .task {
                //Behaves like one would expect. Fires once per change.
                for await value in await counter.$counter.values {
                    print("View B Value: \(value)")
                    counterVal = value
                }
            }
    }
}

struct TestActorViewC:View {
    var counter = TestService.shared
    @State var counterVal:Int = 0
    
    var body: some View {
        Text("\(counterVal)")
            .task {
                //Also only fires on update
                for await value in await counter.syncStream() {
                    print("View C Value: \(value)")
                    counterVal = value
                }
            }
    }
}

carlynorama
  • Note: Since the unfolding init is also referred to as being a "pull" style it might be "pulling" the value every time it's done getting it. I will run some tests tomorrow. – carlynorama Sep 27 '22 at 01:37
  • My suspicion is, it's because `unfolding()` will create a new AsyncStream on a single element (the underlying wrapped value in the @Published property) whenever it gets called. When unfolding() gets called on the consumer side it always creates an async stream which has one initial element (from the published property) and returns this one. In contrast, in the other function, the task is created only once, and thus, you create the async stream only once, with the initial value, and then you receive only more values when they change in the published property. – CouchDeveloper Dec 20 '22 at 15:32

1 Answer

The real solution to wrapping a publisher appears to be to stick to the synchronous context initializer and have it cancel its own task:

public func stream() -> AsyncStream<Int> {
    AsyncStream { continuation in
        let streamTask = Task {
            for await n in $counter.values {
                //do hard work to transform n 
                continuation.yield(n)
            }
        }

        continuation.onTermination = { @Sendable _ in
            streamTask.cancel()
            print("StreamTask Canceled")
        }
    }
}

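From what I can tell, the "unfolding" style initializer for AsyncStream is simply not a fit for wrapping an AsyncPublisher. The "unfolding" function is called again every time the consumer asks for the next element, and each call starts a fresh iteration of $counter.values, which immediately delivers the current value. So the stream just keeps "pulling" values from that infinite well, whether or not anything changed.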
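To see that pull behavior in isolation, here is a minimal sketch that does not need Combine. `currentValueSequence` and `firstValues` are made-up names, and the helper only imitates the way a @Published publisher replays its current value to each new subscriber; it is a stand-in, not the real `$counter.values`.

```swift
// Hypothetical stand-in for `$counter.values`: every new iteration begins by
// replaying the current value and then stays open, like a live publisher.
func currentValueSequence(_ current: Int) -> AsyncStream<Int> {
    AsyncStream { continuation in
        continuation.yield(current) // replayed to each new iteration
        // Intentionally never finished.
    }
}

// Collects the first `count` values from an unfolding-style stream.
func firstValues(count: Int) async -> [Int] {
    let stream = AsyncStream<Int>(unfolding: {
        // This closure runs once per element the consumer asks for,
        // so a brand new sequence is created on every call...
        for await n in currentValueSequence(0) {
            return n // ...and its first element is always the replayed value.
        }
        return nil
    })

    var seen: [Int] = []
    for await value in stream {
        seen.append(value)
        if seen.count == count { break }
    }
    return seen
}
```

Calling `await firstValues(count: 3)` should give `[0, 0, 0]` even though nothing ever changed, which matches the spamming seen in View A.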

It seems like the "unfolding" style initializer is best used when processing a finite (but potentially very large) list of items, or when generating values from scratch... something like:

struct NumberQueuer {
    let numbers:[Int]
    
    public func queueStream() -> AsyncStream<Int> {
        var iterator = AsyncArray(values: numbers).makeAsyncIterator()
        print("Queue called")
        return AsyncStream.init(unfolding: unfolding, onCancel: onCancel)
        
        //() async -> _?
        func unfolding() async -> Int? {
            do {
                if let item = try await iterator.next() {
                    return item
                }
            } catch let error {
                print(error.localizedDescription)
            }
            return nil
            
        }
        
        //optional
        @Sendable func onCancel() -> Void {
            print("confirm NumberQueue got canceled")
        }
    }
    
}

public struct AsyncArray<Element>: AsyncSequence, AsyncIteratorProtocol {
    
    let values:[Element]
    let delay:TimeInterval
    
    var currentIndex = -1
    
    public init(values: [Element], delay:TimeInterval = 1) {
        self.values = values
        self.delay = delay
    }
    
    public mutating func next() async throws -> Element? {
        currentIndex += 1
        guard currentIndex < values.count else {
            return nil
        }
        try await Task.sleep(nanoseconds: UInt64(delay * 1E09))
        return values[currentIndex]
    }
    
    public func makeAsyncIterator() -> AsyncArray {
        self
    }
}
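The NumberQueuer above covers the finite-list case. For the other case, generating values from scratch, a sketch could be as small as the following. `diceRolls` and `takeRolls` are made-up names used only for illustration.

```swift
// Each pull from the consumer produces one fresh value; there is no upstream
// sequence to restart, so the pull style fits naturally here.
func diceRolls() -> AsyncStream<Int> {
    AsyncStream<Int>(unfolding: {
        Int.random(in: 1...6)
    })
}

// Hypothetical helper: pulls exactly `count` rolls, then stops iterating.
func takeRolls(_ count: Int) async -> [Int] {
    var rolls: [Int] = []
    for await roll in diceRolls() {
        rolls.append(roll)
        if rolls.count == count { break }
    }
    return rolls
}
```

Because this stream never returns nil on its own, the consumer decides when to stop, here by breaking out of the loop.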

One can force the unfolding type to work with an @Published by creating a buffer array that is checked repeatedly. The variable wouldn't actually need to be @Published anymore. This approach has a lot of problems but it can be made to work. If interested, I put it in a repo with a bunch of other AsyncStream examples. https://github.com/carlynorama/StreamPublisherTests
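A rough sketch of that buffer idea, written from scratch for this answer rather than copied from the repo. `ValueBuffer`, `bufferedStream` and the 10 ms polling interval are all assumptions made for illustration.

```swift
// Hypothetical buffer that the producer side pushes into.
actor ValueBuffer {
    private var pending: [Int] = []

    func push(_ value: Int) { pending.append(value) }

    // Returns the oldest buffered value, or nil when the buffer is empty.
    func pop() -> Int? { pending.isEmpty ? nil : pending.removeFirst() }
}

func bufferedStream(from buffer: ValueBuffer) -> AsyncStream<Int> {
    AsyncStream<Int>(unfolding: {
        // Poll until something shows up or the consuming task is cancelled.
        while !Task.isCancelled {
            if let next = await buffer.pop() { return next }
            try? await Task.sleep(nanoseconds: 10_000_000) // assumed 10 ms interval
        }
        return nil // ends the stream once the consumer is cancelled
    })
}
```

The polling loop is the main drawback: the stream wakes up on a timer even when nothing changed, which is part of why the synchronous context initializer is the better fit for a publisher.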

This article was very helpful in sorting this out: https://www.raywenderlich.com/34044359-asyncsequence-asyncstream-tutorial-for-ios

As was this video: https://www.youtube.com/watch?v=UwwKJLrg_0U

carlynorama