I'm looking for an operator that does the opposite of the collect operator:
- Allows you to map from one emission value to many emission values, emitting them on the stream. For example, if I have the element [1,2,3] in the stream, I want to convert it into a stream that emits 1, then 2, then 3.
- Does not change the stream's completion state. Admittedly, this uncollect operator wouldn't be exactly the opposite of collect, since collect waits until the stream is finished. uncollect should just process results while the stream is unfinished, and not attempt to finish/unfinish the stream.
For example, I imagine this is how an uncollect operator would function:
func fibonacci(_ number: Int) -> AnyPublisher<Int, Never> {
    Future { ... }.eraseToAnyPublisher()
}

let serverEventStream: AnyPublisher<[Int], Never> = ...

serverEventStream                  // AnyPublisher<[Int], Never> // Ex. 2 values: `[12, 24]`, `[1, 10, 50]`
    .uncollect { $0 }              // AnyPublisher<Int, Never> // Ex. 5 values: `12`, `24`, `1`, `10`, `50`
    .flatMap { fibonacci($0) }     // AnyPublisher<Int, Never> // Ex. 5 values: `144`, `46368`, `1`, `55`, `12586269025`
    .sink { print($0) }
I was looking for names like explode, splat, or uncollect, to no avail. The closest operator that shares a promising name is flatMap, since Sequence.flatMap is the equivalent in the non-Combine world. However, Combine's flatMap is the way to chain promises together.
Is there a way to uncollect elements, or map a single emission into an arbitrary number of emissions?
I was able to get it to work with my custom JustSeveral Publisher, but this seems clunky:
import Combine
import XCTest

class Tests: XCTestCase {
    func testUncollect() {
        // Stub that only covers the inputs used in this test.
        func fibonacci(_ number: Int) -> AnyPublisher<Int, Never> {
            switch number {
            case 1: return Just(1).eraseToAnyPublisher()
            case 10: return Just(55).eraseToAnyPublisher()
            case 12: return Just(144).eraseToAnyPublisher()
            case 24: return Just(46368).eraseToAnyPublisher()
            case 50: return Just(12586269025).eraseToAnyPublisher()
            default: fatalError("Should actually build this function.")
            }
        }

        // JustSeveral is my custom publisher (not shown here).
        let serverEventStream: AnyPublisher<[Int], Never> = JustSeveral([[12, 24], [1, 10, 50]]).eraseToAnyPublisher()

        serverEventStream                  // AnyPublisher<[Int], Never> // Ex. 2 values: `[12, 24]`, `[1, 10, 50]`
            .uncollect { $0 }              // AnyPublisher<Int, Never> // Ex. 5 values: `12`, `24`, `1`, `10`, `50`
            .flatMap { fibonacci($0) }     // AnyPublisher<Int, Never> // Ex. 5 values: `144`, `46368`, `1`, `55`, `12586269025`
            .sink { print($0) }
    }
}
extension Publisher {
    func uncollect<T>(_ transform: @escaping (Output) -> [T]) -> AnyPublisher<T, Failure> {
        self
            .flatMap { output -> AnyPublisher<T, Failure> in
                JustSeveral(transform(output))
                    .setFailureType(to: Failure.self)
                    .eraseToAnyPublisher()
            }
            .eraseToAnyPublisher()
    }
}
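One possible way to make this less clunky, offered as a sketch rather than a definitive answer: Combine provides a `publisher` property on `Sequence` (producing a `Publishers.Sequence`), which could stand in for the custom JustSeveral publisher. The `uncollect` name is still hypothetical, and the generic parameter `S` and the `received` array are introduced here only for illustration.

```swift
import Combine

extension Publisher {
    /// Hypothetical `uncollect`: maps each upstream value to a sequence and
    /// re-emits that sequence's elements one at a time.
    func uncollect<S: Sequence>(
        _ transform: @escaping (Output) -> S
    ) -> AnyPublisher<S.Element, Failure> {
        flatMap { output in
            // Sequence.publisher emits every element, then finishes.
            // flatMap only finishes once the upstream itself finishes.
            transform(output).publisher
                .setFailureType(to: Failure.self)
        }
        .eraseToAnyPublisher()
    }
}

// Illustration only: collect the flattened values into an array.
var received: [Int] = []
let cancellable = [[12, 24], [1, 10, 50]].publisher
    .uncollect { $0 }
    .sink { received.append($0) }
```

Because each inner sequence publisher finishes on its own and flatMap waits for the upstream, this should leave the outer stream's completion untouched, which is the second requirement in the list at the top.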