I have come across a behaviour I don't understand when using Sinks.Many<String>
to notify multiple subscribers of events:
import reactor.core.publisher.Sinks

fun main() {
    val sink: Sinks.Many<String> = Sinks.many().multicast().onBackpressureBuffer()
    val flux = sink.asFlux().log()
    // First subscriber: subscribes before any emission
    val d = flux.subscribe {
        println("--> $it")
    }
    sink.emitNext("1", Sinks.EmitFailureHandler.FAIL_FAST)
    // Second subscriber: subscribes after "1" has been emitted
    val d2 = flux.subscribe {
        println("--2> $it")
    }
    sink.emitNext("2", Sinks.EmitFailureHandler.FAIL_FAST)
}
This code shows the first subscriber getting the values 1 and 2, and the second subscriber getting 2. So far so good:
11:49:06.936 [main] INFO reactor.Flux.EmitterProcessor.1 - onSubscribe(EmitterProcessor.EmitterInner)
11:49:06.938 [main] INFO reactor.Flux.EmitterProcessor.1 - request(unbounded)
11:49:06.942 [main] INFO reactor.Flux.EmitterProcessor.1 - onNext(1)
--> 1
11:49:06.942 [main] INFO reactor.Flux.EmitterProcessor.1 - onSubscribe(EmitterProcessor.EmitterInner)
11:49:06.942 [main] INFO reactor.Flux.EmitterProcessor.1 - request(unbounded)
11:49:06.943 [main] INFO reactor.Flux.EmitterProcessor.1 - onNext(2)
--> 2
11:49:06.943 [main] INFO reactor.Flux.EmitterProcessor.1 - onNext(2)
--2> 2
Now assume the first subscriber disposes (cancels) its subscription after the first emission. I was expecting the first subscriber to get 1 and the second to get 2:
fun main() {
    val sink: Sinks.Many<String> = Sinks.many().multicast().onBackpressureBuffer()
    val flux = sink.asFlux().log()
    val d = flux.subscribe {
        println("--> $it")
    }
    sink.emitNext("1", Sinks.EmitFailureHandler.FAIL_FAST)
    // The only subscriber cancels before the second one subscribes
    d.dispose()
    val d2 = flux.subscribe {
        println("--2> $it")
    }
    sink.emitNext("2", Sinks.EmitFailureHandler.FAIL_FAST)
}
11:51:48.684 [main] INFO reactor.Flux.EmitterProcessor.1 - onSubscribe(EmitterProcessor.EmitterInner)
11:51:48.685 [main] INFO reactor.Flux.EmitterProcessor.1 - request(unbounded)
11:51:48.689 [main] INFO reactor.Flux.EmitterProcessor.1 - onNext(1)
--> 1
11:51:48.689 [main] INFO reactor.Flux.EmitterProcessor.1 - cancel()
11:51:48.689 [main] INFO reactor.Flux.EmitterProcessor.1 - onSubscribe(EmitterProcessor.EmitterInner)
11:51:48.689 [main] INFO reactor.Flux.EmitterProcessor.1 - request(unbounded)
11:51:48.690 [main] INFO reactor.Flux.EmitterProcessor.1 - onComplete()
However, when the second subscriber subscribes, the flux is already considered completed. Why is this happening? I need the Sinks.Many to stay available at all times, so that subscribers can subscribe and unsubscribe at any moment without the sink itself being terminated.
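
One thing that may be worth checking, offered as an assumption rather than a confirmed diagnosis: the no-argument onBackpressureBuffer() appears to create the sink with auto-cancel enabled, so the sink shuts down once its last subscriber cancels. The overload onBackpressureBuffer(bufferSize, autoCancel) allows turning that off. A minimal sketch of the second scenario with autoCancel = false follows; the helper name runDemo and the use of Queues.SMALL_BUFFER_SIZE as the buffer size are illustrative choices, not part of the original code.

```kotlin
import reactor.core.publisher.Sinks
import reactor.util.concurrent.Queues

// Hypothetical helper: returns what each subscriber received so it can be inspected.
fun runDemo(): Pair<List<String>, List<String>> {
    val first = mutableListOf<String>()
    val second = mutableListOf<String>()

    // Assumption: autoCancel = false keeps the sink open after its last subscriber cancels.
    val sink: Sinks.Many<String> = Sinks.many().multicast()
        .onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false)
    val flux = sink.asFlux()

    val d = flux.subscribe { first.add(it) }
    sink.emitNext("1", Sinks.EmitFailureHandler.FAIL_FAST)
    d.dispose() // the only subscriber cancels here

    val d2 = flux.subscribe { second.add(it) }
    sink.emitNext("2", Sinks.EmitFailureHandler.FAIL_FAST)
    d2.dispose()

    return first to second
}

fun main() {
    val (first, second) = runDemo()
    println("first=$first second=$second")
}
```

Note that with auto-cancel disabled, values emitted while no subscriber is attached may be buffered (up to the buffer size) and delivered to the next subscriber, which may or may not be desirable here.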