
In our codebase, we quite often used async methods wrapped in a Task inside a Combine pipeline.

After reading an article from Swift by Sundell, we implemented a Publisher extension with the asyncMap method described in the article.

However, we realized that, as implemented, it didn't guarantee that calls that came in first would also come out first. After some experimentation we found that if we set maxPublishers in the flatMap to .max(1), we did seem to get first-in-first-out behavior. At least our tests seem to be passing now.

However, we haven't seen an implementation like this anywhere else, nor as a native Swift feature, and we are asking ourselves whether there is a particular reason why this can't or shouldn't be implemented this way.

We are wondering whether we overlooked something and might still run into race conditions or out-of-order calls when making async backend calls that must happen in order.

Here is the (non-throwing) version of our extension:

import Foundation
import Combine

public extension Publisher {

    /// A publisher that transforms all elements from an upstream publisher using an async transform closure.
    /// Warning: FIFO order of execution is only guaranteed with `maxPublishers = .max(1)`.
    func asyncMap<T>(maxPublishers: Subscribers.Demand = .max(1), _ transform: @escaping (Output) async -> T) -> AnyPublisher<T, Failure> {
        flatMap(maxPublishers: maxPublishers) { value -> Future<T, Failure> in
            Future { promise in
                Task {
                    let result = await transform(value)
                    promise(.success(result))
                }
            }
        }
        .eraseToAnyPublisher()
    }
}
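The question calls this the non-throwing version, but the throwing one isn't shown. One possible shape for a throwing counterpart is sketched here, assuming the upstream Failure is already Error (a Never-failing upstream can be adapted with setFailureType(to: Error.self) first). The name asyncTryMap is hypothetical and not taken from the article or from the original code; an error thrown by the transform is forwarded as a .failure completion.

```swift
import Combine
import Foundation

public extension Publisher where Failure == Error {
    /// Hypothetical throwing counterpart of `asyncMap`.
    /// Errors thrown by `transform` end the stream with `.failure`.
    /// As with `asyncMap`, FIFO order is only guaranteed with `.max(1)`.
    func asyncTryMap<T>(
        maxPublishers: Subscribers.Demand = .max(1),
        _ transform: @escaping (Output) async throws -> T
    ) -> AnyPublisher<T, Error> {
        flatMap(maxPublishers: maxPublishers) { value -> Future<T, Error> in
            Future { promise in
                Task {
                    do {
                        let result = try await transform(value)
                        promise(.success(result))
                    } catch {
                        // Turn the thrown error into a failed Future.
                        promise(.failure(error))
                    }
                }
            }
        }
        .eraseToAnyPublisher()
    }
}
```

When one Future fails, flatMap fails the whole stream, so elements after the failing one are not transformed.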

And here are the tests we run: the first one for when the order doesn't matter, and the second one where the order is checked:

import XCTest
import Combine

class PublisherAsyncTests: XCTestCase {

    var cancellableSubscriber = Set<AnyCancellable>()

    override func setUp() {
        super.setUp()
        cancellableSubscriber = []
    }

    func testAsyncMap() async throws {

        let expectation = expectation(description: "testAsyncMap")

        let sequence = [1, 2, 3, 4, 5, 6]
        var resultSequence: [Int] = []

        sequence
            .publisher
            .asyncMap(maxPublishers: .unlimited) { value in
                try? await Task.sleep(nanoseconds: UInt64.random(in: 10_000_000...20_000_000))
                return value
            }
            .collect()
            .sink { value in
                resultSequence = value
                expectation.fulfill()
            }
            .store(in: &cancellableSubscriber)

        await fulfillment(of: [expectation], timeout: 5)

        XCTAssertEqual(sequence, resultSequence.sorted())
    }

    func testAsyncFIFOMap() async throws {

        let expectation = expectation(description: "testAsyncFIFOMap")

        let sequence = [1, 2, 3, 4, 5, 6]
        var resultSequence: [Int] = []

        sequence
            .publisher
            .asyncMap { value in
                try? await Task.sleep(nanoseconds: UInt64.random(in: 10_000_000...40_000_000))
                return value
            }
            .collect()
            .sink { value in
                resultSequence = value
                expectation.fulfill()
            }
            .store(in: &cancellableSubscriber)

        await fulfillment(of: [expectation], timeout: 5)

        XCTAssertEqual(sequence, resultSequence)
    }
}

If you need additional information or have questions, please don't hesitate to ask and I will update my question.

Marco Boerner
  • "and also were wondering why this isn't a native Swift feature" Well, implementing things takes time and effort - they need justification for why they did implement something, not for why they *didn't* implement something. [See also](https://meta.stackoverflow.com/a/293819/5133585) – Sweeper May 17 '23 at 11:55
  • Yes, I agree. What I meant was more like: since it hasn't been implemented even though, in our opinion, it would have such a broad application, we wondered whether there are reasons it cannot be implemented and that's why it hasn't. :) – Marco Boerner May 17 '23 at 12:05
  • Unclear what the question is. Using maxPublishers to serialize asynchrony is standard practice. See https://stackoverflow.com/questions/59743938/combine-framework-serialize-async-operations Anyway that isn't merely first in first out; it is totally one at a time. – matt May 17 '23 at 12:48

1 Answer

The maxPublishers parameter in flatMap:

Specifies the maximum number of concurrent publisher subscriptions, or unlimited if unspecified.

So if you use .max(1), flatMap requests only one element from its upstream at a time, waits for the Future to complete, publishes that value, and then requests the next element.

In other words, all the async operations you do in asyncMap are carried out sequentially, not in parallel. This means that the upstream order is preserved, and the operations cannot race with each other.

If the async operations are carried out in parallel, whichever finishes first is published first by flatMap, which is why the order is not preserved with flatMap's default of .unlimited.

For a simple demonstration, consider:

let x = [1,2,3,4].publisher
let cancellable = x.asyncMap(maxPublishers: .max(1)) { i in
    try! await Task.sleep(for: .seconds(1))
}.sink { _ in
    print("Foo")
}
  • This prints four Foos with a 1-second delay between each one.

  • If it were maxPublishers: .max(2), it would print two Foos after 1 second, and another two Foos after another second.

  • If it were maxPublishers: .unlimited, it would print all four Foos after 1 second.

  • If it were maxPublishers: .none, it would print nothing, and the publisher would never complete.
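The bullets describe timing; the ordering effect can be made visible by feeding values whose delays shrink along the sequence. This is a minimal command-line sketch, assuming macOS with Combine; it repeats the question's non-throwing asyncMap so that it compiles on its own, and the helper name publishedOrder is hypothetical. The DispatchSemaphore blocks the calling thread, so it is only meant for a demo, not for app code.

```swift
import Combine
import Foundation

extension Publisher {
    // Same shape as the question's non-throwing asyncMap, repeated so this compiles alone.
    func asyncMap<T>(maxPublishers: Subscribers.Demand = .max(1),
                     _ transform: @escaping (Output) async -> T) -> AnyPublisher<T, Failure> {
        flatMap(maxPublishers: maxPublishers) { value -> Future<T, Failure> in
            Future { promise in
                Task {
                    let result = await transform(value)
                    promise(.success(result))
                }
            }
        }
        .eraseToAnyPublisher()
    }
}

/// Hypothetical helper: sends [3, 2, 1] through asyncMap, where each value
/// sleeps value × 100 ms, and returns the values in the order they were published.
func publishedOrder(maxPublishers: Subscribers.Demand) -> [Int] {
    let done = DispatchSemaphore(value: 0)
    var output: [Int] = []
    let cancellable = [3, 2, 1].publisher
        .asyncMap(maxPublishers: maxPublishers) { value -> Int in
            try? await Task.sleep(nanoseconds: UInt64(value) * 100_000_000)
            return value
        }
        .collect()
        .sink { values in
            output = values
            done.signal()
        }
    done.wait()   // block until collect() has emitted
    cancellable.cancel()
    return output
}

// One at a time: the upstream order is kept.
print(publishedOrder(maxPublishers: .max(1)))    // [3, 2, 1]
// All at once: values arrive in completion order (shortest sleep first).
print(publishedOrder(maxPublishers: .unlimited))
```

With .unlimited the result depends on timing, which is why the question's unordered test only compares sorted arrays.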

If you want to run the async operations in parallel and still preserve the order, that is a lot less trivial. I cannot think off the top of my head how you'd implement this.
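For the parallel-and-ordered case, one common technique outside Combine (not from this answer) is bounded concurrency plus a reorder buffer: run up to N transforms in a TaskGroup, tag each result with its input index, and hold early finishers until every earlier result has been delivered. This is a minimal sketch, assuming plain Swift concurrency (Swift 5.5 or later) and a fixed array of inputs; the function orderedConcurrentMap and its parameters are hypothetical.

```swift
/// Hypothetical helper: runs `transform` on at most `maxConcurrent` inputs at a time
/// and hands results to `onResult` strictly in input order, as early as possible.
func orderedConcurrentMap<Input: Sendable, Output: Sendable>(
    _ inputs: [Input],
    maxConcurrent: Int,
    transform: @escaping @Sendable (Input) async -> Output,
    onResult: (Output) -> Void
) async {
    precondition(maxConcurrent > 0, "maxConcurrent must be positive")

    await withTaskGroup(of: (index: Int, output: Output).self) { group in
        var nextToStart = 0               // index of the next input to launch
        var nextToEmit = 0                // index the caller expects next
        var running = 0                   // transforms currently in flight
        var pending: [Int: Output] = [:]  // finished results waiting for their turn

        while nextToEmit < inputs.count {
            // Fill free slots with new transforms, each tagged with its index.
            while running < maxConcurrent && nextToStart < inputs.count {
                let index = nextToStart
                let input = inputs[index]
                group.addTask {
                    let output = await transform(input)
                    return (index: index, output: output)
                }
                nextToStart += 1
                running += 1
            }

            // Wait for whichever transform finishes next (completion order).
            guard let finished = await group.next() else { break }
            running -= 1
            pending[finished.index] = finished.output

            // Deliver every result that is now contiguous with what was already emitted.
            while let ready = pending.removeValue(forKey: nextToEmit) {
                onResult(ready)
                nextToEmit += 1
            }
        }
    }
}
```

Because early finishers wait in pending, buffered results are bounded by the number of inputs rather than by maxConcurrent, and a slow element delays delivery of everything after it. Cancelling the surrounding task cancels the child tasks, but whether a transform stops early depends on it checking for cancellation.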

Sweeper
  • Okay, cool. Then it should work as we hoped. Thanks for your detailed explanation. :) – Marco Boerner May 17 '23 at 13:29
  • And yes, for our purpose it wouldn't make sense to have those operations run in parallel with the output order preserved; we also couldn't think of a good solution for that which isn't super hacky. – Marco Boerner May 17 '23 at 13:37