8

If you use Combine for network requests with URLSession, then you need to save the Subscription (aka, the AnyCancellable) - otherwise it gets immediately deallocated, which cancels the network request. Later, when the network response has been processed, you want to deallocate the subscription, because keeping it around would be a waste of memory.

Below is some code that does this. It's kind of awkward, and it may not even be correct. I can imagine a race condition where the network request could start and complete on another thread before sub is set to the non-nil value.

Is there a nicer way to do this?

class SomeThing {
    var subs = Set<AnyCancellable>()
    func sendNetworkRequest() {
        var request: URLRequest = ...
        var sub: AnyCancellable? = nil            
        sub = URLSession.shared.dataTaskPublisher(for: request)
            .map(\.data)
            .decode(type: MyResponse.self, decoder: JSONDecoder())
            .sink(
                receiveCompletion: { completion in                
                    self.subs.remove(sub!)
                }, 
                receiveValue: { response in ... }
            )
        subs.insert(sub!)
    }
}
Rob N

3 Answers

16

I call this situation a one-shot subscriber. The idea is that, because a data task publisher publishes only once, you know for a fact that it is safe to destroy the pipeline after you receive your single value and/or completion (error).

Here's a technique I like to use. First, here's the head of the pipeline:

let url = URL(string:"https://www.apeth.com/pep/manny.jpg")!
let pub : AnyPublisher<UIImage?,Never> =
    URLSession.shared.dataTaskPublisher(for: url)
        .map {$0.data}
        .replaceError(with: Data())
        .compactMap { UIImage(data:$0) }
        .receive(on: DispatchQueue.main)
        .eraseToAnyPublisher()

Now comes the interesting part. Watch closely:

var cancellable: AnyCancellable? // 1
cancellable = pub.sink(receiveCompletion: {_ in // 2
    cancellable?.cancel() // 3
}) { image in
    self.imageView.image = image
}

Do you see what I did there? Perhaps not, so I'll explain it:

  1. First, I declare a local AnyCancellable variable; for reasons having to do with the rules of Swift syntax, this needs to be an Optional.

  2. Then, I create my subscriber and set my AnyCancellable variable to that subscriber. Again, for reasons having to do with the rules of Swift syntax, my subscriber needs to be a Sink.

  3. Finally, in the subscriber itself, I cancel the AnyCancellable when I receive the completion.

The cancellation in the third step actually does two things quite apart from calling cancel() — things having to do with memory management:

  • By referring to cancellable inside the asynchronous completion function of the Sink, I keep cancellable and the whole pipeline alive long enough for a value to arrive from the publisher.

  • By cancelling cancellable, I permit the pipeline to go out of existence and prevent a retain cycle that would cause the surrounding view controller to leak.
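The lifetime effect described above can be seen in a small sketch. It assumes a PassthroughSubject as a stand-in for the data task publisher, and the function names are hypothetical; the point is only that the subscription survives the end of the function when the completion closure refers to the local variable, and does not survive otherwise.

```swift
import Combine

// Hypothetical stand-in for a data task publisher: it emits later, then finishes.
let subject = PassthroughSubject<String, Never>()
var keptAlive: [String] = []
var dropped: [String] = []

// The completion closure refers to `cancellable`, so the pipeline outlives the function.
func startOneShot() {
    var cancellable: AnyCancellable?
    cancellable = subject.sink(receiveCompletion: { _ in
        cancellable?.cancel()
    }, receiveValue: { value in
        keptAlive.append(value)
    })
}

// Nothing holds the AnyCancellable, so it is released (and cancels) right away.
func startAndDrop() {
    _ = subject.sink(receiveCompletion: { _ in },
                     receiveValue: { value in dropped.append(value) })
}

startOneShot()
startAndDrop()
subject.send("hello")
subject.send(completion: .finished)
```

After the two sends, only the first subscriber should have recorded the value.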

matt
  • I think it's a fair criticism of this approach that it _forces_ the pipeline to stay alive until completion, even if the caller is deallocated. For example, if the data task publisher is created by a view controller that is dismissed, you might want the data task to be cancelled early. It might make more sense to store the sub as a property and instead write `... receiveCompletion: { [weak self] _ in self?.cancellable?.cancel() }`. This way the subscription is cancelled & deallocated when the view controller is deallocated, or when the task completes, whichever happens first. – Stuart Jul 01 '20 at 22:15
  • I think we can get a more elegant solution by writing a custom sink to wrap this operation – Hai Feng Kao Sep 23 '20 at 15:01
  • While capturing the reference to the cancellable inside the completion closure is effective, I think the act of calling cancel here doesn't actually do anything as the subscription is already complete. Testing with `print(cancellable)` also keeps the subscription alive and does not create a retain cycle. Using the `withExtendedLifetime` function may be clearer for keeping it alive, but that is largely unimportant as the end result will be the same. – Helam Dec 07 '22 at 21:19
  • @Helam You could be right! I would be the first to admit that my grasp of what `withExtendedLifetime` really does is shaky at best – matt Dec 08 '22 at 03:17
9

Below is some code that does this. It's kind of awkward, and it may not even be correct. I can imagine a race condition where the network request could start and complete on another thread before sub is set to the non-nil value.

Danger! Swift.Set is not thread safe. If you want to access a Set from two different threads, it is up to you to serialize the accesses so they don't overlap.

What is possible in general (although not perhaps with URLSession.DataTaskPublisher) is that a publisher emits its signals synchronously, before the sink operator even returns. This is how Just, Result.Publisher, Publishers.Sequence, and others behave. So those produce the problem you're describing, without involving thread safety.
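A minimal sketch of that synchronous case, using Just. The function name is hypothetical; it simply records whether the local variable was still nil when the completion closure ran.

```swift
import Combine

// Returns true if the completion closure ran before `sink` returned,
// that is, while `sub` had not yet been assigned.
func completionRunsBeforeSinkReturns() -> Bool {
    var sub: AnyCancellable? = nil
    var sawNil = false
    sub = Just(42).sink(
        receiveCompletion: { _ in sawNil = (sub == nil) },
        receiveValue: { _ in }
    )
    sub?.cancel() // tidy up; the subscription has already finished
    return sawNil
}

let ranEarly = completionRunsBeforeSinkReturns()
```

Everything here happens on one thread, so the ordering problem is independent of any thread-safety concern.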

Now, how to solve the problem? If you don't think you want to actually be able to cancel the subscription, then you can avoid creating an AnyCancellable at all by using Subscribers.Sink instead of the sink operator:

        URLSession.shared.dataTaskPublisher(for: request)
            .map(\.data)
            .decode(type: MyResponse.self, decoder: JSONDecoder())
            .subscribe(Subscribers.Sink(
                receiveCompletion: { completion in ... },
                receiveValue: { response in ... }
            ))

Combine will clean up the subscription and the subscriber after the subscription completes (with either .finished or .failure).
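Here is a small self-contained sketch of the same idea, assuming a PassthroughSubject in place of the network publisher so it can run without a network. No AnyCancellable is created or stored, and the values still arrive.

```swift
import Combine

// Hypothetical stand-in for the decoded network publisher.
let subject = PassthroughSubject<Int, Never>()
var values: [Int] = []
var finished = false

// subscribe(_:) with a Subscribers.Sink returns nothing to store.
subject.subscribe(Subscribers.Sink<Int, Never>(
    receiveCompletion: { _ in finished = true },
    receiveValue: { values.append($0) }
))

subject.send(1)
subject.send(completion: .finished)
```

The trade-off is the one stated above: with nothing to hold on to, the caller has no way to cancel early.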

But what if you do want to be able to cancel the subscription? Maybe sometimes your SomeThing gets destroyed before the subscription is complete, and you don't need the subscription to complete in that case. Then you do want to create an AnyCancellable and store it in an instance property, so that it gets cancelled when SomeThing is destroyed.

In that case, set a flag indicating that the sink won the race, and check the flag before storing the AnyCancellable.

        var sub: AnyCancellable? = nil
        var isComplete = false
        sub = URLSession.shared.dataTaskPublisher(for: request)
            .map(\.data)
            .decode(type: MyResponse.self, decoder: JSONDecoder())
            // This ensures thread safety, if the subscription is also created
            // on DispatchQueue.main.
            .receive(on: DispatchQueue.main)
            .sink(
                receiveCompletion: { [weak self] completion in
                    isComplete = true
                    if let theSub = sub {
                        self?.subs.remove(theSub)
                    }
                }, 
                receiveValue: { response in ... }
            )
        if !isComplete {
            subs.insert(sub!)
        }
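
The flag technique can be exercised without a network by making the publisher a parameter. This is only a sketch: `track` is a hypothetical helper name, and the `.receive(on: DispatchQueue.main)` step is left out so that the example runs synchronously on one thread.

```swift
import Combine

final class SomeThing {
    private(set) var subs = Set<AnyCancellable>()

    // Hypothetical helper: subscribes and stores the subscription
    // only if it has not already completed.
    func track<P: Publisher>(_ publisher: P,
                             receiveValue: @escaping (P.Output) -> Void) {
        var sub: AnyCancellable? = nil
        var isComplete = false
        sub = publisher.sink(
            receiveCompletion: { [weak self] _ in
                isComplete = true
                if let theSub = sub { self?.subs.remove(theSub) }
            },
            receiveValue: receiveValue
        )
        if !isComplete, let sub = sub { subs.insert(sub) }
    }
}

let thing = SomeThing()
thing.track(Just(1)) { _ in }              // completes inside sink
let countAfterJust = thing.subs.count

let subject = PassthroughSubject<Int, Never>()
thing.track(subject) { _ in }              // still pending
let countWhilePending = thing.subs.count
subject.send(completion: .finished)
let countAfterFinish = thing.subs.count
```

With Just the sink wins the race and nothing is stored; with the subject the subscription is stored and then removed on completion.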
rob mayoff
2

Combine publishers have an instance method called prefix, which republishes at most the given number of elements and then finishes:

func prefix(_ maxLength: Int) -> Publishers.Output<Self>

https://developer.apple.com/documentation/combine/publisher/prefix(_:)
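
A short sketch of how `prefix(1)` behaves, assuming a PassthroughSubject as a stand-in for a publisher that could emit more than once. Note that the AnyCancellable returned by `sink` still has to be kept alive until the value arrives.

```swift
import Combine

let subject = PassthroughSubject<Int, Never>()
var values: [Int] = []
var finished = false

// prefix(1) forwards the first element, then completes the pipeline.
let cancellable = subject
    .prefix(1)
    .sink(receiveCompletion: { _ in finished = true },
          receiveValue: { values.append($0) })

subject.send(1)
subject.send(2) // ignored: the pipeline finished after the first element
```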


Leko