
I am learning Combine. I wrote methods that make HTTP requests in a Combine style, for example:

func testRawDataTaskPublisher(for url: URL) -> AnyPublisher<Data, Error> {
    var request = URLRequest(url: url,
                             cachePolicy: .useProtocolCachePolicy,
                             timeoutInterval: 15)
    request.httpMethod = "GET"

    // tryMap keeps only the response body; it also widens the failure type from URLError to Error.
    return urlSession.dataTaskPublisher(for: request)
        .tryMap { $0.data }
        .eraseToAnyPublisher()
}

I would like to call the method multiple times and run a task once all of the requests have finished, for example:

let myURLs: [URL] = ...

for url in myURLs {
    let cancellable = testRawDataTaskPublisher(for: url)
        .sink(receiveCompletion: { _ in }) { data in
            // save the data...
        }
}

The code above won't work because the cancellable has to be stored in a property of the class; otherwise it is deallocated and the request is cancelled. The first question is: is it a good idea to store many (for example 1,000) cancellables in something like Set<AnyCancellable>? Won't it cause memory leaks?

var cancellables = Set<AnyCancellable>()

...

    let cancellable = ...

    cancellables.insert(cancellable) // ???
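For reference, a minimal sketch of how a Set<AnyCancellable> behaves, using a PassthroughSubject as a stand-in for the network publisher (an assumption, so it runs offline). `store(in:)` is the built-in shorthand for inserting into the set. Storing many cancellables is not a leak as such; they simply stay in the set until they are removed or the owner is deallocated, and removing one cancels its subscription. A real leak usually comes from a sink closure that strongly captures `self` while `self` owns the set, which `[weak self]` avoids.

```swift
import Combine

// A PassthroughSubject stands in for a network publisher so this runs offline.
let subject = PassthroughSubject<Int, Never>()
var cancellables = Set<AnyCancellable>()
var received: [Int] = []

// store(in:) inserts the AnyCancellable into the set, the same as cancellables.insert(_:).
subject
    .sink { received.append($0) }
    .store(in: &cancellables)

subject.send(1)
subject.send(2)

// Removing the AnyCancellable from the set releases it; its deinit cancels the
// subscription, so later values are no longer delivered.
cancellables.removeAll()
subject.send(3)
```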

And the second question: how do I start a task when all of the publishers have finished? I was thinking about something like this:

class Test {
    var cancellables = Set<AnyCancellable>()

    func run() {
        // show a loader

        let cancellable = runDownloads()
            .receive(on: RunLoop.main)
            .sink(receiveCompletion: { _ in }) { _ in
                // hide the loader
            }

        cancellables.insert(cancellable)
    }

    func runDownloads() -> AnyPublisher<Bool, Error> {
        let myURLs: [URL] = ...

        return Future<Bool, Error> { promise in
            let numberOfURLs = myURLs.count
            var numberOfFinishedTasks = 0

            for url in myURLs {
                // Future's closure is escaping, so members of self are referenced explicitly.
                let cancellable = self.testRawDataTaskPublisher(for: url)
                    .sink(receiveCompletion: { _ in }) { data in
                        // save the data...
                        numberOfFinishedTasks += 1

                        if numberOfFinishedTasks >= numberOfURLs {
                            promise(.success(true))
                        }
                    }

                self.cancellables.insert(cancellable)
            }
        }.eraseToAnyPublisher()
    }

    func testRawDataTaskPublisher(for url: URL) -> AnyPublisher<Data, Error> {
        ...
    }
}

Normally I would use a DispatchGroup: start multiple HTTP tasks and handle the group's notification when they have all finished. I am wondering how to write that in a modern way using Combine.
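For comparison, a rough sketch of the DispatchGroup pattern described above. `fetch(_:completion:)` is a hypothetical stand-in for a callback-based HTTP request (it does no networking), and the URLs are placeholders.

```swift
import Foundation

// Hypothetical stand-in for an HTTP request: calls back with some bytes
// on a background queue after a short delay.
func fetch(_ url: URL, completion: @escaping (Data?) -> Void) {
    DispatchQueue.global().asyncAfter(deadline: .now() + 0.05) {
        completion(Data(url.absoluteString.utf8))
    }
}

let urls = (1...3).map { URL(string: "https://example.com/\($0)")! }
let group = DispatchGroup()
let lock = NSLock()          // callbacks run concurrently, so guard the shared dictionary
var results: [URL: Data] = [:]

for url in urls {
    group.enter()            // one enter() per task started...
    fetch(url) { data in
        lock.lock()
        results[url] = data
        lock.unlock()
        group.leave()        // ...balanced by exactly one leave() when it finishes
    }
}

// notify(queue:) schedules the block once every enter() has been balanced by a leave().
group.notify(queue: .main) {
    print("All \(results.count) downloads finished")
}
```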

Asked by kampro, edited by Gil Birman

1 Answer


You can run several operations in parallel by creating a collection of publishers, applying the `flatMap` operator, and then using `collect` to wait for all of the publishers to complete before continuing. Here's an example that you can run in a playground:

import Combine
import Foundation

// Emits `value` once after `after` seconds, then finishes.
func delayedPublisher<Value>(_ value: Value, delay after: Double) -> AnyPublisher<Value, Never> {
  let p = PassthroughSubject<Value, Never>()
  DispatchQueue.main.asyncAfter(deadline: .now() + after) {
    p.send(value)
    p.send(completion: .finished)
  }
  return p.eraseToAnyPublisher()
}

// Publisher n emits n after 1/n seconds, so publisher 1 is the slowest.
let myPublishers = [1,2,3]
  .map{ delayedPublisher($0, delay: 1 / Double($0)).print("\($0)").eraseToAnyPublisher() }

// flatMap subscribes to every publisher; collect emits one array after all have finished.
let cancel = myPublishers
  .publisher
  .flatMap { $0 }
  .collect()
  .sink { result in
    print("result:", result)
  }

Here is the output:

1: receive subscription: (PassthroughSubject)
1: request unlimited
2: receive subscription: (PassthroughSubject)
2: request unlimited
3: receive subscription: (PassthroughSubject)
3: request unlimited
3: receive value: (3)
3: receive finished
2: receive value: (2)
2: receive finished
1: receive value: (1)
1: receive finished
result: [3, 2, 1]

Notice that the publishers are all immediately started (in their original order).
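Starting everything at once is what `flatMap` does with its default `maxPublishers: .unlimited`. With something like the question's 1,000 URLs you may prefer to cap how many requests are in flight by passing a smaller demand. A minimal sketch, assuming a hypothetical helper named `runAll`; PassthroughSubjects stand in for `dataTaskPublisher` so the behaviour can be checked offline.

```swift
import Combine

// Hypothetical helper: subscribes to at most `limit` publishers at a time and emits
// every value they produce as one array once all of them have finished.
func runAll<P: Publisher>(_ publishers: [P], limit: Int) -> AnyPublisher<[P.Output], P.Failure> {
    publishers
        .publisher                              // Publishers.Sequence<[P], Never>
        .setFailureType(to: P.Failure.self)     // match the inner publishers' error type
        .flatMap(maxPublishers: .max(limit)) { $0 }
        .collect()
        .eraseToAnyPublisher()
}

// Subjects stand in for dataTaskPublisher so the behaviour can be checked offline.
let first = PassthroughSubject<Int, Never>()
let second = PassthroughSubject<Int, Never>()
var output: [Int] = []

let runCancellable = runAll([first, second], limit: 1)
    .sink { output = $0 }

second.send(20)                       // dropped: with limit 1, `second` is not subscribed yet
first.send(10)
first.send(completion: .finished)     // frees the slot, so flatMap now subscribes to `second`
second.send(21)
second.send(completion: .finished)
```

With the question's method this might look like `runAll(myURLs.map { testRawDataTaskPublisher(for: $0) }, limit: 4)`, where 4 is an arbitrary choice. Creating the publishers up front does not start the requests, because a data task publisher only starts its task once it is subscribed to and receives demand.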

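The `1 / Double($0)` delay causes the first publisher to take the longest to complete. Notice the order of the values at the end: since the first publisher took the longest, its value is the last item. `collect` gathers values in the order they arrive, not in the order of the original array.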
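If the results need to match the order of the input array instead, one option is to tag each value with its publisher's index before merging and sort after collecting. A minimal sketch, assuming a hypothetical helper named `collectInOrder`; the subjects are finished in reverse order to mimic the delays in the example above.

```swift
import Combine

// Hypothetical helper: tags every value with the index of the publisher that produced it,
// merges everything with flatMap, and after collect() sorts the values back into input order.
func collectInOrder<P: Publisher>(_ publishers: [P]) -> AnyPublisher<[P.Output], P.Failure> {
    publishers
        .enumerated()
        .publisher
        .setFailureType(to: P.Failure.self)
        .flatMap { pair in
            pair.element.map { (index: pair.offset, value: $0) }
        }
        .collect()                              // values arrive in completion order...
        .map { tagged in
            tagged.sorted { $0.index < $1.index }.map { $0.value }   // ...so restore input order
        }
        .eraseToAnyPublisher()
}

let a = PassthroughSubject<String, Never>()
let b = PassthroughSubject<String, Never>()
let c = PassthroughSubject<String, Never>()
var ordered: [String] = []

let orderedCancellable = collectInOrder([a, b, c])
    .sink { ordered = $0 }

// Finish the publishers in reverse order.
c.send("c"); c.send(completion: .finished)
b.send("b"); b.send(completion: .finished)
a.send("a"); a.send(completion: .finished)
```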

Answered by Gil Birman
  • Great solution imo. But I just wonder if it is intentional that you deliberately changed the type of the value of the publishers from `Int` to `Double`? – nayem Dec 11 '19 at 05:18
  • Swift gets weird here. Because the `$0` is used to calculate a `Double` value (the delay), the numbers in the original array are inferred to be `Double`s. I should probably have avoided generics to keep it simpler. – Gil Birman Dec 11 '19 at 06:08
  • Well, I don't really think that _Swift gets weird here_. Without being too nosy, I think I can shed light on the reason, if you allow me. The reason here is the **type inference** of `$` notation. You defined the function as generic and you aren't helping the **type inference** to infer the types of both arguments while calling. Both of the parameters are passed with the `$` notation and the `delay` parameter forces the type of the `$` to be `Double`. Hence the generic `Value` is inferred to be `Double` as well. You can get rid of that by `Double(1 / $0)` or `1 / Double($0)`. – nayem Dec 11 '19 at 06:25
  • I agree with both of our explanations. Thanks for the suggested fix. I updated the answer to incorporate it. – Gil Birman Dec 11 '19 at 07:52
  • Everything is clear. Thank you, the problem is solved. – kampro Dec 11 '19 at 08:50
  • Note that `.publisher` can use a `Never` error and mess up your types. Alternatively you can use `Publishers.Sequence(sequence: array.map(...))` to keep the error types intact. – alejandromp May 08 '20 at 16:54
  • What if I want to do the same thing on publishers that are of different return type? – Cyber Gh Jul 26 '20 at 18:59
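On the last comment: publishers with different output types cannot go into one array, but `Publishers.Zip` (and `Zip3`/`Zip4`) waits until each upstream has produced a value and emits them together as a tuple. For single-value publishers such as data task publishers that amounts to waiting for all of them. A minimal sketch, with `Just` standing in for real requests (an assumption); Zip requires the upstreams to share a Failure type, so real publishers may need `mapError` or `setFailureType` first.

```swift
import Combine

// Two publishers with unrelated output types; Just stands in for real requests.
let countPublisher = Just(42)
let titlePublisher = Just("report")

var combined: (Int, String)?

// Zip waits until each upstream has produced a value and emits them as one tuple.
let zipCancellable = Publishers.Zip(countPublisher, titlePublisher)
    .sink { combined = $0 }
```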