I'm looking for an operator that does the opposite of the collect operator:

  • Allows you to map from one emitted value to many emitted values, each sent down the stream. For example, if I have the element [1,2,3] in the stream, I want to convert it into a stream that emits 1, then 2, then 3.
  • Does not change the stream's completion state. Admittedly this uncollect operator wouldn't be exactly the opposite of collect since collect waits until the stream is finished. With uncollect it should just process results while the stream is unfinished, and not attempt to finish/unfinish the stream.

For example, I imagine this is how an uncollect operator would function:

func fibonacci(_ number: Int) -> AnyPublisher<Int, Never> {
  Future { ... }.eraseToAnyPublisher()
}

let serverEventStream: AnyPublisher<[Int], Never> = ...

serverEventStream              // AnyPublisher<[Int], Never>  // Ex. 2 values: `[12, 24]`, `[1, 10, 50]`
  .uncollect { $0 }            // AnyPublisher<Int, Never>    // Ex. 5 values: `12`, `24`, `1`, `10`, `50`
  .flatMap { fibonacci($0) }   // AnyPublisher<Int, Never>    // Ex. 5 values: `144`, `46368`, `1`, `55`, `12586269025`
  .sink { print($0) }

I searched for names like explode, splat, and uncollect, to no avail. The operator with the most promising name is flatMap, since Sequence.flatMap does the equivalent outside of Combine. However, as far as I can tell, Combine's flatMap is meant for chaining publishers together, the way you would chain promises.
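For reference, here is a minimal sketch of the Sequence.flatMap behavior mentioned above, using only the standard library. The names `batches` and `flattened` are illustrative and do not come from any API.

```swift
// Two "emissions", each carrying several values (same data as the example above).
let batches = [[12, 24], [1, 10, 50]]

// Sequence.flatMap maps each element to a sequence and concatenates the results.
let flattened = batches.flatMap { $0 }

print(flattened) // [12, 24, 1, 10, 50]
```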

Is there a way to uncollect elements, or map a single emission into an arbitrary number of emissions?


I was able to get it to work with my custom JustSeveral Publisher, but this seems clunky:

import Combine
import XCTest

class Tests: XCTestCase {
  func testUncollect() {
    func fibonacci(_ number: Int) -> AnyPublisher<Int, Never> {
      switch number {
      case 1: return Just(1).eraseToAnyPublisher()
      case 10: return Just(55).eraseToAnyPublisher()
      case 12: return Just(144).eraseToAnyPublisher()
      case 24: return Just(46368).eraseToAnyPublisher()
      case 50: return Just(12586269025).eraseToAnyPublisher()
      default: fatalError("Should actually build this function.")
      }
    }

    let serverEventStream: AnyPublisher<[Int], Never> = JustSeveral([[12, 24], [1, 10, 50]]).eraseToAnyPublisher()

    serverEventStream              // AnyPublisher<[Int], Never>  // Ex. 2 values: `[12, 24]`, `[1, 10, 50]`
      .uncollect { $0 }            // AnyPublisher<Int, Never>    // Ex. 5 values: `12`, `24`, `1`, `10`, `50`
      .flatMap { fibonacci($0) }   // AnyPublisher<Int, Never>    // Ex. 5 values: `144`, `46368`, `1`, `55`, `12586269025`
      .sink { print($0) }
  }
}

extension Publisher {
  func uncollect<T>(_ transform: @escaping (Output) -> [T]) -> AnyPublisher<T, Failure> {
    self
      .flatMap { output -> AnyPublisher<T, Failure> in
        JustSeveral(transform(output))
          .setFailureType(to: Failure.self)
          .eraseToAnyPublisher()
      }
      .eraseToAnyPublisher()
  }
}
Senseful

1 Answer

You can use flatMap together with the publisher that every Sequence provides through its publisher property:

[1,2,3,4,5,6].publisher
    .collect()
    .flatMap { $0.publisher }
    .sink { print($0) }

This prints:

1
2
3
4
5
6
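
Applied to the question, the same idea could be wrapped in an operator. The following is only a sketch: `uncollect` is the hypothetical name from the question, not a Combine API. It assumes `Publishers.Sequence(sequence:)` is used so that the inner publisher's failure type matches the upstream's, which avoids a separate `setFailureType` call.

```swift
import Combine

extension Publisher {
    /// Hypothetical operator (the name comes from the question, not from Combine):
    /// re-emits every element of the array produced by `transform`, one at a time.
    func uncollect<T>(_ transform: @escaping (Output) -> [T]) -> AnyPublisher<T, Failure> {
        flatMap { output in
            // Publishers.Sequence lets us pick the failure type directly.
            Publishers.Sequence<[T], Failure>(sequence: transform(output))
        }
        .eraseToAnyPublisher()
    }
}

var received: [Int] = []

// Stand-in for the server event stream: two emissions, five values in total.
let cancellable = [[12, 24], [1, 10, 50]].publisher
    .uncollect { $0 }
    .sink { received.append($0) }

print(received) // [12, 24, 1, 10, 50]
```

Because the sequence publishers here emit synchronously, `received` is already filled when `sink` returns. The upstream's completion passes through unchanged once every inner sequence has finished.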

Joakim Danielson
  • This is exactly what I was looking for. Turns out I [missed the Sequence.publisher documentation](https://stackoverflow.com/questions/67523504/just-with-several-values#comment119350006_67523663). – Senseful May 13 '21 at 17:42
  • You might want `flatMap(maxPublishers: 1) { ... }` depending on how the inner publishers work. – rob mayoff May 14 '21 at 00:49
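
A sketch of what the comment above suggests. Note that the `maxPublishers` parameter is of type `Subscribers.Demand`, so the limit is spelled `.max(1)`. The subjects `first` and `second` are illustrative stand-ins for inner publishers that emit over time. With the limit in place, flatMap should not subscribe to the second inner publisher until the first one finishes, so the values cannot interleave.

```swift
import Combine

// Illustrative inner publishers that emit on demand rather than immediately.
let first = PassthroughSubject<Int, Never>()
let second = PassthroughSubject<Int, Never>()
var received: [Int] = []

let cancellable = [first, second].publisher
    // Subscribe to at most one inner publisher at a time.
    .flatMap(maxPublishers: .max(1)) { $0 }
    .sink { received.append($0) }

second.send(99)                    // dropped: `second` has no subscriber yet
first.send(1)
first.send(completion: .finished)  // flatMap now requests and subscribes to `second`
second.send(2)
second.send(completion: .finished)

print(received) // [1, 2]
```

Without the limit, flatMap subscribes to both subjects up front and `99` would be delivered first, so the default is only safe when ordering across inner publishers does not matter.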