In our codebase, we were using quite often async
Methods encapsulated in a Task
inside a Combine pipeline.
After reading an article from swiftbysundell we implemented a Publisher extension with the asyncMap
method in the article.
However, the way it was implemented we realized that it didn't guarantee that calls that came first in, came first out. After a bit of trying we figured that if we set maxPublisher
in the flatMap
to .max(1)
, we did seem to achieve a first-in-first-out. At least our tests seem to be running fine now.
However, we didn't see any implementation like this anywhere else or as a naive Swift feature, and ask ourselves if there are particular issues why this can't or shouldn't be implemented in a similar way.
We are wondering and asking if we did overlook something and maybe might run into some race conditions or unordered calls when running some async Backend calls that should be in order.
Here is the (not throwing) version of our extension:
import Foundation
import Combine
public extension Publisher {
/// A publisher that transforms all elements from an upstream publisher using an async transform closure.
/// Warning: The order of execution (FIFO) is only with `maxPublishers = .max(1)` guaranteed.
func asyncMap<T>(maxPublishers: Subscribers.Demand = .max(1), _ transform: @escaping (Output) async -> T) -> AnyPublisher<T, Failure> {
flatMap(maxPublishers: maxPublishers) { value -> Future<T, Failure> in
Future { promise in
Task {
let result = await transform(value)
promise(.success(result))
}
}
}
.eraseToAnyPublisher()
}
}
And here are the tests we are running, one when the order doesn't matter, and the second test where the order is checked:
class PublisherAsyncTests: XCTestCase {
var cancellableSubscriber = Set<AnyCancellable>()
override func setUp() {
super.setUp()
cancellableSubscriber = []
}
func testAsyncMap() async throws {
let expectation = expectation(description: "testAsyncMap")
let sequence = [1, 2, 3, 4, 5, 6]
var resultSequence: [Int] = []
sequence
.publisher
.asyncMap(maxPublishers: .unlimited) { value in
try? await Task.sleep(nanoseconds: UInt64.random(in: 10_000_000...20_000_000))
return value
}
.collect()
.sink { value in
resultSequence = value
expectation.fulfill()
}
.store(in: &cancellableSubscriber)
await fulfillment(of: [expectation])
XCTAssertEqual(sequence, resultSequence.sorted())
}
func testAsyncFIFOMap() async throws {
let expectation = expectation(description: "testAsyncMap")
let sequence = [1, 2, 3, 4, 5, 6]
var resultSequence: [Int] = []
sequence
.publisher
.asyncMap { value in
try? await Task.sleep(nanoseconds: UInt64.random(in: 10_000_000...40_000_000))
return value
}
.collect()
.sink { value in
resultSequence = value
expectation.fulfill()
}
.store(in: &cancellableSubscriber)
await fulfillment(of: [expectation], timeout: 5)
XCTAssertEqual(sequence, resultSequence)
}
}
If you need additional information or have questions please don't hesitate to ask and I can update my question.