In my project I'm using the scatter-gather pattern, with three recipient flows called in parallel. I want to add conditional routing: when a request comes in through my gateway, I need to inspect it and, depending on its content, make sure it is not sent to a particular recipient flow. The main flow is deliberately generic: the gateway accepts different request types and triggers the same flow for all of them, because the processing is similar. If a particular piece of information is missing from the request JSON, I don't want to call a particular recipient flow. Here is the code:
// SpringIntegrationConfiguration
@Bean
public IntegrationFlow flow() {
  return flow ->
      flow.handle(validatorService, "validateRequest")
          .split()
          .channel(c -> c.executor(Executors.newCachedThreadPool()))
          .scatterGather(
              scatterer ->
                  scatterer
                      .applySequence(true)
                      .recipientFlow(flow1())
                      .recipientFlow(flow2())
                      .recipientFlow(flow3()),
              gatherer ->
                  gatherer
                      .releaseLockBeforeSend(true)
                      .releaseStrategy(group -> group.size() == 2))
          .aggregate(lionService.someMethod())
          .to(someMethod2());
}
// flow1
@Bean
public IntegrationFlow flow1() {
  return integrationFlowDefinition ->
      integrationFlowDefinition
          .channel(c -> c.executor(Executors.newCachedThreadPool()))
          .handle(
              (payload, header) -> {
                try {
                  return lionService.saveRequest(
                      payload,
                      (String) header.get("dbID"),
                      ((SourceSystem) Objects.requireNonNull(header.get("sourceSystem")))
                          .getSourceSystemCode());
                } catch (JsonProcessingException e) {
                  throw new RuntimeException(e);
                }
              })
          .nullChannel();
}
// flow2
@Bean
public IntegrationFlow flow2() {
  return integrationFlowDefinition ->
      integrationFlowDefinition
          .channel(c -> c.executor(Executors.newCachedThreadPool()))
          .handle(cdService, "callToaNativeMethod");
}
// flow3
@Bean
public IntegrationFlow flow3() {
  return integrationFlowDefinition ->
      integrationFlowDefinition
          .channel(c -> c.executor(Executors.newCachedThreadPool()))
          .handle(lionService, "prepareRequest")
          .handle(
              Http.outboundGateway(ServiceURL, restTemplateConfig.restTemplate())
                  .mappedRequestHeaders("Content-Type")
                  .httpMethod(HttpMethod.POST)
                  .expectedResponseType(String.class),
              c -> c.advice(expressionAdvice()));
}
@Bean
public Advice expressionAdvice() {
  ExpressionEvaluatingRequestHandlerAdvice advice =
      new ExpressionEvaluatingRequestHandlerAdvice();
  advice.setSuccessChannelName("success.input");
  advice.setOnSuccessExpressionString("payload + ' was successful'");
  advice.setFailureChannelName("failure.input");
  advice.setOnFailureExpressionString("'Failed'");
  advice.setReturnFailureExpressionResult(true);
  advice.setTrapException(true);
  return advice;
}
@Bean
public IntegrationFlow success() {
  return f -> f.handle(System.out::println);
}
@Bean
public IntegrationFlow failure() {
  return f -> f.handle(System.out::println);
}
// flow for someMethod2
@Bean
public IntegrationFlow someMethod2() {
  return flow ->
      flow.handle(
          Http.outboundGateway(someServiceUrl)
              .httpMethod(HttpMethod.POST)
              .expectedResponseType(CrResponse.class));
}
}
// Gateway
@MessagingGateway
public interface GenericGateway {
  @Gateway(requestChannel = "flow.input")
  void processRequest(
      @Payload Message lionRequest,
      @Header("dbID") String dbID,
      @Header("sourceSystem") SourceSystem sourceSystem);
}
The payload (Message lionRequest) goes through the gateway and invokes the main flow.
Suppose a LionRequest looks like this:
{
  "SCode": "039",
  "CId": "123456",
  "RequestNumber": "56543457",
  "dbID": "987654345678",
  "someRequestBlock": {
    "message": "Dummy input for dummy service"
  }
}
Now, if "someRequestBlock" is not present in the request body, I want flow2() to be skipped and flow1() and flow3() to run in parallel. The same applies to CatRequest: its request body is different, and for a CatRequest I need flow1() to be skipped and flow2() and flow3() to run in parallel.
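To make the rule concrete, here is a minimal plain-Java sketch of the selection I am after, written outside Spring Integration so it can be run on its own. The names RecipientSelection, RequestType, shouldRunFlow1, shouldRunFlow2 and selectFlows are hypothetical and do not exist in my project. How a CatRequest is actually recognised, and that flow2() should always run for a CatRequest, are assumptions on my part.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Plain-Java sketch of the recipient selection rule; not Spring Integration code. */
public class RecipientSelection {

    /** Hypothetical request kinds; how the kind is detected is an assumption. */
    public enum RequestType { LION, CAT }

    /** flow1 is skipped only for a CatRequest. */
    public static boolean shouldRunFlow1(RequestType type) {
        return type != RequestType.CAT;
    }

    /**
     * flow2 is skipped when a LionRequest body has no "someRequestBlock".
     * Assumption: for a CatRequest, flow2 always runs.
     */
    public static boolean shouldRunFlow2(RequestType type, Map<String, ?> body) {
        return type == RequestType.CAT || body.containsKey("someRequestBlock");
    }

    /** Returns the names of the recipient flows that should receive the request. */
    public static List<String> selectFlows(RequestType type, Map<String, ?> body) {
        List<String> flows = new ArrayList<>();
        if (shouldRunFlow1(type)) {
            flows.add("flow1");
        }
        if (shouldRunFlow2(type, body)) {
            flows.add("flow2");
        }
        flows.add("flow3"); // flow3 runs for every request in both cases described
        return flows;
    }

    public static void main(String[] args) {
        Map<String, Object> withBlock =
                Map.of("SCode", "039", "someRequestBlock", Map.of("message", "Dummy input"));
        Map<String, Object> withoutBlock = Map.of("SCode", "039");

        System.out.println(selectFlows(RequestType.LION, withBlock));
        System.out.println(selectFlows(RequestType.LION, withoutBlock));
        System.out.println(selectFlows(RequestType.CAT, withoutBlock));
    }
}
```

I assume predicates like these could be attached to the individual recipients of the scatterer, but I have not verified that. One thing I am unsure about: flow1() ends in nullChannel(), so when flow2() is skipped only flow3() would send a reply, and the release strategy group.size() == 2 would presumably never be satisfied.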
How can I achieve this?