
How do I get the asynchronous pipelines that constitute the Combine framework to line up synchronously (serially)?

Suppose I have 50 URLs from which I want to download the corresponding resources, and let's say I want to do it one at a time. I know how to do that with Operation / OperationQueue, e.g. using an Operation subclass that doesn't declare itself finished until the download is complete. How would I do the same thing using Combine?

At the moment all that occurs to me is to keep a global list of the remaining URLs and pop one off, set up that one pipeline for one download, do the download, and in the sink of the pipeline, repeat. That doesn't seem very Combine-like.

I did try making an array of the URLs and map it to an array of publishers. I know I can "produce" a publisher and cause it to publish on down the pipeline using flatMap. But then I'm still doing all the downloading simultaneously. There isn't any Combine way to walk the array in a controlled manner — or is there?

(I also imagined doing something with Future but I became hopelessly confused. I'm not used to this way of thinking.)

matt

9 Answers


Use flatMap(maxPublishers:transform:) with .max(1), e.g.

func imagesPublisher(for urls: [URL]) -> AnyPublisher<UIImage, URLError> {
    Publishers.Sequence(sequence: urls.map { self.imagePublisher(for: $0) })
        .flatMap(maxPublishers: .max(1)) { $0 }
        .eraseToAnyPublisher()
}

Where

func imagePublisher(for url: URL) -> AnyPublisher<UIImage, URLError> {
    URLSession.shared.dataTaskPublisher(for: url)
        .compactMap { UIImage(data: $0.data) }
        .receive(on: RunLoop.main)
        .eraseToAnyPublisher()
}

and

var imageRequests: AnyCancellable?

func fetchImages() {
    imageRequests = imagesPublisher(for: urls).sink { completion in
        switch completion {
        case .finished:
            print("done")
        case .failure(let error):
            print("failed", error)
        }
    } receiveValue: { image in
        // do whatever you want with the images as they come in
    }
}

That resulted in:

[Image: serial]

But we should recognize that you take a big performance hit doing them sequentially like that. For example, if I bump maxPublishers up to .max(6), it’s more than twice as fast:

[Image: concurrent]

Personally, I’d recommend only downloading sequentially if you absolutely must (which, when downloading a series of images/files, is almost certainly not the case). Yes, performing requests concurrently means they may not finish in a particular order, but we can just use an order-independent structure (e.g. a dictionary rather than a simple array), and the performance gains are so significant that it’s generally worth it.
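A minimal sketch of that order-independent approach, assuming you just want every result keyed by its input. concurrentDictionary is a hypothetical helper name (not from this answer or from Combine), and maxConcurrent plays the role of the .max(6) mentioned above:

```swift
import Combine
import Foundation

// Hypothetical helper: runs up to `maxConcurrent` publishers at once and keys
// each result by the input that produced it, so finishing order does not matter.
func concurrentDictionary<Key: Hashable, Value, Failure: Error>(
    for keys: [Key],
    maxConcurrent: Int,
    makePublisher: @escaping (Key) -> AnyPublisher<Value, Failure>
) -> AnyPublisher<[Key: Value], Failure> {
    keys.publisher
        .setFailureType(to: Failure.self)
        // At most `maxConcurrent` inner publishers are subscribed at a time.
        .flatMap(maxPublishers: .max(maxConcurrent)) { key -> AnyPublisher<(Key, Value), Failure> in
            makePublisher(key)
                .map { (key, $0) }   // remember which input produced this value
                .eraseToAnyPublisher()
        }
        // Gather all (key, value) pairs once every inner publisher has finished.
        .collect()
        .map { Dictionary($0, uniquingKeysWith: { _, latest in latest }) }
        .eraseToAnyPublisher()
}
```

For real downloads the closure would be something like { url in URLSession.shared.dataTaskPublisher(for: url).map { $0.data }.eraseToAnyPublisher() } with maxConcurrent: 6. Note that the dictionary is only delivered once everything finishes, and a single failure fails the whole publisher.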

But, if you want them downloaded sequentially, the maxPublishers parameter can achieve that.

Rob
  • Yep, that's what my answer already says: https://stackoverflow.com/a/59889993/341994 as well as the answer I awarded the bounty to https://stackoverflow.com/a/59889174/341994 – matt Apr 13 '20 at 19:28
  • And see also now my book https://www.apeth.com/UnderstandingCombine/operators/operatorsTransformersBlockers/operatorsflatmap.html#SECexertingbackpressure – matt Apr 13 '20 at 19:31
  • By the way, speaking of sequentially, I have made great use of your sequential asynchronous Operation for a different task, thanks for writing it – matt Apr 13 '20 at 19:32
    @matt - Lol. I confess that I didn’t see that you had found the `maxPublishers` option. And I wouldn’t have droned on about “don’t do serial” if I had noticed it was you (as I know you completely understand the pros and cons of serial vs concurrent). I literally only saw “I want to download one file at a time”, I had recently stumbled across `maxPublishers` option for something else I was doing (namely, providing [modern solution to this question](https://stackoverflow.com/a/61175826/1271826)), and I thought I’d share the Combine solution I had come up with. I didn’t mean to be so derivative. – Rob Apr 13 '20 at 19:47
    Yeah, it was the solution referred to at https://stackoverflow.com/a/48104095/1271826 that I was talking about before; I found that very helpful. – matt Apr 13 '20 at 19:53

I've only briefly tested this, but at first pass it appears that each request waits for the previous request to finish before starting.

I'm posting this solution in search of feedback. Please be critical if this isn't a good solution.

extension Collection where Element: Publisher {

    func serialize() -> AnyPublisher<Element.Output, Element.Failure>? {
        // If the collection is empty, we can't just create an arbitrary publisher
        // so we return nil to indicate that we had nothing to serialize.
        if isEmpty { return nil }

        // We know at this point that it's safe to grab the first publisher.
        let first = self.first!

        // If there was only a single publisher then we can just return it.
        if count == 1 { return first.eraseToAnyPublisher() }

        // We're going to build up the output starting with the first publisher.
        var output = first.eraseToAnyPublisher()

        // We iterate over the rest of the publishers (skipping over the first.)
        for publisher in self.dropFirst() {
            // We build up the output by appending the next publisher.
            output = output.append(publisher).eraseToAnyPublisher()
        }

        return output
    }
}


A more concise version of this solution (provided by @matt):

extension Collection where Element: Publisher {
    func serialize() -> AnyPublisher<Element.Output, Element.Failure>? {
        guard let start = self.first else { return nil }
        return self.dropFirst().reduce(start.eraseToAnyPublisher()) {
            $0.append($1).eraseToAnyPublisher()
        }
    }
}
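If you'd rather not return an optional, one variation (a sketch, not Clay's code; serializedOrEmpty is a hypothetical name) starts the reduce from Empty, which completes immediately, so an empty collection simply yields a publisher that finishes without emitting anything:

```swift
import Combine

extension Collection where Element: Publisher {
    // Hypothetical variant of serialize(): instead of returning nil for an
    // empty collection, start the chain from Empty, which finishes immediately.
    func serializedOrEmpty() -> AnyPublisher<Element.Output, Element.Failure> {
        reduce(Empty<Element.Output, Element.Failure>().eraseToAnyPublisher()) { chain, next in
            // append subscribes to `next` only after `chain` has finished.
            chain.append(next).eraseToAnyPublisher()
        }
    }
}
```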
Clay Ellis
  • Excellent, thanks. `append` is exactly what I was looking for. — Your code can be tightened up considerably; in particular, there is no need to return prematurely in the case where `count == 1`, because in that case `dropFirst` will be empty and we just won't loop. And there is no need to maintain the `output` variable, because we can use `reduce` instead of `for...in`. See my answer for a tighter rendering. – matt Jan 24 '20 at 17:05

You could create a custom Subscriber whose receive(_:) returns Subscribers.Demand.max(1). In that case the subscriber requests the next value only after it has received the previous one. The example uses an Int publisher, but a random delay in map mimics network traffic :-)

import PlaygroundSupport
import SwiftUI
import Combine

class MySubscriber: Subscriber {
  typealias Input = String
  typealias Failure = Never

  func receive(subscription: Subscription) {
    print("Received subscription", Thread.current.isMainThread)
    subscription.request(.max(1))
  }

  func receive(_ input: Input) -> Subscribers.Demand {
    print("Received input: \(input)", Thread.current.isMainThread)
    return .max(1)
  }

  func receive(completion: Subscribers.Completion<Never>) {
    DispatchQueue.main.async {
        print("Received completion: \(completion)", Thread.current.isMainThread)
        PlaygroundPage.current.finishExecution()
    }
  }
}

(110...120)
    .publisher.receive(on: DispatchQueue.global())
    .map {
        print(Thread.current.isMainThread, Thread.current)
        usleep(UInt32.random(in: 10000 ... 1000000))
        return String(format: "%02x", $0)
    }
    .subscribe(on: DispatchQueue.main)
    .subscribe(MySubscriber())

print("Hello")

PlaygroundPage.current.needsIndefiniteExecution = true

The playground prints:

Hello
Received subscription true
false <NSThread: 0x600000064780>{number = 5, name = (null)}
Received input: 6e false
false <NSThread: 0x60000007cc80>{number = 9, name = (null)}
Received input: 6f false
false <NSThread: 0x60000007cc80>{number = 9, name = (null)}
Received input: 70 false
false <NSThread: 0x60000007cc80>{number = 9, name = (null)}
Received input: 71 false
false <NSThread: 0x60000007cc80>{number = 9, name = (null)}
Received input: 72 false
false <NSThread: 0x600000064780>{number = 5, name = (null)}
Received input: 73 false
false <NSThread: 0x600000064780>{number = 5, name = (null)}
Received input: 74 false
false <NSThread: 0x60000004dc80>{number = 8, name = (null)}
Received input: 75 false
false <NSThread: 0x60000004dc80>{number = 8, name = (null)}
Received input: 76 false
false <NSThread: 0x60000004dc80>{number = 8, name = (null)}
Received input: 77 false
false <NSThread: 0x600000053400>{number = 3, name = (null)}
Received input: 78 false
Received completion: finished true
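To see the demand mechanics in isolation, here is a minimal sketch with a synchronous source. OneAtATimeSubscriber is a hypothetical name; it records what it receives instead of printing, asks for one value when subscribed, and asks for one more each time a value arrives:

```swift
import Combine

// Hypothetical subscriber that keeps exactly one unit of demand outstanding.
final class OneAtATimeSubscriber<Input, Failure: Error>: Subscriber {
    private(set) var received: [Input] = []
    private(set) var finished = false

    func receive(subscription: Subscription) {
        subscription.request(.max(1))   // initial demand: a single value
    }

    func receive(_ input: Input) -> Subscribers.Demand {
        received.append(input)
        return .max(1)                  // adds one more to the outstanding demand
    }

    func receive(completion: Subscribers.Completion<Failure>) {
        finished = true
    }
}
```

The demand returned from receive(_:) is added to what is already outstanding, so returning .max(1) means "one more", not "one in total".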

UPDATE: I finally found .flatMap(maxPublishers:), which forced me to update this interesting topic with a slightly different approach. Note that I am using a global queue for scheduling, not only a random delay, just to be sure that receiving a serialized stream is not "random" or "lucky" behavior :-)

import PlaygroundSupport
import Combine
import Foundation

PlaygroundPage.current.needsIndefiniteExecution = true

let A = (1 ... 9)
    .publisher
    .flatMap(maxPublishers: .max(1)) { value in
        [value].publisher
            .flatMap { value in
                Just(value)
                    .delay(for: .milliseconds(Int.random(in: 0 ... 100)), scheduler: DispatchQueue.global())
        }
}
.sink { value in
    print(value, "A")
}

let B = (1 ... 9)
    .publisher
    .flatMap { value in
        [value].publisher
            .flatMap { value in
                Just(value)
                    .delay(for: .milliseconds(Int.random(in: 0 ... 100)), scheduler: RunLoop.main)
        }
}
.sink { value in
    print("     ",value, "B")
}

prints

1 A
      4 B
      5 B
      7 B
      1 B
      2 B
      8 B
      6 B
2 A
      3 B
      9 B
3 A
4 A
5 A
6 A
7 A
8 A
9 A

Based on what is written here,

.serialize()?

defined in Clay Ellis's accepted answer, could be replaced by

.publisher.flatMap(maxPublishers: .max(1)){$0}

while the "unserialized" version must use

.publisher.flatMap{$0}

A "real world" example:

import PlaygroundSupport
import Foundation
import Combine

let path = "postman-echo.com/get"
let urls: [URL] = "... which proves the downloads are happening serially .-)".map(String.init).compactMap { (parameter) in
    var components = URLComponents()
    components.scheme = "https"
    components.path = path
    components.queryItems = [URLQueryItem(name: parameter, value: nil)]
    return components.url
}
//["https://postman-echo.com/get?]
struct Postman: Decodable {
    var args: [String: String]
}


let collection = urls.compactMap { value in
        URLSession.shared.dataTaskPublisher(for: value)
        .tryMap { data, response -> Data in
            return data
        }
        .decode(type: Postman.self, decoder: JSONDecoder())
        .catch {_ in
            Just(Postman(args: [:]))
    }
}

extension Collection where Element: Publisher {
    func serialize() -> AnyPublisher<Element.Output, Element.Failure>? {
        guard let start = self.first else { return nil }
        return self.dropFirst().reduce(start.eraseToAnyPublisher()) {
            return $0.append($1).eraseToAnyPublisher()
        }
    }
}

var streamA = ""
let A = collection
    .publisher.flatMap{$0}

    .sink(receiveCompletion: { (c) in
        print(streamA, "     ", c, "    .publisher.flatMap{$0}")
    }, receiveValue: { (postman) in
        print(postman.args.keys.joined(), terminator: "", to: &streamA)
    })


var streamC = ""
let C = collection
    .serialize()?

    .sink(receiveCompletion: { (c) in
        print(streamC, "     ", c, "    .serialize()?")
    }, receiveValue: { (postman) in
        print(postman.args.keys.joined(), terminator: "", to: &streamC)
    })

var streamD = ""
let D = collection
    .publisher.flatMap(maxPublishers: .max(1)){$0}

    .sink(receiveCompletion: { (c) in
        print(streamD, "     ", c, "    .publisher.flatMap(maxPublishers: .max(1)){$0}")
    }, receiveValue: { (postman) in
        print(postman.args.keys.joined(), terminator: "", to: &streamD)
    })

PlaygroundPage.current.needsIndefiniteExecution = true

prints

.w.h i.c hporves ht edownloadsa erh appeninsg eriall y.-)       finished     .publisher.flatMap{$0}
... which proves the downloads are happening serially .-)       finished     .publisher.flatMap(maxPublishers: .max(1)){$0}
... which proves the downloads are happening serially .-)       finished     .serialize()?

This seems very useful in other scenarios as well. Try using the default value of maxPublishers in the next snippet and compare the results :-)

import Combine

let sequencePublisher = Publishers.Sequence<Range<Int>, Never>(sequence: 0..<Int.max)
let subject = PassthroughSubject<String, Never>()

let handle = subject
    .zip(sequencePublisher.print())
    //.publish
    .flatMap(maxPublishers: .max(1), { (pair)  in
        Just(pair)
    })
    .print()
    .sink { letters, digits in
        print(letters, digits)
    }

"Hello World!".map(String.init).forEach { (s) in
    subject.send(s)
}
subject.send(completion: .finished)
user3441734
  • @matt sink doesn't work any differently; it just returns Subscribers.Demand.unlimited on receive ... Maybe using the proper instrument, like a serial queue and Data.init?(contentsOf url: URL), is the best option in your scenario. If you need to make a sum of two Ints, do you do it as [lhs: Int, rhs: Int].reduce .... ??? I would use Data.init?(contentsOf url: URL) inside receive( _ input:) of MySerialDownloaderSubscriber. – user3441734 Jan 24 '20 at 07:30
  • @matt please, see updated answer. Combine is exciting, but (at least for me) very hard to understand ... – user3441734 Jan 25 '20 at 09:20
  • Yes, I see! With the `maxPublishers` parameter, we get to add back-pressure. This goes with what I said in my question: "I know I can "produce" a publisher and cause it to publish on down the pipeline using flatMap. But then I'm still doing all the downloading simultaneously." Well, with the `maxPublishers` parameter, they are _not_ simultaneous. – matt Jan 25 '20 at 18:18
  • @matt yes, sink subscribes with Subscribers.Demand.unlimited; flatMap has the same effect as giving the publisher its own subscriber with a different demand, in our use case .max(1). I just added another example with a different scenario where it is very useful. – user3441734 Jan 25 '20 at 19:21

From the original question:

I did try making an array of the URLs and map it to an array of publishers. I know I can "produce" a publisher and cause it to publish on down the pipeline using flatMap. But then I'm still doing all the downloading simultaneously. There isn't any Combine way to walk the array in a controlled manner — or is there?


Here's a toy example to stand in for the real problem:

let collection = (1 ... 10).map {
    Just($0).delay(
        for: .seconds(Double.random(in:1...5)),
        scheduler: DispatchQueue.main)
        .eraseToAnyPublisher()
}
collection.publisher
    .flatMap() {$0}
    .sink {print($0)}.store(in:&self.storage)

This emits the integers from 1 to 10 in random order arriving at random times. The goal is to do something with collection that will cause it to emit the integers from 1 to 10 in order.


Now we're going to change just one thing: in the line

.flatMap {$0}

we add the maxPublishers parameter:

let collection = (1 ... 10).map {
    Just($0).delay(
        for: .seconds(Double.random(in:1...5)),
        scheduler: DispatchQueue.main)
        .eraseToAnyPublisher()
}
collection.publisher
    .flatMap(maxPublishers:.max(1)) {$0}
    .sink {print($0)}.store(in:&self.storage)

Presto, we now do emit the integers from 1 to 10, in order, with random intervals between them.


Let's apply this to the original problem. To demonstrate, I need a fairly slow Internet connection and a fairly large resource to download. First, I'll do it with ordinary .flatMap:

let eph = URLSessionConfiguration.ephemeral
let session = URLSession(configuration: eph)
let url = "https://photojournal.jpl.nasa.gov/tiff/PIA23172.tif"
let collection = [url, url, url]
    .map {URL(string:$0)!}
    .map {session.dataTaskPublisher(for: $0)
        .eraseToAnyPublisher()
}
collection.publisher.setFailureType(to: URLError.self)
    .handleEvents(receiveOutput: {_ in print("start")})
    .flatMap() {$0}
    .map {$0.data}
    .sink(receiveCompletion: {comp in
        switch comp {
        case .failure(let err): print("error", err)
        case .finished: print("finished")
        }
    }, receiveValue: {_ in print("done")})
    .store(in:&self.storage)

The result is

start
start
start
done
done
done
finished

which shows that we are doing the three downloads simultaneously. Okay, now change

    .flatMap() {$0}

to

    .flatMap(maxPublishers:.max(1)) {$0}

The result now is:

start
done
start
done
start
done
finished

So we are now downloading serially, which is the problem originally to be solved.


append

In keeping with the principle of TIMTOWTDI, we can instead chain the publishers with append to serialize them:

let collection = (1 ... 10).map {
    Just($0).delay(
        for: .seconds(Double.random(in:1...5)),
        scheduler: DispatchQueue.main)
        .eraseToAnyPublisher()
}
let pub = collection.dropFirst().reduce(collection.first!) {
    return $0.append($1).eraseToAnyPublisher()
}

The result is a publisher that serializes the delayed publishers in the original collection. Let's prove it by subscribing to it:

pub.sink {print($0)}.store(in:&self.storage)

Sure enough, the integers now arrive in order (with random intervals between).


We can encapsulate the creation of pub from a collection of publishers with an extension on Collection, as suggested by Clay Ellis:

extension Collection where Element: Publisher {
    func serialize() -> AnyPublisher<Element.Output, Element.Failure>? {
        guard let start = self.first else { return nil }
        return self.dropFirst().reduce(start.eraseToAnyPublisher()) {
            return $0.append($1).eraseToAnyPublisher()
        }
    }
}
matt

In all of the other reactive frameworks this is really easy; you just use concat to concatenate and flatten the results in one step, and then you can reduce the results into a final array. Apple makes this difficult because Publishers.Concatenate has no overload that accepts an array of publishers. There is similar weirdness with Publishers.Merge. I have a feeling this has to do with the fact that they return nested generic publishers instead of a single generic type like Rx's Observable. I guess you can just call Concatenate in a loop and then reduce the concatenated results into a single array, but I really hope they address this issue in the next release. There is certainly a need to concat more than 2 publishers and to merge more than 4 publishers (and the overloads for these two operators aren't even consistent, which is just weird).
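For the two-publisher case, Publishers.Concatenate (which is what append creates) does exist, and collect() can stand in for reducing the results into a final array. A minimal sketch, with synchronous array publishers standing in for real work:

```swift
import Combine

let first = [1, 2, 3].publisher
let second = [4, 5, 6].publisher

// Concatenate subscribes to `suffix` only after `prefix` has finished,
// so values keep the order of the publishers, not the order of arrival.
let concatenated = Publishers.Concatenate(prefix: first, suffix: second)

var collected: [Int] = []
let cancellable = concatenated
    .collect()              // one array, emitted when both have finished
    .sink { collected = $0 }
```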

EDIT:

I came back to this and found that you can indeed concat an arbitrary array of publishers and they will emit in sequence. I have no idea why there isn't a function like ConcatenateMany to do this for you, but as long as you are willing to use a type-erased publisher, it's not that hard to write one yourself. This example shows that merge emits in temporal order while concat emits in the order of combination:

import PlaygroundSupport
import SwiftUI
import Combine

let p = Just<Int>(1).append(2).append(3).delay(for: .seconds(0.25), scheduler: RunLoop.main).eraseToAnyPublisher()
let q = Just<Int>(4).append(5).append(6).eraseToAnyPublisher()
let r = Just<Int>(7).append(8).append(9).delay(for: .seconds(0.5), scheduler: RunLoop.main).eraseToAnyPublisher()
let concatenated: AnyPublisher<Int, Never> = [q,r].reduce(p) { total, next in
  total.append(next).eraseToAnyPublisher()
}

var subscriptions = Set<AnyCancellable>()

concatenated
  .sink(receiveValue: { v in
    print("concatenated: \(v)")
  }).store(in: &subscriptions)

Publishers
  .MergeMany([p,q,r])
  .sink(receiveValue: { v in
    print("merge: \(v)")
  }).store(in: &subscriptions)
Josh Homann
  • Yes, you probably guessed I chose a big number like 50 intentionally. – matt Jan 15 '20 at 03:01
  • There is a MergeMany. I don't understand why there is not a ConcatenateMany. Rx swift has Observable.concat and Reactive Swift has flatMap(.concat) so this is strange; maybe I'm missing something. I'll keep looking https://developer.apple.com/documentation/combine/publishers/mergemany – Josh Homann Jan 15 '20 at 03:48
  • Would `concat` serialize (in the other reactive frameworks)? – matt Jan 15 '20 at 06:11
  • Yes. For a Sequence of Sequences you have only one way of flattening ie, put the elements of one inner sequence after another just like Sequence.flatMap in swift. When you have an asynchronous sequence you have to consider the temporal dimension when flattening. So you can either emit the elements from all inner sequences in temporal order (merge) or you can emit the elements from each inner sequence in order of the sequences (concat). See the marble diagram:https://rxmarbles.com/#concat vs https://rxmarbles.com/#merge – Josh Homann Jan 15 '20 at 06:15
  • Note that [`.append`](https://developer.apple.com/documentation/combine/publisher/3204685-append) is an operator that creates a `Publisher.Concatenate`. – rob mayoff Jan 15 '20 at 09:37
  • Thanks, append does seem to give me fewer type-matching errors than calling the Concatenate method directly. It's quite confusing that they designed the API with 3 overloads of append, though: one for Sequence, one for Output and one for Publisher. – Josh Homann Jan 21 '20 at 04:39
  • Sorry but I realize now that Clay Ellis was first (and neater) with `append`. But thank you so much anyway! So useful. – matt Jan 24 '20 at 06:09
  • By the way, I've just discovered that the documentation on `append` is completely wrong about how it behaves. – matt Mar 07 '20 at 16:56

Here is single-page playground code that depicts a possible approach. The main idea is to transform async API calls into a chain of Future publishers, thus making a serial pipeline.

Input: a range of Ints from 1 to 10 that are asynchronously converted into strings on a background queue.

Demo of direct call to async API:

let group = DispatchGroup()
inputValues.forEach {
    group.enter()
    asyncCall(input: $0) { (output, _) in
        print(">> \(output), in \(Thread.current)")
        group.leave()
    }
}
group.wait()

Output:

>> 1, in <NSThread: 0x7fe76264fff0>{number = 4, name = (null)}
>> 3, in <NSThread: 0x7fe762446b90>{number = 3, name = (null)}
>> 5, in <NSThread: 0x7fe7624461f0>{number = 5, name = (null)}
>> 6, in <NSThread: 0x7fe762461ce0>{number = 6, name = (null)}
>> 10, in <NSThread: 0x7fe76246a7b0>{number = 7, name = (null)}
>> 4, in <NSThread: 0x7fe764c37d30>{number = 8, name = (null)}
>> 7, in <NSThread: 0x7fe764c37cb0>{number = 9, name = (null)}
>> 8, in <NSThread: 0x7fe76246b540>{number = 10, name = (null)}
>> 9, in <NSThread: 0x7fe7625164b0>{number = 11, name = (null)}
>> 2, in <NSThread: 0x7fe764c37f50>{number = 12, name = (null)}

Demo of the Combine pipeline:

Output:

>> got 1
>> got 2
>> got 3
>> got 4
>> got 5
>> got 6
>> got 7
>> got 8
>> got 9
>> got 10
>>>> finished with true

Code:

import Cocoa
import Combine
import PlaygroundSupport

// Assuming there is some Asynchronous API with
// (eg. process Int input value during some time and generates String result)
func asyncCall(input: Int, completion: @escaping (String, Error?) -> Void) {
    DispatchQueue.global(qos: .background).async {
        sleep(.random(in: 1...5)) // wait for random async API output
        completion("\(input)", nil)
    }
}

// There are some input values to be processed serially
let inputValues = Array(1...10)

// Prepare one pipeline item based on Future, which transforms async -> sync
func makeFuture(input: Int) -> AnyPublisher<Bool, Error> {
    Future<String, Error> { promise in
        asyncCall(input: input) { (value, error) in
            if let error = error {
                promise(.failure(error))
            } else {
                promise(.success(value))
            }
        }
    }
    .receive(on: DispatchQueue.main)
    .map {
        print(">> got \($0)") // << side effect of pipeline item
        return true
    }
    .eraseToAnyPublisher()
}

// Create pipeline transforming input values into chain of Future publishers
var subscribers = Set<AnyCancellable>()
let pipeline =
    inputValues
    .reduce(nil as AnyPublisher<Bool, Error>?) { (chain, value) in
        if let chain = chain {
            return chain.flatMap { _ in
                makeFuture(input: value)
            }.eraseToAnyPublisher()
        } else {
            return makeFuture(input: value)
        }
    }

// Execute pipeline
pipeline?
    .sink(receiveCompletion: { _ in
        // << do something on completion if needed
    }) { output in
        print(">>>> finished with \(output)")
    }
    .store(in: &subscribers)

PlaygroundPage.current.needsIndefiniteExecution = true
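One thing to watch with Future is that it runs its closure as soon as it is created, not when it is subscribed to. The code above avoids that by calling makeFuture inside flatMap; another option is to wrap the Future in Deferred and let flatMap(maxPublishers: .max(1)) do the serializing. A minimal sketch: fakeAsyncCall, deferredFuture and serialCalls are hypothetical names, and the stub completes synchronously only so the result is easy to check.

```swift
import Combine

// Hypothetical callback API standing in for asyncCall above; this stub
// completes synchronously, whereas a real one would call back later.
func fakeAsyncCall(input: Int, completion: @escaping (Result<String, Error>) -> Void) {
    completion(.success("\(input)"))
}

// Deferred postpones creating the Future (and so starting the call)
// until flatMap actually subscribes to it.
func deferredFuture(input: Int) -> AnyPublisher<String, Error> {
    Deferred {
        Future<String, Error> { promise in
            fakeAsyncCall(input: input) { result in promise(result) }
        }
    }
    .eraseToAnyPublisher()
}

// maxPublishers: .max(1) lets only one deferred call run at a time.
func serialCalls(_ inputs: [Int]) -> AnyPublisher<[String], Error> {
    inputs.publisher
        .setFailureType(to: Error.self)
        .flatMap(maxPublishers: .max(1)) { deferredFuture(input: $0) }
        .collect()
        .eraseToAnyPublisher()
}
```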
Asperi

What about a dynamic array of URLs, something like a data bus?

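      // makeDownload() is a hypothetical helper returning AnyPublisher<Data, URLError>
      var array: [AnyPublisher<Data, URLError>] = []

      array.append(makeDownload())

      let cancellable = array.publisher
         .setFailureType(to: URLError.self)
         .flatMap { $0 }
         .sink(receiveCompletion: { _ in
            // it will be finished once the publishers already in the array are done
         }, receiveValue: { data in
            // handle data
         })
      // array.publisher captured a copy of the array, so these never reach the pipeline
      array.append(makeDownload())
      array.append(makeDownload())
      array.append(makeDownload())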

Another approach, if you want to collect all the results of the downloads in order to know which ones failed and which ones didn't, is to write a custom publisher that looks like this:

extension Publishers {
    struct Serialize<Upstream: Publisher>: Publisher {
        typealias Output = [Result<Upstream.Output, Upstream.Failure>]
        typealias Failure = Never

        let upstreams: [Upstream]

        init<C: Collection>(_ upstreams: C) where C.Element == Upstream {
            self.upstreams = Array(upstreams)
        }

        init(_ upstreams: Upstream...) {
            self.upstreams = upstreams
        }

        func receive<S>(subscriber: S) where S : Subscriber, Self.Failure == S.Failure, Self.Output == S.Input {
            guard let first = upstreams.first else { return Empty().subscribe(subscriber) }
            first
                .map { Result<Upstream.Output, Upstream.Failure>.success($0) }
                .catch { Just(Result<Upstream.Output, Upstream.Failure>.failure($0)) }
                .map { [$0] }
                .append(Serialize(upstreams.dropFirst()))
                .collect()
                .map { $0.flatMap { $0 } }
                .subscribe(subscriber)
        }
    }
}

extension Collection where Element: Publisher {  
    func serializedPublishers() -> Publishers.Serialize<Element> {
        .init(self)
    }
}

The publisher takes the first download task, converts its output/failure to a Result instance, and prepends it to the "recursive" call for the rest of the list.

Usage: Publishers.Serialize(listOfDownloadTasks), or listOfDownloadTasks.serializedPublishers().

One minor inconvenience of this implementation is that the Result instance needs to be wrapped into an array just to be flattened three steps later in the pipeline. Perhaps someone can suggest a better alternative.
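One possible alternative, sketched under the assumption that serializing via flatMap(maxPublishers: .max(1)) is acceptable instead of a recursive publisher: map each upstream to a Result inside flatMap and collect() at the end, so nothing needs to be wrapped and flattened. serializedResults is a hypothetical name, not part of Combine or the answer above.

```swift
import Combine

extension Collection where Element: Publisher {
    // Hypothetical alternative to serializedPublishers(): subscribes to the
    // publishers one at a time and records each value or failure as a Result.
    func serializedResults() -> AnyPublisher<[Result<Element.Output, Element.Failure>], Never> {
        publisher
            .flatMap(maxPublishers: .max(1)) { upstream -> AnyPublisher<Result<Element.Output, Element.Failure>, Never> in
                upstream
                    .map { Result<Element.Output, Element.Failure>.success($0) }
                    // A failure becomes a value, so the remaining publishers still run.
                    .catch { Just(Result<Element.Output, Element.Failure>.failure($0)) }
                    .eraseToAnyPublisher()
            }
            .collect()   // a single array once every publisher has finished
            .eraseToAnyPublisher()
    }
}
```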

Cristik

While @matt's answer (before the append) works, I don't think his explanation is quite accurate. (And please correct me if I'm wrong @matt)

In his example, the network requests are all firing off one after another, without waiting for the previous request to finish before the next request launches.

If we were to add a print before the call to session.dataTaskPublisher like this:

let eph = URLSessionConfiguration.ephemeral
let session = URLSession(configuration: eph)
let url = "https://photojournal.jpl.nasa.gov/tiff/PIA23172.tif"
let collection = [url, url, url]
    .map {URL(string:$0)!}
    .map {
        print("Start request for \($0.absoluteString)")
        return session.dataTaskPublisher(for: $0)
                .eraseToAnyPublisher()
    }
collection.publisher.setFailureType(to: URLError.self)
    .handleEvents(receiveOutput: {_ in print("start")})
    .flatMap(maxPublishers: .max(1)) {$0}
    .map {$0.data}
    .sink(receiveCompletion: {comp in
        switch comp {
        case .failure(let err): print("error", err)
        case .finished: print("finished")
        }
    }, receiveValue: {_ in print("done")})
    .store(in:&self.storage)

The console would look something like this:

Start request for {url}
Start request for {url}
Start request for {url}
start
done
start
done
start
done
finished

So really it's just the responses that are returned serially while the network requests get fired off one after the other.

Moving the call to session.dataTaskPublisher inside flatMap(maxPublishers: .max(1)) will cause each network request to fire only after the previous request finishes.

let eph = URLSessionConfiguration.ephemeral
let session = URLSession(configuration: eph)
let url = "https://photojournal.jpl.nasa.gov/tiff/PIA23172.tif"
let collection = [url, url, url]
    .map {URL(string:$0)!}

collection.publisher.setFailureType(to: URLError.self)
    .flatMap(maxPublishers: .max(1)) { 
        print("Start request for \($0.absoluteString)")
        return session.dataTaskPublisher(for: $0)
                .eraseToAnyPublisher()
    }
    .handleEvents(receiveOutput: {_ in print("start")})
    .map {$0.data}
    .sink(receiveCompletion: {comp in
        switch comp {
        case .failure(let err): print("error", err)
        case .finished: print("finished")
        }
    }, receiveValue: {_ in print("done")})
    .store(in:&self.storage)

Which will print something like this to the console:

Start request for {url}
start
done
Start request for {url}
start
done
Start request for {url}
start
done
finished
tww0003