I'm using the Spring Integration Java DSL to handle communication between JMS and a REST service. The requirement is that messages should be redelivered indefinitely. In one case, I have to execute two operations sequentially. If the first one fails, the second one shouldn't be executed; and if the failure is a 4xx error, the message shouldn't be redelivered either. My code looks like this:
IntegrationFlows.from(Jms.messageDrivenChannelAdapter(Jms.container(connectionFactory, destination)).get())
    .publishSubscribeChannel(c -> c
        .subscribe(firstRestOperation ->
            firstRestOperation
                .transform(originalMessageToFirstRequestTransformer())
                // When this handler receives HTTP status 4xx, the second operation
                // shouldn't be executed and the message shouldn't be redelivered.
                .handle(Http.outboundGateway(restApiBaseUri + "/first-endpoint", restTemplate)
                    .httpMethod(HttpMethod.POST).get()))
        .subscribe(secondRestOperation ->
            secondRestOperation
                .transform(originalMessageToSecondRequestTransformer())
                .handle(Http.outboundGateway(restApiBaseUri + "/second-endpoint", restTemplate)
                    .httpMethod(HttpMethod.POST).get())))
    .get();
class MyErrorHandler extends DefaultResponseErrorHandler { // this is used in Option B
    @Override
    public void handleError(ClientHttpResponse response) throws IOException {
        if (response.getStatusCode().is4xxClientError()) {
            log.warn(...);
        } else {
            super.handleError(response);
        }
    }
}

@Bean
public RestTemplate restTemplate() {
    RestTemplate restTemplate = new RestTemplate();
    restTemplate.setErrorHandler(myErrorHandler); // this is used in Option B
    return restTemplate;
}
How can I meet these requirements? The only idea I have is to somehow interrupt the IntegrationFlow while still committing the JMS session.
Thanks for any suggestions.
UPDATE
Option A: Currently:
- operation 1 fails with any error
- operation 2 is not executed
- message is redelivered indefinitely
Option B: I can also handle the 4xx error, then:
- operation 1 fails with 4xx, exception is handled
- operation 2 is executed
- integration flow finishes OK, JMS session is committed and the message is not redelivered
but this causes operation 2 to be executed.
What I need is:
- operation 1 fails with 4xx
- operation 2 is not executed
- message is not redelivered
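To make the target behaviour concrete, here is a minimal plain-Java sketch of the decision I'm after. It uses no Spring types; the class name, the enum and its constants are hypothetical names made up for illustration, and it assumes any status below 400 counts as success.

```java
/** Plain-Java sketch of the required policy; not Spring Integration code. */
final class DeliveryPolicySketch {

    /** Hypothetical outcomes after the first REST call has returned. */
    enum Outcome {
        RUN_SECOND_OPERATION,   // first call succeeded, continue with operation 2
        COMMIT_AND_STOP,        // 4xx: skip operation 2, commit, no redelivery
        ROLLBACK_AND_REDELIVER  // any other failure: skip operation 2, redeliver
    }

    /** Assumption: statuses below 400 are treated as success. */
    static Outcome afterFirstOperation(int httpStatus) {
        if (httpStatus < 400) {
            return Outcome.RUN_SECOND_OPERATION;
        }
        if (httpStatus < 500) {
            return Outcome.COMMIT_AND_STOP;
        }
        return Outcome.ROLLBACK_AND_REDELIVER;
    }
}
```

Option A collapses the last two outcomes into a rollback, and Option B collapses the first two into "continue"; what I need keeps all three apart.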
UPDATE 2
I think I might be getting somewhere. As @gary-russel suggested, I added an error channel and handled 4xx errors:
@Bean
public MessageProducerSupport inputUpsertCustomerMessageProducerSupport() {
    return Jms.messageDrivenChannelAdapter(
            Jms.container(connectionFactory, destination).messageSelector(jmsSelector))
        .errorChannel(errorHandlingChannel())
        .get();
}

@Bean
public PublishSubscribeChannel errorHandlingChannel() {
    return MessageChannels.publishSubscribe().get();
}

@Bean
public ErrorMessageExceptionTypeRouter errorMessageExceptionTypeRouter() {
    ErrorMessageExceptionTypeRouter router = new ErrorMessageExceptionTypeRouter();
    // 4xx responses surface as HttpClientErrorException
    router.setChannelMapping(HttpClientErrorException.class.getName(), "clientErrorMessageChannel");
    router.setDefaultOutputChannel(unhandledErrorsChannel());
    return router;
}

@Bean
public IntegrationFlow errorHandlingFlow() {
    return IntegrationFlows.from(errorHandlingChannel())
        .log()
        .route(errorMessageExceptionTypeRouter())
        .get();
}

@Bean
public MessageChannel clientErrorMessageChannel() {
    return MessageChannels.direct().get();
}

@Bean
public IntegrationFlow clientErrorFlow() {
    return IntegrationFlows.from(clientErrorMessageChannel())
        .handle(message -> log.warn(...)) // handle error here
        .get();
}

@Bean
public MessageChannel unhandledErrorsChannel() {
    return MessageChannels.direct().get();
}

@Bean
public IntegrationFlow unhandledErrorsFlow() {
    // how should I implement it?
}
I want to handle ONLY 4xx errors; the rest should be propagated and should cause JMS message redelivery. I tried not setting defaultOutputChannel in ErrorMessageExceptionTypeRouter (then another exception is thrown) and setting defaultOutputChannel to the default errorChannel (then all of the errors are handled).
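The routing I'm trying to get can be modelled in plain Java roughly as below. This is only a simplified sketch of "route by exception type, otherwise fall back to a default", not the library's implementation; `ExceptionTypeRoutingSketch` and `ClientError` are hypothetical stand-ins (the latter for HttpClientErrorException), and the channel names are plain strings.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Simplified model of type-based error routing; not Spring Integration code. */
final class ExceptionTypeRoutingSketch {

    /** Hypothetical stand-in for an HTTP 4xx exception. */
    static final class ClientError extends RuntimeException {
        ClientError(String message) {
            super(message);
        }
    }

    private final Map<Class<? extends Throwable>, String> mappings = new LinkedHashMap<>();
    private final String defaultChannel;

    ExceptionTypeRoutingSketch(String defaultChannel) {
        this.defaultChannel = defaultChannel;
    }

    void map(Class<? extends Throwable> type, String channelName) {
        mappings.put(type, channelName);
    }

    /** Walks the cause chain; the deepest mapped cause wins, else the default channel. */
    String resolve(Throwable error) {
        String result = defaultChannel;
        for (Throwable t = error; t != null; t = t.getCause()) {
            for (Map.Entry<Class<? extends Throwable>, String> entry : mappings.entrySet()) {
                if (entry.getKey().isInstance(t)) {
                    result = entry.getValue();
                }
            }
        }
        return result;
    }
}
```

The cause-chain walk matters here because the HTTP exception arrives wrapped in another exception, so matching only on the outermost type would never hit the 4xx mapping.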
UPDATE 3
Found a solution in: Spring Integration Java DSL using JMS retry/redlivery
Here's the code for my error handling flow:
@Bean
public MessageProducerSupport inputUpsertCustomerMessageProducerSupport() {
    return Jms.messageDrivenChannelAdapter(
            Jms.container(connectionFactory, destination).messageSelector(jmsSelector))
        .errorChannel(customErrorChannel())
        .get();
}

@Bean
public PublishSubscribeChannel customErrorChannel() {
    return MessageChannels.publishSubscribe().get();
}

@Bean
public ErrorMessageExceptionTypeRouter errorMessageExceptionTypeRouter() {
    ErrorMessageExceptionTypeRouter router = new ErrorMessageExceptionTypeRouter();
    // 4xx errors go to their own channel; everything else goes to the default
    router.setChannelMapping(HttpClientErrorException.class.getName(), "clientErrorMessageChannel");
    router.setDefaultOutputChannel(unhandledErrorsChannel());
    return router;
}

@Bean
public MessageChannel clientErrorMessageChannel() {
    return MessageChannels.direct().get();
}

@Bean
public MessageChannel unhandledErrorsChannel() {
    return MessageChannels.direct().get();
}

@Bean
public IntegrationFlow unhandledErrorsFlow() {
    return IntegrationFlows.from(unhandledErrorsChannel())
        .handle("thisBeanName", "handleError")
        .get();
}

// Rethrows the error so that it propagates and the JMS message is redelivered
public void handleError(Throwable t) throws Throwable {
    log.warn("Received unhandled exception");
    throw t;
}

@Bean
public IntegrationFlow clientErrorFlow() {
    return IntegrationFlows.from(clientErrorMessageChannel())
        // Only logs, so the 4xx error is consumed and the message is not redelivered
        .handle(message -> log.warn("Received HTTP Status 4xx with message: "
            + ((MessageHandlingException) message.getPayload()).getCause().getMessage()))
        .get();
}

@Bean
public IntegrationFlow errorHandlingFlow() {
    return IntegrationFlows.from(customErrorChannel())
        .log()
        .route(errorMessageExceptionTypeRouter())
        .get();
}
So the solution was to redirect the unhandled exceptions to a flow that handles them by rethrowing them. Too bad BaseIntegrationFlow doesn't have a method that accepts and throws Throwable; right now it's only possible by specifying the bean and method name to invoke.
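For anyone wondering why rethrowing makes the difference, here is a small plain-Java model of how I understand it: if the error flow returns normally, nothing reaches the listener container and the session commits; if it rethrows, the session rolls back and the broker redelivers. Nothing below is Spring or JMS API. `RedeliverySketch`, `ErrorHandler` and `deliver` are hypothetical names, IllegalArgumentException stands in for a 4xx error, IllegalStateException for any other failure, and the attempt cap exists only so the demo terminates.

```java
/** Toy model of commit vs. rollback driven by an error handler; not JMS code. */
final class RedeliverySketch {

    /** Hypothetical error handler: return normally to consume, throw to propagate. */
    interface ErrorHandler {
        void handle(RuntimeException error);
    }

    /**
     * Runs the flow until it "commits" or maxAttempts is reached.
     * Returns the number of delivery attempts that were made.
     */
    static int deliver(Runnable flow, ErrorHandler errorHandler, int maxAttempts) {
        int attempts = 0;
        while (attempts < maxAttempts) {
            attempts++;
            try {
                try {
                    flow.run();
                } catch (RuntimeException e) {
                    // Error channel: returning normally means the error was consumed
                    errorHandler.handle(e);
                }
                return attempts; // "commit": nothing propagated to the container
            } catch (RuntimeException rethrown) {
                // "rollback": loop again, simulating redelivery by the broker
            }
        }
        return attempts;
    }
}
```

With a handler such as `e -> { if (!(e instanceof IllegalArgumentException)) throw e; }`, a flow that always fails with the 4xx stand-in is attempted once, while a flow that fails with anything else keeps being attempted until the demo cap is hit.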