I have multiple consumers connected to a SEDA queue, and each consumer decides for itself whether it handles the event, using a choice-when.

Each consumer should react to completion and to failure of the whole process, across every route that follows.

//this exists in a super class and is inherited multiple times
from("seda:queue-with-multiple-consumers?multipleConsumers=true")
    .routeId("id-of-some-consumer-1") //will be set per consumer
    .choice().when(consumerCheckIfEligible())
        .onCompletion().onCompleteOnly()
            .process(exchange -> {
                //should be triggered if direct:delegate-to-actual-processing-route succeeds
                log.info("onCompletion().onCompleteOnly()");
            })
        .end()
        
        .onCompletion().onFailureOnly()
            .process(exchange -> {
                //should be triggered if direct:delegate-to-actual-processing-route fails
                log.info("onCompletion().onFailureOnly()");
            })
        .end()

        .onException(Exception.class)
            .process(e -> {
                //should be triggered if direct:delegate-to-actual-processing-route fails
                log.info("onException(Exception.class)");
            })
        .end()

        .to("direct:delegate-to-actual-processing-route")

    .end()    

However, Apache Camel won't tolerate the onCompletion() hooks anywhere but at the top and produces this error:

 java.lang.IllegalArgumentException: The output must be added as top-level on the route. Try moving onCompletion[[]] to the top of route.
    at org.apache.camel.spring.boot.CamelSpringBootApplicationListener.onApplicationEvent(CamelSpringBootApplicationListener.java:212)
    at org.apache.camel.spring.boot.CamelSpringBootApplicationListener.onApplicationEvent(CamelSpringBootApplicationListener.java:58)
    at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:176)
    at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:169)
    at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:143)
    at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:421)
    at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:378)
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:938)
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:586)
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:147)
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:734)
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:408)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:308)
    at org.springframework.boot.builder.SpringApplicationBuilder.run(SpringApplicationBuilder.java:164)
    at com.my.project.Application.main(Application.java:37)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49)
Caused by: java.lang.IllegalArgumentException: The output must be added as top-level on the route. Try moving onCompletion[[]] to the top of route.
    at org.apache.camel.model.ProcessorDefinition.addOutput(ProcessorDefinition.java:206)
    at org.apache.camel.model.ChoiceDefinition.addOutput(ChoiceDefinition.java:151)
    at org.apache.camel.model.ProcessorDefinition.onCompletion(ProcessorDefinition.java:3639)
    at com.my.project.webhook.camel.CamelFileReceiver.configure(CamelFileReceiver.java:114)
    at org.apache.camel.builder.RouteBuilder.checkInitialized(RouteBuilder.java:672)
    at org.apache.camel.builder.RouteBuilder.configureRoutes(RouteBuilder.java:618)
    at org.apache.camel.builder.RouteBuilder.addRoutesToCamelContext(RouteBuilder.java:554)
    at org.apache.camel.impl.engine.AbstractCamelContext.addRoutes(AbstractCamelContext.java:1178)
    at org.apache.camel.main.RoutesConfigurer.addDiscoveredRoutes(RoutesConfigurer.java:237)
    at org.apache.camel.main.RoutesConfigurer.configureRoutes(RoutesConfigurer.java:212)
    at org.apache.camel.spring.boot.CamelSpringBootApplicationListener.onApplicationEvent(CamelSpringBootApplicationListener.java:106)
    ... 19 common frames omitted

How can I get around the "The output must be added as top-level on the route" error and use the onCompletion() / onException() hooks conditionally?

Marian Klühspies

1 Answer


Why not just introduce an intermediary sub-route, so that the onCompletion() hooks can sit at the top of that sub-route?

from("seda:queue-with-multiple-consumers?multipleConsumers=true")
    ...
    .choice().when(consumerCheckIfEligible())
        .to("direct:conditionalProcessing")
        .to("direct:delegate-to-actual-processing-route")
    .end();


from("direct:conditionalProcessing")
    .onCompletion().onCompleteOnly()
        .process(...)
    .end()
    .onCompletion().onFailureOnly()
        .process(...)
    .end()
    .onException(Exception.class)
        .process(...)
    .end();
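
Another option that keeps everything on the consumer route: as far as I know, both onCompletion() and onException() accept an onWhen(Predicate), so the hooks can stay at the top level and still run only for eligible exchanges. This is a different technique from the sub-route above, shown only as a minimal sketch assuming Camel 3.x on the classpath. The class name and the "eligible" header behind consumerCheckIfEligible() are made up for illustration.

```java
import org.apache.camel.Predicate;
import org.apache.camel.builder.RouteBuilder;

// Hypothetical class name; in the question this lives in a shared super class.
public class ConditionalHooksRoute extends RouteBuilder {

    @Override
    public void configure() {
        // Build the predicate once and reuse it for the hooks and the choice.
        Predicate eligible = consumerCheckIfEligible();

        from("seda:queue-with-multiple-consumers?multipleConsumers=true")
            .routeId("id-of-some-consumer-1")

            // Hooks are declared at the top of the route, guarded by onWhen.
            .onException(Exception.class).onWhen(eligible)
                .log("onException(Exception.class)")
            .end()
            .onCompletion().onCompleteOnly().onWhen(eligible)
                .log("onCompletion().onCompleteOnly()")
            .end()
            .onCompletion().onFailureOnly().onWhen(eligible)
                .log("onCompletion().onFailureOnly()")
            .end()

            // The actual routing decision stays where it was.
            .choice()
                .when(eligible)
                    .to("direct:delegate-to-actual-processing-route")
            .end();
    }

    // Assumption: eligibility is carried in a header named "eligible".
    // Replace with the real per-consumer check.
    private Predicate consumerCheckIfEligible() {
        return header("eligible").isEqualTo(true);
    }
}
```

As far as I can tell, the predicate is evaluated when the hook fires rather than when the message arrives, so it should depend on something that is still present at that point (a header or exchange property rather than a body that the processing route may replace).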
TacheDeChoco