
I have a function that creates a collection of publishers:

func publishers(from text: String) -> [AnyPublisher<SignalToken, Never>] {
    let signalTokens: [SignalToken] = translate(from: text)
    var delay: Int = 0
    let signalPublishers: [AnyPublisher<SignalToken, Never>] = signalTokens.map { token in
        // Each token is delayed by the accumulated delay of all preceding tokens,
        // so the values arrive spaced out even though the publishers all start together.
        let publisher = Just(token)
            .delay(for: .milliseconds(delay), scheduler: DispatchQueue.main)
            .eraseToAnyPublisher()
        delay += token.delay
        return publisher
    }
    return signalPublishers
}

In my service class I have two methods, one for play():

func play(signal: String) {
     anyCancellable = signalTokenSubject.sink(receiveValue: { token in print(token) })

     anyCancellable2 = publishers(from: signal)
         .publisher
         .flatMap { $0 }
         .subscribe(on: DispatchQueue.global())
         .sink(receiveValue: { [weak self] token in
            self?.signalTokenSubject.send(token)
         })
}

and one for stop():

func stop() {
     anyCancellable?.cancel()
     anyCancellable2?.cancel()
}

I have a problem with memory. When the collection of publishers is large and I call stop() before the whole Publishers.Sequence is .finished, memory increases and is never released.

Is there a way to complete the Publishers.Sequence earlier, before Combine iterates over the whole collection?

LakaLe_

1 Answer


To reclaim the memory, release the pipelines:

func stop() {
     anyCancellable?.cancel()
     anyCancellable2?.cancel()
     anyCancellable = nil 
     anyCancellable2 = nil 
}

Actually you don't need the cancel calls, because releasing the pipelines does cancel in good order; that is the whole point of AnyCancellable. So you can just say:

func stop() {
     anyCancellable = nil 
     anyCancellable2 = nil 
}

Another thing to note is that you are running all your publishers at once. The sequence does not arrive sequentially; the whole sequence is dumped into the flatMap, which starts all the publishers publishing simultaneously. Thus cancelling doesn't do you all that much good. You might want to set maxPublishers: on your flatMap so that backpressure prevents more than some small number of publishers from running simultaneously (for example, one at a time).
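For illustration, a minimal sketch of that suggestion applied to the pipeline from the question; the .max(1) demand is just an example value, and bear in mind that with one-at-a-time demand each inner delay starts only when that publisher is subscribed, so the overall timing will differ from running them all at once:

anyCancellable2 = publishers(from: signal)
    .publisher
    .flatMap(maxPublishers: .max(1)) { $0 }   // request at most one inner publisher at a time
    .subscribe(on: DispatchQueue.global())
    .sink(receiveValue: { [weak self] token in
        self?.signalTokenSubject.send(token)
    })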

matt
  • See also the whole discussion here about serializing a sequence of publishers: https://stackoverflow.com/questions/59743938/combine-framework-serialize-async-operations – matt Aug 16 '20 at 13:47
  • Also, from what I remember, `Delay` publisher doesn't cancel (still sends a value after receiving cancel) – New Dev Aug 16 '20 at 14:17
  • @NewDev Correct of course, good point. Once the publisher has started to publish, values may come in. But you don't have to be there to receive them. :) – matt Aug 16 '20 at 14:36
  • But doesn't that mean that something (some closure?) is possibly self-retaining, at least until it completes, which would account for memory increase. And I think if you had `let c = pub.sink{print($0)}`, it would still print even after `c.cancel()` (I remember this being a surprise to me) – New Dev Aug 16 '20 at 14:40
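A minimal sketch of the experiment New Dev describes, written as a playground page (the token value and the one-second delay are arbitrary; behavior may vary by OS version):

import Combine
import Foundation
import PlaygroundSupport

// Keep the playground alive long enough for the delayed value (if any) to arrive.
PlaygroundPage.current.needsIndefiniteExecution = true

// Subscribe to a delayed value, then cancel immediately. According to the
// comments above, the value may still be printed after the cancel.
let pub = Just("token")
    .delay(for: .seconds(1), scheduler: DispatchQueue.main)

let c = pub.sink { print("received:", $0) }
c.cancel()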