
I have come across a behaviour I don't understand when using Sinks.Many<String> to notify multiple subscribers of events:

fun main() {

    val sink : Sinks.Many<String>  = Sinks.many().multicast().onBackpressureBuffer()
    val flux = sink.asFlux().log()

    val d = flux.subscribe {
        println("--> $it")
    }

    sink.emitNext("1", Sinks.EmitFailureHandler.FAIL_FAST)

    val d2 = flux.subscribe {
        println("--2> $it")
    }

    sink.emitNext("2", Sinks.EmitFailureHandler.FAIL_FAST)
}

This code shows the first subscriber getting the values 1 and 2, and the second subscriber getting 2. So far so good:

11:49:06.936 [main] INFO reactor.Flux.EmitterProcessor.1 - onSubscribe(EmitterProcessor.EmitterInner)
11:49:06.938 [main] INFO reactor.Flux.EmitterProcessor.1 - request(unbounded)
11:49:06.942 [main] INFO reactor.Flux.EmitterProcessor.1 - onNext(1)
--> 1
11:49:06.942 [main] INFO reactor.Flux.EmitterProcessor.1 - onSubscribe(EmitterProcessor.EmitterInner)
11:49:06.942 [main] INFO reactor.Flux.EmitterProcessor.1 - request(unbounded)
11:49:06.943 [main] INFO reactor.Flux.EmitterProcessor.1 - onNext(2)
--> 2
11:49:06.943 [main] INFO reactor.Flux.EmitterProcessor.1 - onNext(2)
--2> 2

Now assume the first subscriber disposes of (cancels) its subscription after the first emission. I was expecting the first subscriber to get 1 and the second to get 2:


fun main() {

    val sink : Sinks.Many<String>  = Sinks.many().multicast().onBackpressureBuffer()
    val flux = sink.asFlux().log()

    val d = flux.subscribe {
        println("--> $it")
    }

    sink.emitNext("1", Sinks.EmitFailureHandler.FAIL_FAST)

    d.dispose()

    val d2 = flux.subscribe {
        println("--2> $it")
    }

    sink.emitNext("2", Sinks.EmitFailureHandler.FAIL_FAST)

}

Output:

11:51:48.684 [main] INFO reactor.Flux.EmitterProcessor.1 - onSubscribe(EmitterProcessor.EmitterInner)
11:51:48.685 [main] INFO reactor.Flux.EmitterProcessor.1 - request(unbounded)
11:51:48.689 [main] INFO reactor.Flux.EmitterProcessor.1 - onNext(1)
--> 1
11:51:48.689 [main] INFO reactor.Flux.EmitterProcessor.1 - cancel()
11:51:48.689 [main] INFO reactor.Flux.EmitterProcessor.1 - onSubscribe(EmitterProcessor.EmitterInner)
11:51:48.689 [main] INFO reactor.Flux.EmitterProcessor.1 - request(unbounded)
11:51:48.690 [main] INFO reactor.Flux.EmitterProcessor.1 - onComplete()

However, by the time the second subscriber subscribes, the flux is already considered completed. Why is this happening? I need the Sinks.Many to stay open so that subscribers can subscribe and unsubscribe at any moment without terminating it.

codependent

1 Answer

I just hit the same issue.

It is caused by autoCancel defaulting to true: the sink shuts down completely when its last subscriber cancels, so any later subscriber only receives onComplete. Unfortunately, the onBackpressureBuffer javadoc makes no mention of it.

This behaviour is inherited from EmitterProcessor.create, where it is documented.

To set the autoCancel flag to false, use the onBackpressureBuffer overload that takes it as a parameter:

Many<String> sink = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);
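
For reference, here is a minimal Kotlin sketch of the question's second scenario with that overload applied. It assumes reactor-core is on the classpath. The `runDemo` helper and the `received` list are names made up for this illustration, and `.log()` is left out to keep the output short.

```kotlin
import reactor.core.publisher.Sinks
import reactor.util.concurrent.Queues

// Hypothetical helper: replays the question's scenario with autoCancel = false
// and returns what both subscribers received, in order.
fun runDemo(): List<String> {
    val received = mutableListOf<String>()

    // autoCancel = false: the sink stays open after its last subscriber cancels.
    val sink: Sinks.Many<String> = Sinks.many().multicast()
        .onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false)
    val flux = sink.asFlux()

    val d = flux.subscribe { received += "--> $it" }
    sink.emitNext("1", Sinks.EmitFailureHandler.FAIL_FAST)

    // Cancelling the only subscriber no longer completes the sink.
    d.dispose()

    val d2 = flux.subscribe { received += "--2> $it" }
    sink.emitNext("2", Sinks.EmitFailureHandler.FAIL_FAST)
    d2.dispose()

    return received
}

fun main() {
    runDemo().forEach(::println)
}
```

With the flag set to false, the first subscriber should see only 1 and the second only 2, which is the behaviour the question was expecting.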
Neil Swingler
  • I was just doing basic research on Sinks after noticing some odd behavior in my POC. This just saved me hours of troubleshooting. The Sinks interface for `MulticastSpec` fails to link details but does have a hint for `autoCancel` - `should the sink fully shutdowns (not publishing anymore) when the last subscriber cancels` – Encryption Mar 09 '23 at 14:49