11

I'm using Swift Combine for my API requests. Now I'm facing a situation where I want to have more than 4 parallel requests that I want to zip together. Before I had exactly 4 requests that I zipped together using Zip4() operator. I can imagine that you do the zipping in multiple steps but I don't know how to write the receiveValue for it.

Here's a simplification of my current code with 4 parallel requests:

    Publishers.Zip4(request1, request2, request3, request4)
        .sink(receiveCompletion: { completion in
            // completion code if all 4 requests completed
        }, receiveValue: { request1Response, request2Response, request3Response, request4Response in
            // do something with request1Response
            // do something with request2Response
            // do something with request3Response
            // do something with request4Response
        }
    )
        .store(in: &state.subscriptions)
G. Marc
  • 4,987
  • 4
  • 32
  • 49

9 Answers9

21

The thing that stops you from zipping an arbitrary number of publishers is the very unfortunate fact that Apple has elected to make the output of the zip operators be a tuple. Tuples are extremely inflexible and limited in their power. You can’t have a tuple of, say, ten elements; and you can’t even append an element to a tuple, because that causes you to get a different type. What we need, therefore, is a new operator that does the same job as zip but emits some more powerful and flexible result, such as an array.

And we can make one! Luckily, the zip operator itself has a transform parameter that lets us specify what sort of output we want.

Okay, so, to illustrate, I'll zip ten publishers together. First, I'll make an array of ten publishers; they will be mere Just publishers, but that's sufficient to make the point, and to prove that I'm not cheating I'll attach an arbitrary delay to each of them:

let justs = (1...10).map {
    Just($0)
        .delay(for: .seconds(Int.random(in:1...3)), scheduler: DispatchQueue.main)
        .eraseToAnyPublisher() }

Okay, now I've got an array of publishers, and I'll zip them together in a loop:

let result = justs.dropFirst().reduce(into: AnyPublisher(justs[0].map{[$0]})) { 
    res, just in
    res = res.zip(just) {
        i1, i2 -> [Int] in
        return i1 + [i2]
    }.eraseToAnyPublisher()
}

Note the trailing closure after the zip operator! This ensures that my output will be an Array<Int> instead of a tuple. Unlike a tuple, I'm allowed to make an array of any size, just adding elements each time thru the loop.

Okay, so result is now a Zip publisher that zips together ten publishers. To prove it, I'll just attach a subscriber to it and print the output:

result.sink {print($0)}.store(in: &self.storage)

We run the code. There is a heart-stopping pause — rightly, because each of those Just publishers has a different random delay, and the rule of zip is that they all need to publish before we get any output. They all do, sooner or later, and the output appears in the console:

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

Exactly the right answer! I've proved that I did in fact zip ten publishers together to produce output consisting of a single contribution from each of them.

Zipping together an arbitrary number of data task publishers (or whatever you're using) is no different.

(For a related question, where I learn how to serialize an arbitrary number of data task publishers, see Combine framework serialize async operations.)

matt
  • 515,959
  • 87
  • 875
  • 1,141
  • Thanks for this very detailed explanation. This seems to be an excellent solution, especially if you don't know the number of publishers to zip at compile time. For my special case I only have 6 publishers to zip, so I went for Predrag Samardzic's solution. – G. Marc Feb 22 '20 at 04:50
  • Sure, I just wanted to show how it could be done in general. – matt Feb 22 '20 at 04:53
  • 5
    it is very nice, the only trouble is that all publishers have to be the same type, which in case of zip operator could be the real limitation. – user3441734 Feb 25 '20 at 00:29
  • Publishers.Zip3(Just(0), Just("alfa"), Just(true)) is a valid statement. – user3441734 Feb 25 '20 at 00:54
  • @user3441734 Hmm, I see. How can I emulate that? – matt Feb 25 '20 at 01:16
  • if i know, i will post my answer :-). Maybe, with help of Mirror some generic "serialized" publisher0.zip(publisher2).zip(publisher3 ...sink could be split – user3441734 Feb 25 '20 at 01:20
  • This is an amazing solution . For anyone looking for copy-pasta, see below: https://stackoverflow.com/a/62761735/1672161 – abc123 Jul 06 '20 at 17:56
  • The only strange part to me is `justs.dropFirst()`. What if the size of `justs` is less or equal 1 ? – orkenstein Sep 27 '20 at 17:23
  • @matt could you, please, point out, how should this snippet be modified to work for any size of `justs`? – orkenstein Sep 27 '20 at 17:53
11

Based on Matt's answer:

extension Publishers {
    struct ZipMany<Element, F: Error>: Publisher {
        typealias Output = [Element]
        typealias Failure = F

        private let upstreams: [AnyPublisher<Element, F>]

        init(_ upstreams: [AnyPublisher<Element, F>]) {
            self.upstreams = upstreams
        }

        func receive<S: Subscriber>(subscriber: S) where Self.Failure == S.Failure, Self.Output == S.Input {
            let initial = Just<[Element]>([])
                .setFailureType(to: F.self)
                .eraseToAnyPublisher()

            let zipped = upstreams.reduce(into: initial) { result, upstream in
                result = result.zip(upstream) { elements, element in
                    elements + [element]
                }
                .eraseToAnyPublisher()
            }

            zipped.subscribe(subscriber)
        }
    }
}

A unit test can use the following as input:

let upstreams: [AnyPublisher<String, Never>] = [
    Just("first")
        .receive(on: DispatchQueue.main)
        .eraseToAnyPublisher(),
    Just("second").eraseToAnyPublisher()
]

The .receive(on:) puts that event's emission on the end of the main queue so that it will emit after "second".

abc123
  • 8,043
  • 7
  • 49
  • 80
9

Think you can accomplish that like this:

let zipped1 = Publishers.Zip4(request1, request2, request3, request4)    
let zipped2 = Publishers.Zip4(request5, request6, request7, request8)

Publishers.Zip(zipped1, zipped2)
    .sink(receiveCompletion: { completion in
        // completion code if all 8 requests completed
    }, receiveValue: { response1, response2 in
        // do something with response1.0
        // do something with response1.1
        // do something with response1.2, response1.3, response2.0, response2.1, response2.2, response2.3
    }
)
    .store(in: &state.subscriptions)
Predrag Samardzic
  • 2,661
  • 15
  • 18
3

If you want to keep using the Zip semantics, you can write a custom publisher for this. Basically, the new Zip5 will be a Zip between a Zip4 and the 5th publisher.

extension Publishers {
    struct Zip5<A: Publisher, B: Publisher, C: Publisher, D: Publisher, E: Publisher>: Publisher
    where A.Failure == B.Failure, A.Failure == C.Failure, A.Failure == D.Failure, A.Failure == E.Failure {
        typealias Output = (A.Output, B.Output, C.Output, D.Output, E.Output)
        typealias Failure = A.Failure
        
        private let a: A
        private let b: B
        private let c: C
        private let d: D
        private let e: E
        
        init(_ a: A, _ b: B, _ c: C, _ d: D, _ e: E) {
            self.a = a
            self.b = b
            self.c = c
            self.d = d
            self.e = e
        }
        
        func receive<S>(subscriber: S) where S : Subscriber, Output == S.Input, Failure == S.Failure {
            Zip(Zip4(a, b, c, d), e)
                .map { ($0.0, $0.1, $0.2, $0.3, $1) }
                .receive(subscriber: subscriber)
        }
    }
}

extension Publisher {
    func zip<O1: Publisher, O2: Publisher, O3: Publisher, O4: Publisher>(_ o1: O1, _ o2: O2, _ o3: O3, _ o4: O4) -> Publishers.Zip5<Self, O1, O2, O3, O4> {
        .init(self, o1, o2, o3, o4)
    }
}

In a similar fashion, Zip6, Zip7 can be written:

  • Zip6 => Zip(Zip4(a, b, c, d, e), Zip(e, f))
  • Zip7 => Zip(Zip4(a, b, c, d, e), Zip3(e, f, g))

, and so on.

The downside is that this requires a lot of code to write, and if you end up needing this kind of zip operations, maybe it would be a good opportunity to re-visit the design of your app, maybe you don't need so much zipping after all.

To sustain the above, look on how would the zip6() declaration look like:

func zip<O1: Publisher, O2: Publisher, O3: Publisher, O4: Publisher, O5: Publisher>(_ o1: O1, _ o2: O2, _ o3: O3, _ o4: O4, _ o5: O5) -> Publishers.Zip6<Self, O1, O2, O3, O4, O5> {
    .init(self, o1, o2, o3, o4, o5)
}

Having so many generic arguments, and constraints on all generic arguments, makes the it harder to use and understand.

Cristik
  • 30,989
  • 25
  • 91
  • 127
2

Works for me by using transform

let pub1: Just<Int> = Just(1)
let pub2: Just<String> = Just("string")
let pub3: Just<Double> = Just(1)
let pub4: Just<Float> = Just(1)

let pub = pub1
    .zip(pub2)
    .zip(pub3, { return ($0.0, $0.1, $1) })
    .zip(pub4, { return ($0.0, $0.1, $0.2, $1) })

var cancel: Set<AnyCancellable> = .init()

pub.sink {
    print($0.0) // is Int
    print($0.1) // is String
    print($0.2) // is Double
    print($0.3) // is Float
}.store(in: &cancel)

Or example using Publishers.Zip4

let pub1: Just<Int> = Just(1)
let pub2: Just<String> = Just("string")
let pub3: Just<Double> = Just(1)
let pub4: Just<Float> = Just(1)

let pub5: Just<Int> = Just(2)
let pub6: Just<String> = Just("string2")
let pub7: Just<Double> = Just(2)
let pub8: Just<Float> = Just(2)

let zip1 = Publishers.Zip4(pub1, pub2, pub3, pub4)
let zip2 = Publishers.Zip4(pub5, pub6, pub7, pub8)

let pub = zip1.zip(zip2, { return ($0.0 ,$0.1, $0.2, $0.3, $1.0, $1.1, $1.2, $1.3) })

var cancel: Set<AnyCancellable> = .init()

pub.sink {
    print($0.0) // is Int
    print($0.1) // is String
    print($0.2) // is Double
    print($0.3) // is Float
    print($0.4) // is Int
    print($0.5) // is String
    print($0.6) // is Double
    print($0.7) // is Float
}.store(in: &cancel)
1

(1) Predrag's vs (2) Matt's answer

(1) i have a trouble to remember, how to use the results (the naming in closure is not in some "easy to remember" notation

(2) Matt's solution is limited to the same Output type, zip doesn't have this limitation

I suggest a different variant

let handler =
    publisher1
        .zip(publisher2)
        .zip(publisher3)
        .zip(publisher4)
        .zip(publisher5)
        .zip(publisher6)

        .sink(receiveCompletion: { (c) in
            print(c)
        }) { (value) in
            print(
                value.1,            // 1
                value.0.1,          // 2
                value.0.0.1,        // 3
                value.0.0.0.1,      // 4
                value.0.0.0.0.1,    // 5
                value.0.0.0.0.0     // 6
            )
}

which is still far away to be optimal, but (at least for me) easier to use and number of zipped publishers is virtually unlimited.

from swift grammar

GRAMMAR OF A TUPLE TYPE

tuple-type → ( ) | ( tuple-type-element , tuple-type-element-list )
tuple-type-element-list → tuple-type-element | tuple-type-element , tuple-type-element-list
tuple-type-element → element-name type-annotation | type
element-name → identifier

it seems, that this could be solved by compilator, maybe we have to ask community to include some flattening of compound types to be available for our advantage.

user3441734
  • 16,722
  • 2
  • 40
  • 59
0

I thought I needed something like this, but depending on your use case you can also use collect() to wait for the completion of all upstream publishers, and afterwards do something else.

Matthew Barker
  • 638
  • 9
  • 13
0

@matt's solution is the way to go!

I'd use a simplified version of his reduction part, basically starting with an empty array of elements and then zipping each of the 10 justs in another version of the reduce function.
Thus I don't need the dropFirst() and in the reduce closure I can use the implicit return statement. Notice the zip closure is shorter, too, by using swift syntactic sugar.

Here we go with the updated version:

let justs = (1...10).map {
  Just($0)
    .delay(for: .seconds(Int.random(in:1...3)), scheduler: DispatchQueue.main)
    .eraseToAnyPublisher() }

let start = Just([Int]()).eraseToAnyPublisher()

let result = justs.reduce(start) { publisher, element in
  publisher
    .zip(element) { $0 + [$1] }
    .eraseToAnyPublisher()
}
Fab1n
  • 2,103
  • 18
  • 32
0

There is solution inspired by the article. You could use it in the following way:

(1...10).map { Just($0) }.zip
import Foundation
import Combine

extension Collection where Element: Publisher {
    /// Combine the array of publishers to give a single array of the `Zip ` of their outputs
    public var zip: ZipCollection<Self> {
        ZipCollection(self)
    }
}

/// A `Publisher` that combines an array of publishers to provide an output of an array of the `Zip` of their respective outputs.
///
/// This behaves similarly to Combine's `Publishers.Zip` except:
/// - It takes an arbitrary number of publishers
/// - The publishers should all have the same type
///
/// The failure of any publisher causes a failure of this publisher. When all the publishers complete successfully, this publsher completes successfully
public struct ZipCollection<Publishers>: Publisher
    where
    Publishers: Collection,
    Publishers.Element: Publisher
{
    public typealias Output = [Publishers.Element.Output]
    public typealias Failure = Publishers.Element.Failure

    private let publishers: Publishers

    public init(_ publishers: Publishers) {
        self.publishers = publishers
    }

    public func receive<Subscriber>(subscriber: Subscriber)
        where
        Subscriber: Combine.Subscriber,
        Subscriber.Failure == Failure,
        Subscriber.Input == Output
    {
        let subscription = Subscription(subscriber: subscriber, publishers: publishers)
        subscriber.receive(subscription: subscription)
    }
}

extension ZipCollection {
    /// A subscription for a Zip publisher
    fileprivate final class Subscription<Subscriber>: Combine.Subscription
        where
        Subscriber: Combine.Subscriber,
        Subscriber.Failure == Failure,
        Subscriber.Input == Output
    {
        private let subscribers: [AnyCancellable]

        init(subscriber: Subscriber, publishers: Publishers) {
            var count = publishers.count
            var outputs = publishers.map { _ in Queue<Publishers.Element.Output>() }
            var completions = 0
            var hasCompleted = false
            let lock = NSLock()

            subscribers = publishers.enumerated().map { index, publisher in
                publisher.sink(receiveCompletion: { completion in
                    lock.lock()
                    defer { lock.unlock() }

                    guard case .finished = completion else {
                        // Any failure causes the entire subscription to fail.
                        subscriber.receive(completion: completion)
                        hasCompleted = true
                        outputs.forEach { queue in
                            queue.removeAll()
                        }
                        return
                    }

                    completions += 1

                    guard completions == count else { return }

                    subscriber.receive(completion: completion)
                    hasCompleted = true
                }, receiveValue: { value in
                    lock.lock()
                    defer { lock.unlock() }

                    guard !hasCompleted else { return }
                    outputs[index].enqueue(value)

                    guard (outputs.compactMap{ $0.peek() }.count) == count else { return }

                    _ = subscriber.receive(outputs.compactMap({ $0.dequeue() }))
                })
            }
        }

        public func cancel() {
            subscribers.forEach { $0.cancel() }
        }
        
        public func request(_ demand: Subscribers.Demand) {}
    }
}


/// A generic structure around a FIFO collection
fileprivate final class Queue<T> {
    typealias Element = T

    private var elements = [Element]()

    /// Add an element to the back of the queue
    func enqueue(_ element: Element) {
        elements.append(element)
    }

    /// Remove an element from the front of the queue
    func dequeue() -> Element? {
        guard !elements.isEmpty else { return nil }

        return elements.removeFirst()
    }

    /// Examine the element at the head of the queue without removing it
    func peek() -> Element? {
        elements.first
    }

    /// Remove all elements from the queue
    func removeAll() {
        elements.removeAll()
    }
}
Konstantin Nikolskii
  • 1,075
  • 1
  • 12
  • 17