1

I may be going about this the wrong way, but I have a function that should emit multiple values over time, and I don’t want it to start emitting until something subscribes to it. I’m coming to Combine from RxSwift, so I’m basically trying to duplicate Observable.create() from the RxSwift world. The closest thing I have found is returning a Future, but a Future only succeeds or fails once (so it is basically like a Single in RxSwift).

Is there some fundamental thing I am missing here? My end goal is to make a function that processes a video file and emits progress events until it completes, then emits a URL for the completed file.

Jason Clardy

2 Answers

5

Generally you can use a PassthroughSubject to publish custom outputs. You can wrap a PassthroughSubject (or multiple PassthroughSubjects) in your own implementation of Publisher to ensure that only your process can send events through the subject.
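As a minimal sketch of the wrapping idea, the subject can be kept private and exposed only as a type-erased publisher. The `Ticker` class and its `tick`/`finish` methods are hypothetical names invented for this illustration; only `PassthroughSubject`, `eraseToAnyPublisher()` and `sink` come from Combine.

```swift
import Combine

// Hypothetical example type: it owns the subject privately and
// exposes a read-only publisher to the outside world.
final class Ticker {
    private let subject = PassthroughSubject<Int, Never>()

    // Callers can subscribe to this, but they cannot call send(_:) on it.
    var values: AnyPublisher<Int, Never> { subject.eraseToAnyPublisher() }

    func tick(_ n: Int) { subject.send(n) }
    func finish() { subject.send(completion: .finished) }
}

let ticker = Ticker()
var received: [Int] = []
var finished = false

// Keep the cancellable alive for as long as the subscription is needed.
let cancellable = ticker.values.sink(
    receiveCompletion: { _ in finished = true },
    receiveValue: { received.append($0) })

ticker.tick(1)
ticker.tick(2)
ticker.finish()
print(received, finished)
```

Note that a PassthroughSubject does not buffer: anything sent before a subscriber attaches is dropped, which is why the sketch subscribes before calling `tick`.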

Let's mock a VideoFrame type and some input frames for example purposes:

import Combine

typealias VideoFrame = String
let inputFrames: [VideoFrame] = ["a", "b", "c"]

Now we want to write a function that synchronously processes these frames. Our function should report progress somehow, and at the end, it should return the output frames. To report progress, our function will take a PassthroughSubject<Double, Never>, and send its progress (as a fraction from 0 to 1) to the subject:

func process(_ inputFrames: [VideoFrame], progress: PassthroughSubject<Double, Never>) -> [VideoFrame] {
    var outputFrames: [VideoFrame] = []
    for input in inputFrames {
        progress.send(Double(outputFrames.count) / Double(inputFrames.count))
        outputFrames.append("output for \(input)")
    }
    return outputFrames
}
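Before wrapping the function in a publisher, it can help to call it directly and watch the subject. The following is a small sketch that repeats the function above so it is self-contained; the names `progressSubject`, `fractions` and `output` are made up for this illustration.

```swift
import Combine

typealias VideoFrame = String

// Same function as above, repeated so this snippet compiles on its own.
func process(_ inputFrames: [VideoFrame], progress: PassthroughSubject<Double, Never>) -> [VideoFrame] {
    var outputFrames: [VideoFrame] = []
    for input in inputFrames {
        progress.send(Double(outputFrames.count) / Double(inputFrames.count))
        outputFrames.append("output for \(input)")
    }
    return outputFrames
}

let progressSubject = PassthroughSubject<Double, Never>()
var fractions: [Double] = []

// Subscribe first: the subject does not replay values sent earlier.
let cancellable = progressSubject.sink { fractions.append($0) }

let output = process(["a", "b"], progress: progressSubject)
print(fractions) // progress is sent before each frame is appended
print(output)
```

Because progress is reported before each frame is processed, the last fraction reported is below 1; the final result signals completion instead.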

Okay, so now we want to turn this into a publisher. The publisher needs to output both progress and a final result. So we'll use this enum as its output:

public enum ProgressEvent<Value> {
    case progress(Double)
    case done(Value)
}

Now we can define our Publisher type. Let's call it SyncPublisher, because when it receives a Subscriber, it immediately (synchronously) performs its entire computation.

public struct SyncPublisher<Value>: Publisher {
    public init(_ run: @escaping (PassthroughSubject<Double, Never>) throws -> Value) {
        self.run = run
    }

    public var run: (PassthroughSubject<Double, Never>) throws -> Value

    public typealias Output = ProgressEvent<Value>
    public typealias Failure = Error

    public func receive<Downstream: Subscriber>(subscriber: Downstream) where Downstream.Input == Output, Downstream.Failure == Failure {
        let progressSubject = PassthroughSubject<Double, Never>()
        let doneSubject = PassthroughSubject<ProgressEvent<Value>, Error>()
        progressSubject
            .setFailureType(to: Error.self)
            .map { ProgressEvent<Value>.progress($0) }
            .append(doneSubject)
            .subscribe(subscriber)
        do {
            let value = try run(progressSubject)
            progressSubject.send(completion: .finished)
            doneSubject.send(.done(value))
            doneSubject.send(completion: .finished)
        } catch {
            progressSubject.send(completion: .finished)
            doneSubject.send(completion: .failure(error))
        }
    }
}

Now we can turn our process(_:progress:) function into a SyncPublisher like this:

// Reuses the inputFrames array defined earlier.
let pub = SyncPublisher<[VideoFrame]> { process(inputFrames, progress: $0) }

The run closure is { process(inputFrames, progress: $0) }. Remember that $0 here is a PassthroughSubject<Double, Never>, exactly what process(_:progress:) wants as its second argument.

When we subscribe to this pub, it will first create two subjects. One subject is the progress subject and gets passed to the closure. We'll use the other subject to publish either the final result and a .finished completion, or just a .failure completion if the run closure throws an error.

We use two separate subjects because it ensures that our publisher is well-behaved. If the run closure returns normally, the publisher publishes zero or more progress reports, followed by a single result, followed by .finished. If the run closure throws an error, the publisher publishes zero or more progress reports, followed by a .failure completion. There is no way for the run closure to make the publisher emit multiple results, or emit more progress reports after emitting the result.
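The ordering guarantee rests on how `append` and PassthroughSubject behave, which can be sketched in isolation. The subject names `first` and `second` and the `log` array are assumptions for this illustration; they stand in for the progress and done subjects.

```swift
import Combine

let first = PassthroughSubject<String, Never>()   // stands in for the progress subject
let second = PassthroughSubject<String, Never>()  // stands in for the done subject
var log: [String] = []

let cancellable = first
    .append(second)
    .sink(
        receiveCompletion: { _ in log.append("finished") },
        receiveValue: { log.append($0) })

first.send("p1")
second.send("too early")            // not delivered: `first` has not finished yet
first.send(completion: .finished)   // from here on, values from `second` flow through
first.send("p2")                    // ignored: `first` has already completed
second.send("result")
second.send(completion: .finished)
print(log)
```

Only "p1", "result" and the final completion should reach the subscriber, which mirrors the progress-then-result ordering described above.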

At last, we can subscribe to pub to see if it works properly:

pub
    .sink(
        receiveCompletion: { print("completion: \($0)") },
        receiveValue: { print("output: \($0)") })

Here's the output:

output: progress(0.0)
output: progress(0.3333333333333333)
output: progress(0.6666666666666666)
output: done(["output for a", "output for b", "output for c"])
completion: finished
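A subscriber will usually want to tell the two kinds of event apart rather than print them. Here is a minimal sketch using a `switch`; the fixed `events` array is a stand-in for `pub` (an assumption made so the snippet is self-contained), and `lastProgress` and `result` are hypothetical names.

```swift
import Combine

enum ProgressEvent<Value> {
    case progress(Double)
    case done(Value)
}

// Stand-in for `pub`: a fixed sequence of events, for illustration only.
let events: [ProgressEvent<String>] = [.progress(0.0), .progress(0.5), .done("movie.mp4")]

var lastProgress = 0.0
var result: String?

let cancellable = events.publisher.sink { event in
    switch event {
    case .progress(let fraction):
        lastProgress = fraction   // e.g. update a progress bar here
    case .done(let value):
        result = value            // e.g. hand the finished file to the caller
    }
}
print(lastProgress, result ?? "none")
```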
rob mayoff
  • For the final result, would we be able to use a Future instead? – Fawkes Jun 13 '21 at 19:43
  • Also, we wrap a PassthroughSubject in our own Publisher here, but is writing a fully custom publisher the only other way to avoid publicly exposing the send functionality? – Fawkes Jun 13 '21 at 19:44
  • I think you could use a `Future` instead of `doneSubject` in this case. – rob mayoff Jun 13 '21 at 23:46
0

The following extension to AnyPublisher replicates Observable.create semantics by composing a PassthroughSubject. This includes cancellation semantics.


AnyPublisher.create() Swift 5.6 Extension

public extension AnyPublisher {
    
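    // Uses AnyPublisher's own Output and Failure; redeclaring them on the method would shadow the type's generic parameters.
    static func create(_ subscribe: @escaping (AnySubscriber<Output, Failure>) -> AnyCancellable) -> AnyPublisher<Output, Failure> {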
        
        let passthroughSubject = PassthroughSubject<Output, Failure>()
        var cancellable: AnyCancellable?
        
        return passthroughSubject
            .handleEvents(receiveSubscription: { subscription in

                let subscriber = AnySubscriber<Output, Failure> { subscription in

                } receiveValue: { input in
                    passthroughSubject.send(input)
                    return .unlimited
                } receiveCompletion: { completion in
                    passthroughSubject.send(completion: completion)
                }
                
                cancellable = subscribe(subscriber)
                
            }, receiveCompletion: { completion in
                
            }, receiveCancel: {
                cancellable?.cancel()
            })
            .eraseToAnyPublisher()
        
    }
    
}

Usage

func doSomething() -> AnyPublisher<Int, Error> {
    
    return AnyPublisher<Int, Error>.create { subscriber in
        
        // Imperative implementation of doing something can call subscriber as follows
        _ = subscriber.receive(1)
        subscriber.receive(completion: .finished)
        // subscriber.receive(completion: .failure(myError))
        
        return AnyCancellable {
            // Imperative cancellation implementation
        }
    }
    
}
Brody Robertson