When I run the code snippet below, a failure is reported from inside RxJava's CompositeException. The underlying cause of the failure is:

Caused by: java.lang.RuntimeException: Duplicate found in causal chain so cropping to prevent loop ...

It would seem that the same CompositeException instance is being reused by each invocation of flatMap, which would explain why it sees the same exception twice.
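As a rough illustration of how a "duplicate in causal chain" condition can arise, here is a minimal sketch using only the JDK. It is a simplified model, not RxJava's actual source: the class `CausalChainSketch`, the method `describe`, and the `IllegalStateException` stand-in are all hypothetical. It assumes duplicates are detected by instance identity while walking each collected exception and its causes. Under that assumption, no reuse across flatMap invocations is needed: it is enough for a second collected exception to carry the first one as its cause, which would be consistent with the "2 exceptions occurred" line in the full trace.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;

// Simplified, hypothetical model of duplicate detection in a causal chain.
// This is NOT RxJava's source; it only illustrates the idea.
class CausalChainSketch {

    // Walks each exception and its causes, tracking visited instances by identity.
    static List<String> describe(List<Throwable> exceptions) {
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        List<String> chain = new ArrayList<>();
        for (Throwable top : exceptions) {
            for (Throwable t = top; t != null; t = t.getCause()) {
                if (!seen.add(t)) {
                    // Same instance reached a second time: crop here instead of looping.
                    chain.add("duplicate: " + t.getClass().getSimpleName());
                    break;
                }
                chain.add(t.getClass().getSimpleName());
            }
        }
        return chain;
    }

    public static void main(String[] args) {
        Throwable original = new NullPointerException();
        // Stand-in (assumption) for an exception thrown by a failing error handler
        // that wraps the original error as its cause.
        Throwable fromHandler = new IllegalStateException("handler failed", original);
        System.out.println(describe(Arrays.asList(original, fromHandler)));
    }
}
```

In this model the NullPointerException is visited once directly and once more as the cause of the second exception, and the second visit is what gets flagged as a duplicate.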

The expected behaviour is that the first element emitted from the Flux is transformed into an error signal, which cancels the remaining elements and propagates an error signal down the chain.

    Flux.just(1, 2, 3)
            .flatMap(i -> RxReactiveStreams.toPublisher(Observable.error(new NullPointerException())))
            .subscribe();

Can anyone explain why this behaviour is occurring and what can be done to mitigate it?

RxJava Version: 1.3.8

Full stack trace

rx.exceptions.OnErrorFailedException: Error occurred when trying to propagate error to Observer.onError

    at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:187)
    at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:115)
    at rx.internal.operators.OperatorSerialize$1.onError(OperatorSerialize.java:52)
    at rx.observers.SerializedObserver.onError(SerializedObserver.java:152)
    at rx.observers.SerializedSubscriber.onError(SerializedSubscriber.java:78)
    at rx.internal.operators.OnSubscribeThrow.call(OnSubscribeThrow.java:44)
    at rx.internal.operators.OnSubscribeThrow.call(OnSubscribeThrow.java:28)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
    at rx.Observable.subscribe(Observable.java:10423)
    at rx.Observable.subscribe(Observable.java:10390)
    at rx.internal.reactivestreams.PublisherAdapter.subscribe(PublisherAdapter.java:35)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:418)
    at reactor.core.publisher.FluxArray$ArraySubscription.slowPath(FluxArray.java:126)
    at reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:99)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:363)
    at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
    at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8264)
    at reactor.core.publisher.Flux.subscribeWith(Flux.java:8428)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8235)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8162)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8080)
    at uk.sky.ovp.mvpdb.service.service.strategy.common.executors.DownstreamExecutorIntegrationTest.nam2e(DownstreamExecutorIntegrationTest.java:76)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:675)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:117)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:184)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:180)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:127)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:68)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:135)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1540)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1540)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:229)
    at org.junit.platform.launcher.core.DefaultLauncher.lambda$execute$6(DefaultLauncher.java:197)
    at org.junit.platform.launcher.core.DefaultLauncher.withInterceptedStreams(DefaultLauncher.java:211)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:191)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:128)
    at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:69)
    at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: rx.exceptions.CompositeException: 2 exceptions occurred. 
    ... 75 more
Caused by: rx.exceptions.CompositeException$CompositeExceptionCausalChain: Chain of Causes for CompositeException In Order Received =>
    at rx.exceptions.CompositeException.getCause(CompositeException.java:129)
    at java.base/java.lang.Throwable.printEnclosedStackTrace(Throwable.java:711)
    at java.base/java.lang.Throwable.printStackTrace(Throwable.java:671)
    at java.base/java.lang.Throwable.printStackTrace(Throwable.java:725)
    at com.intellij.junit5.JUnit5TestExecutionListener.getTrace(JUnit5TestExecutionListener.java:304)
    at com.intellij.junit5.JUnit5TestExecutionListener.testFailure(JUnit5TestExecutionListener.java:289)
    at com.intellij.junit5.JUnit5TestExecutionListener.testFailure(JUnit5TestExecutionListener.java:245)
    at com.intellij.junit5.JUnit5TestExecutionListener.executionFinished(JUnit5TestExecutionListener.java:183)
    at com.intellij.junit5.JUnit5TestExecutionListener.executionFinished(JUnit5TestExecutionListener.java:171)
    at org.junit.platform.launcher.core.TestExecutionListenerRegistry$CompositeTestExecutionListener.lambda$executionFinished$10(TestExecutionListenerRegistry.java:109)
    at org.junit.platform.launcher.core.TestExecutionListenerRegistry.lambda$notifyEach$1(TestExecutionListenerRegistry.java:67)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1540)
    at org.junit.platform.launcher.core.TestExecutionListenerRegistry.notifyEach(TestExecutionListenerRegistry.java:65)
    at org.junit.platform.launcher.core.TestExecutionListenerRegistry.access$200(TestExecutionListenerRegistry.java:32)
    at org.junit.platform.launcher.core.TestExecutionListenerRegistry$CompositeTestExecutionListener.executionFinished(TestExecutionListenerRegistry.java:108)
    at org.junit.platform.launcher.core.ExecutionListenerAdapter.executionFinished(ExecutionListenerAdapter.java:56)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.reportCompletion(NodeTestTask.java:179)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:85)
    ... 32 more
Caused by: java.lang.NullPointerException
    at uk.sky.ovp.mvpdb.service.service.strategy.common.executors.DownstreamExecutorIntegrationTest.lambda$nam2e$1(DownstreamExecutorIntegrationTest.java:75)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:378)
    at reactor.core.publisher.FluxArray$ArraySubscription.slowPath(FluxArray.java:126)
    at reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:99)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:363)
    at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
    at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8264)
    at reactor.core.publisher.Flux.subscribeWith(Flux.java:8428)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8235)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8162)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8080)
    at uk.sky.ovp.mvpdb.service.service.strategy.common.executors.DownstreamExecutorIntegrationTest.nam2e(DownstreamExecutorIntegrationTest.java:76)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:675)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:117)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:184)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:180)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:127)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:68)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:135)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
    ... 32 more
Caused by: java.lang.RuntimeException: Duplicate found in causal chain so cropping to prevent loop ...
    at rx.exceptions.CompositeException.getCause(CompositeException.java:145)
    at java.base/java.lang.Throwable.printEnclosedStackTrace(Throwable.java:711)
    at java.base/java.lang.Throwable.printStackTrace(Throwable.java:671)
    at java.base/java.lang.Throwable.printStackTrace(Throwable.java:725)
    at com.intellij.junit5.JUnit5TestExecutionListener.getTrace(JUnit5TestExecutionListener.java:304)
    at com.intellij.junit5.JUnit5TestExecutionListener.testFailure(JUnit5TestExecutionListener.java:289)
    at com.intellij.junit5.JUnit5TestExecutionListener.testFailure(JUnit5TestExecutionListener.java:245)
    at com.intellij.junit5.JUnit5TestExecutionListener.executionFinished(JUnit5TestExecutionListener.java:183)
    at com.intellij.junit5.JUnit5TestExecutionListener.executionFinished(JUnit5TestExecutionListener.java:171)
    at org.junit.platform.launcher.core.TestExecutionListenerRegistry$CompositeTestExecutionListener.lambda$executionFinished$10(TestExecutionListenerRegistry.java:109)
    at org.junit.platform.launcher.core.TestExecutionListenerRegistry.lambda$notifyEach$1(TestExecutionListenerRegistry.java:67)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1540)
    at org.junit.platform.launcher.core.TestExecutionListenerRegistry.notifyEach(TestExecutionListenerRegistry.java:65)
    at org.junit.platform.launcher.core.TestExecutionListenerRegistry.access$200(TestExecutionListenerRegistry.java:32)
    at org.junit.platform.launcher.core.TestExecutionListenerRegistry$CompositeTestExecutionListener.executionFinished(TestExecutionListenerRegistry.java:108)
    at org.junit.platform.launcher.core.ExecutionListenerAdapter.executionFinished(ExecutionListenerAdapter.java:56)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.reportCompletion(NodeTestTask.java:179)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:85)
    ... 32 more
Michael McFadyen

1 Answer

I think using RxReactiveStreams.toPublisher() is wrong here. If you want to experiment with an error happening in the stream, you should map the element to Observable.error() and see how it goes.

From this similar thread with the same exception:

What's happening is that your onError implementation in a Subscriber is throwing an unchecked exception which is against the Observable contract and this aborts the observable processing throwing an OnErrorFailedException in the observeOn scheduler.

This can be seen in your stack trace:

rx.exceptions.OnErrorFailedException: Error occurred when trying to propagate error to Observer.onError
    at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:187)
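To make the quoted explanation concrete, here is a minimal JDK-only sketch of the general pattern: a wrapper hands an error to a downstream handler, and if the handler itself throws, the wrapper collects both exceptions and raises a new failure. The names `SafeErrorDeliverySketch`, `ErrorDeliveryFailed`, and `deliverError` are hypothetical, and this is not how RxJava's SafeSubscriber is actually implemented; it only illustrates why a throwing onError surfaces as a different exception carrying two collected errors.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a "safe" subscriber wrapper; not RxJava's SafeSubscriber.
class SafeErrorDeliverySketch {

    // Raised when the downstream error handler itself throws.
    static class ErrorDeliveryFailed extends RuntimeException {
        final List<Throwable> collected;

        ErrorDeliveryFailed(List<Throwable> collected) {
            super("Error handler failed; " + collected.size() + " exceptions collected");
            this.collected = collected;
        }
    }

    // Delivers the error; if the handler throws, reports both exceptions together.
    static void deliverError(Throwable error, Consumer<Throwable> onError) {
        try {
            onError.accept(error);
        } catch (RuntimeException thrownByHandler) {
            List<Throwable> both = new ArrayList<>();
            both.add(error);           // the error being propagated
            both.add(thrownByHandler); // the failure raised while handling it
            throw new ErrorDeliveryFailed(both);
        }
    }

    public static void main(String[] args) {
        Throwable original = new NullPointerException();

        // A handler that consumes the error: delivery completes normally.
        deliverError(original, e -> System.out.println("handled " + e));

        // A handler that rethrows, wrapping the original as its cause.
        try {
            deliverError(original, e -> { throw new UnsupportedOperationException(e); });
        } catch (ErrorDeliveryFailed failed) {
            System.out.println(failed.getMessage());
        }
    }
}
```

If this pattern applies to the question's code, supplying an error handler that does not throw would be one thing to try, though that is an assumption rather than something confirmed by the trace alone.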

As a sidenote, RxJava 1 was end-of-lifed in 2018 and it shouldn't be used anymore anyway.

eis
  • First of all, thanks for the response. The code snippet above is a very simplified version of the actual code, but it replicates the issue. The actual code uses webflux and hystrix to make API calls. Also, hystrix is pulling in rxjava, which is why it is on an old version. – Michael McFadyen Jan 12 '21 at 09:44
  • @MichaelMcFadyen Could you include in your question the dependencies and versions required to reproduce the problem, in addition to rxjava (including webflux)? – eis Jan 12 '21 at 10:27