
I have a class called QueryObserver that can produce multiple results over time, given back as callbacks (closures). You use it like this:

let observer = QueryObserver<ModelType>(query: query) { result in
  switch result {
  case .success(let value):
    print("result: \(value)")
  case .failure(let error):
    print("error: \(error)")
  }
}

(QueryObserver is actually a wrapper around Firebase Firestore's unwieldy query.addSnapshotListener functionality, in case you were wondering. It uses the modern Result type instead of a callback with multiple optional parameters.)

In an older project I am using ReactiveKit and have an extension that turns all this into a Signal, like so:

extension QueryObserver {
  public static func asSignal(query: Query) -> Signal<[T], Error> {
    return Signal { observer in
      let queryObserver = QueryObserver<T>(query: query) { result in
        switch result {
        case .success(let value):
          observer.receive(value)
        case .failure(let error):
          if let firestoreError = error as? FirestoreError, case .noSnapshot = firestoreError {
            observer.receive([])
          } else {
            observer.receive(completion: .failure(error))
          }
        }
      }

      return BlockDisposable {
        queryObserver.stopListening()
      }
    }
  }
}

In a brand-new project, though, I am using Combine and trying to rewrite this. So far I have managed to write the following, but it doesn't work. That makes sense: nothing retains the observer, so it is released immediately and nothing happens.

extension QueryObserver {
  public static func asSignal(query: Query) -> AnyPublisher<[T], Error> {
    let signal = PassthroughSubject<[T], Error>()

    let observer = QueryObserver<T>(query: query) { result in
      switch result {
      case .success(let value):
        print("SUCCESS!")
        signal.send(value)
      case .failure(let error):
        if let firestoreError = error as? FirestoreError, case .noSnapshot = firestoreError {
          signal.send([])
        } else {
          signal.send(completion: .failure(error))
        }
      }
    }

    return signal.eraseToAnyPublisher()
  }
}

How do I make the Combine version work? How can I wrap existing async code? The only examples I found used Future for one-off callbacks, but I am dealing with multiple values over time.

Basically I am looking for the ReactiveKit-to-Combine version of this.

Kevin Renskers
    You have to keep your publisher in a stored property (say, a private member), as well as any subscriber (as shown [here](https://stackoverflow.com/a/59573797/12299030)). – Asperi Jan 05 '20 at 09:44
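A minimal sketch of what the comment suggests, assuming a stand-in callback source: `FakeObserver`, `FeedViewModel`, and `simulate` are hypothetical names invented for illustration, and `FakeObserver` only mimics the shape of the question's QueryObserver. The point is that the callback source, the subject, and the subscription are all held by stored properties, so nothing is released early.

```swift
import Combine

// Hypothetical stand-in for QueryObserver: it stores a callback and
// invokes it whenever `emit` is called.
final class FakeObserver {
    private let handler: (Result<[Int], Error>) -> Void

    init(handler: @escaping (Result<[Int], Error>) -> Void) {
        self.handler = handler
    }

    func emit(_ values: [Int]) { handler(.success(values)) }
    func stopListening() {}
}

final class FeedViewModel {
    // Stored properties keep all three pieces alive for the owner's lifetime.
    private let subject = PassthroughSubject<[Int], Error>()
    private var observer: FakeObserver?
    private var cancellables = Set<AnyCancellable>()
    private(set) var received: [[Int]] = []

    init() {
        // Forward every callback result into the subject.
        observer = FakeObserver { [subject] result in
            switch result {
            case .success(let values):
                subject.send(values)
            case .failure(let error):
                subject.send(completion: .failure(error))
            }
        }

        // Storing the AnyCancellable retains the subscription.
        subject
            .sink(receiveCompletion: { _ in },
                  receiveValue: { [weak self] in self?.received.append($0) })
            .store(in: &cancellables)
    }

    deinit { observer?.stopListening() }

    // Test hook: pretend the underlying source produced a result.
    func simulate(_ values: [Int]) { observer?.emit(values) }
}
```

This keeps the listener running for as long as the owning object lives, rather than tying it to an individual subscription.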

1 Answer


Check out https://github.com/DeclarativeHub/ReactiveKit/issues/251#issuecomment-575907641 for a handy Combine version of a Signal, used like this:

let signal = Signal<Int, TestError> { subscriber in
    subscriber.receive(1)
    subscriber.receive(2)
    subscriber.receive(completion: .finished)
    return Combine.AnyCancellable {
        print("Cancelled")
    }
}
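For readers who cannot follow the link, here is a rough sketch of how such a closure-based publisher could be built on top of PassthroughSubject. It is not the implementation from the linked comment: the nested `Handle` type and the internals are assumptions made for illustration, and only the call-site shape is taken from the usage above.

```swift
import Combine

// Hypothetical minimal "Signal": a publisher created from a closure that
// receives a handle for emitting events and returns a teardown cancellable.
struct Signal<Output, Failure: Error>: Publisher {
    // Thin wrapper so call sites can use receive(_:) and receive(completion:).
    final class Handle {
        private let subject: PassthroughSubject<Output, Failure>

        fileprivate init(_ subject: PassthroughSubject<Output, Failure>) {
            self.subject = subject
        }

        func receive(_ value: Output) { subject.send(value) }

        func receive(completion: Subscribers.Completion<Failure>) {
            subject.send(completion: completion)
        }
    }

    private let producer: (Handle) -> AnyCancellable

    init(_ producer: @escaping (Handle) -> AnyCancellable) {
        self.producer = producer
    }

    func receive<S: Subscriber>(subscriber: S)
        where S.Input == Output, S.Failure == Failure {
        // Each subscriber gets its own subject and its own producer run.
        let subject = PassthroughSubject<Output, Failure>()
        var teardown: AnyCancellable?

        // The closure captures `teardown`, so the subscription keeps it
        // (and whatever it retains) alive until cancellation.
        subject
            .handleEvents(receiveCancel: { teardown?.cancel() })
            .subscribe(subscriber)

        // Attach the subscriber first so synchronous emissions are not lost.
        teardown = producer(Handle(subject))
    }
}

// Usage mirroring the example above, collecting events into variables.
var received: [Int] = []
var finished = false

let cancellable = Signal<Int, Never> { subscriber in
    subscriber.receive(1)
    subscriber.receive(2)
    subscriber.receive(completion: .finished)
    return AnyCancellable {}
}.sink(receiveCompletion: { _ in finished = true },
       receiveValue: { received.append($0) })
```

With a type along these lines, the ReactiveKit extension from the question should port almost line for line: return an AnyCancellable instead of a BlockDisposable and call queryObserver.stopListening() inside it. Because that closure captures queryObserver, the observer stays alive for as long as the subscription does.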
Kevin Renskers