3

I'm observing an unexpected behavior regarding CombineLatest, if the inner publishers has subscribe(on:), the CombineLatest stream is not emitting any value.

Notes:

  • With Zip operator is working
  • Moving the subscribe(on:) / receive(on:) to the combineLatest stream also work. But in this particular use case, the inner publishers is defining their subscribe/receive because are (re)used in other places.
  • Adding subscribe(on:)/receive(on:) to only one of the inner publishers also work, so the problem is just when both have it.
    func makePublisher() -> AnyPublisher<Int, Never> {
        Deferred {
            Future { promise in
                DispatchQueue.global(qos: .background).asyncAfter(deadline: .now() + 3) {
                    promise(.success(Int.random(in: 0...3)))
                }
            }
        }
        .subscribe(on: DispatchQueue.global())
        .receive(on: DispatchQueue.main)
        .eraseToAnyPublisher()
    }
    
    var cancellables = Set<AnyCancellable>()
    Publishers.CombineLatest(
        makePublisher(),
        makePublisher()
    )
    .sink { completion in
        print(completion)
    } receiveValue: { (a, b) in
        print(a, b)
    }.store(in: &cancellables)

Is this a combine bug or expected behavior? Do you have any idea of how can be setup this kind of stream where the inners can define its own subscribe scheduler?

1 Answers1

5

Yes, it's a bug. We can simplify the test case to this:

import Combine
import Dispatch

let pub = Just("x")
    .subscribe(on: DispatchQueue.main)

let ticket = pub.combineLatest(pub)
    .sink(
        receiveCompletion: { print($0) },
        receiveValue: { print($0) })

This never prints anything. But if you comment out the subscribe(on:) operator, it prints what's expected. If you leave subscribe(on:) in, but insert some print() operators, you'll see that the CombineLatest operator never sends any demand upstream.

I suggest you copy the CombineX reimplementation of CombineLatest and the utilities it needs to compile (the CombineX implementations of Lock and LockedAtomic, I think). I don't know that the CombineX version works either, but if it's buggy, at least you have the source and can try to fix it.

rob mayoff
  • 375,296
  • 67
  • 796
  • 848