0

Here in my project I'm using scatter gather pattern where 3 parallel calls are happening. I want to achieve conditional routing i.e., from my gateway the request will come and I need to see that request and by determining the request I need to make sure the flow doesn't go to a particular recipient flow. Actually I've configured my flow to be generic so that gateway accepts a request and trigger the same flow for different requests because for all the requests flow is similar. Now for a request if a particular information is not present inside that request json then I don't want to call a particular recipient flow. Below is the code -

//SpringIntegrationConfiguration

 @Bean
  public IntegrationFlow flow() {
    return flow ->
        flow.handle(validatorService, "validateRequest")
            .split()
            .channel(c -> c.executor(Executors.newCachedThreadPool()))
            .scatterGather(
                scatterer ->
                    scatterer
                        .applySequence(true)
                        .recipientFlow(flow1())
                        .recipientFlow(flow2())
                        .recipientFlow(flow3()),
                gatherer ->
                    gatherer
                        .releaseLockBeforeSend(true)
                        .releaseStrategy(group -> group.size() == 2))
            .aggregate(lionService.someMethod())
            .to(someMethod2());
  }

  //   flow1
  @Bean
  public IntegrationFlow flow1() {
    return integrationFlowDefinition ->
        integrationFlowDefinition
            .channel(c -> c.executor(Executors.newCachedThreadPool()))
            .handle(
                (payload, header) -> {
                  try {
                    return lionService.saveRequest(
                        payload,
                        (String) header.get("dbID"),
                        ((SourceSystem) Objects.requireNonNull(header.get("sourceSystem")))
                            .getSourceSystemCode());
                  } catch (JsonProcessingException e) {
                    throw new RuntimeException(e);
                  }
                })
            .nullChannel();
  }

  //  flow2
  @Bean
  public IntegrationFlow flow2() {
    return integrationFlowDefinition ->
        integrationFlowDefinition
            .channel(c -> c.executor(Executors.newCachedThreadPool()))
            .handle(cdService, "callToaNativeMethod");
  }

  // flow3
  @Bean
  public IntegrationFlow flow3() {
    return integrationFlowDefinition ->
        integrationFlowDefinition
            .channel(c -> c.executor(Executors.newCachedThreadPool()))
            .handle(lionService, "prepareRequest")
            .handle(
                Http.outboundGateway(ServiceURL, restTemplateConfig.restTemplate())
                    .mappedRequestHeaders("Content-Type")
                    .httpMethod(HttpMethod.POST)
                    .expectedResponseType(String.class),
                c -> c.advice(expressionAdvice()));
  }

  @Bean
  public Advice expressionAdvice() {
    ExpressionEvaluatingRequestHandlerAdvice advice =
        new ExpressionEvaluatingRequestHandlerAdvice();
    advice.setSuccessChannelName("success.input");
    advice.setOnSuccessExpressionString("payload + ' was successful'");
    advice.setFailureChannelName("failure.input");
    advice.setOnFailureExpressionString("'Failed'");
    advice.setReturnFailureExpressionResult(true);
    advice.setTrapException(true);
    return advice;
  }

  @Bean
  public IntegrationFlow success() {
    return f -> f.handle(System.out::println);
  }

  @Bean
  public IntegrationFlow failure() {
    return f -> f.handle(System.out::println);
  }
  // flow for someMethod2
  @Bean
  public IntegrationFlow someMethod2() {
    return flow ->
        flow.handle(
            Http.outboundGateway(someServiceUrl)
                .httpMethod(HttpMethod.POST)
                .expectedResponseType(CrResponse.class));
  }
}

//Gateway

@MessagingGateway
public interface GenericGateway {

  @Gateway(requestChannel = "flow.input")
  void processRequest(
      @Payload Message lionRequest,
      @Header("dbID") String dbID,
      @Header("sourceSystem") SourceSystem sourceSystem);
}

The Payload Message lionRequest is going through gateway and invoking the main flow.

Let's imagine LionRequest looks like -

{
     "SCode" : "039",
     "CId":"123456",
     "RequestNumber": "56543457",
     "dbID":"987654345678",
     "someRequestBlock":{
         "message":"Dummy input for dummy service"
     }

}

Now

  1. if "someRequestBlock" is not present inside request body then I want flow2() to be skipped and flow1() and flow3() to be run parallelly.

  2. Same for the CatRequest, the request body will be different and I need to make sure that for CatRequest flow1() to be skipped and flow2() and flow3() to be ran parallelly.

Kindly suggest how do I achieve that?

2 Answers2

0

You could add a .filter() at the beginning of each flow.

Or, instead of .recipientFlow, use this

/**
 * Adds a recipient channel that will be selected if the the expression evaluates to 'true'.
 * @param channelName the channel name.
 * @param expression the expression.
 * @return the router spec.
 */
public RecipientListRouterSpec recipient(String channelName, Expression expression) {
    ExpressionEvaluatingSelector selector = new ExpressionEvaluatingSelector(expression);
    this.handler.addRecipient(channelName, selector);
    this.componentsToRegister.put(selector, null);
    return _this();
}

With the expression being a selector to determine whether or not that recipient will receive the message.

You would then start each sub flow with a named channel.

Gary Russell
  • 166,535
  • 14
  • 146
  • 179
  • See my answer: I'm not sure why `recipientFlow(String expression, IntegrationFlow subFlow)` is missed for you as well... – Artem Bilan Aug 01 '22 at 16:36
  • I see it now; I was only looking at method signatures in the outline; I was expecting to see `(IntegrationFlow, Expression)` or `(IntegrationFlow, String)` for consistency with the channel variants (selector after recipient). – Gary Russell Aug 01 '22 at 17:53
  • Yeah... I chose them by the order from shorter to longer method argument. So, the channel name is definitely shorter than expression. Meanwhile an expression is shorter than `IntegrationFlow` :smile:. Although all of this is ambiguously... – Artem Bilan Aug 01 '22 at 17:57
0

There is this recipient variant for the scatterrer:

/**
 * Adds a subflow that will be invoked if the expression evaluates to 'true'.
 * @param expression the expression.
 * @param subFlow the subflow.
 * @return the router spec.
 */
public RecipientListRouterSpec recipientFlow(String expression, IntegrationFlow subFlow) {

Such a SpEL expression must return boolean and it is performed against the whole message. For your convenience the framework provides a #jsonPath() SpEL-function to evaluate your someRequestBlock attribute against the payload:

https://docs.spring.io/spring-integration/docs/current/reference/html/spel.html#built-in-spel-functions

.recipientFlow("#jsonPath(payload, '$..someRequestBlock.length()') > 1", flow2())

Can you explain us how do you miss that option, so we may improve something in the code or docs?

Artem Bilan
  • 113,505
  • 11
  • 91
  • 118
  • Sorry I missed the documentation somehow. Sorry if I'm asking basic question but what is the #jsonpath here? In my scenario how do I make sure payload is taken into consideration? When I'm keeping #jsonpath it's throwing error. Which path I've to mention here kindly suggest. – Somnath Mukherjee Aug 01 '22 at 20:11
  • See the doc in my answer to determine what is `#jsonPath()` and follow the link from to the Jayway JsonPath to see what is that and how it works. You showed a JSON in your question, so I made an assumption that your payload is something what can be treated as JSON. What error do you get? – Artem Bilan Aug 01 '22 at 20:16
  • Sorry I didn't install the dependency so was getting error. – Somnath Mukherjee Aug 02 '22 at 07:13
  • Hi @Artem, if my condition is to check if a particular field is not present inside JSON then skip that flow, in that case .length()>1 will work or I can put theField != null ? – Somnath Mukherjee Aug 02 '22 at 14:28
  • Well, yeah. You are right. We also have a `JsonPropertyAccessor` for SpEL therefore it is indeed OK to do like this: `.recipientFlow("payload.someRequestBlock != null", flow2())`. But that `JsonPropertyAccessor` has to be registered: https://docs.spring.io/spring-integration/docs/current/reference/html/spel.html#spel-property-accessors. The `#jsonPath()` is present there by default only need to add respective `com.jayway.jsonpath:json-path` dependency. – Artem Bilan Aug 02 '22 at 14:44
  • Hi Artem, really sorry if it's too much to ask. As I said this integration flow is a generic one and being used by 4 requests. Now one field is present in one request body but is not present in other requests. I have to make sure if I'm writing spEl expression the way u showed then it'll fail for other requests as other requests doesn't have that field by default. And even if that field is not present in the other requests bodies I want the flow not to skip flow2() for other requests. In header I have sourceSystem to distinguish between different requests. Can u pls show me how to – Somnath Mukherjee Aug 03 '22 at 12:36
  • write spEL for that which can accommodate two conditions i.e., sourceSystem(info will come from header)==LionRequest && someField != null ,only then run flow2() otherwise skip that flow. – Somnath Mukherjee Aug 03 '22 at 12:38
  • If write that in SpEL is too hard for you, you can move such a prediction logic into some bean method and just call it from that expression: `@myRecipientSelector.test(#root)`. Where the `#root` is the whole `Message`, so you will be able in your method to get access not only to payload to determine that field , but also headers for that system type. – Artem Bilan Aug 03 '22 at 12:45
  • Hi @Artem, how do I access headers field like payload? It seems to be different than how I'm accessing payload fields? – Somnath Mukherjee Aug 11 '22 at 21:46
  • `headers.my_header`. I have explained what is the root of evaluation context and it has those getters, respectively. – Artem Bilan Aug 11 '22 at 22:39
  • Actually I'm trying to do headers.sourceSystem == 'ONE', but it's not working. All the time it's coming as false for some reason and skipping the flow. Though I checked in header it's sourceSystem=ONE. – Somnath Mukherjee Aug 12 '22 at 06:25
  • ,Actually this sourceSystem is an enum. And I sent it to gateway in headers. For all other fields in headers the spEl is working but for sourceSystem is not working. Some string also I sent to the headers there I can use '==' operator and it's working. But only for this header value it's not. Is it because the type in enum? But I think whatever I sent to the header it's delegate as String only right? Then ideally it should work. Kindly suggest. – Somnath Mukherjee Aug 12 '22 at 07:08
  • even I tried doing--> SourceSystem sourceSystemProv = SourceSystem.ONE; //declared the variable .recipientFlow("headers.sourceSystem == #sourceSystemProv", flow2()) //used the variable here That also didn't work. – Somnath Mukherjee Aug 12 '22 at 07:32
  • It is not clear what made you think that SpEL deals only with strings and it was not clear that your header is an enum. The SpEL is just a slight extension to Java and does not do any conversions to objects it is evaluated against . See here how to compare with an enum: https://stackoverflow.com/questions/21806298/compare-enums-in-spel. Although I’d suggest to use `name()` from that header enum value to compare against string. – Artem Bilan Aug 12 '22 at 14:14
  • Okay Artem. Is there a way to write condition inside any flow? We are using condition in recipientFlow() here but if u see my code I want to write condition likewise in to() as well. Is it possible? – Somnath Mukherjee Aug 12 '22 at 17:44
  • No, a condition is prerogative of specific components, like this recipient list or `filter()`. See more in docs: https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#filter – Artem Bilan Aug 12 '22 at 21:27
  • So is there a different way to make to() conditional? – Somnath Mukherjee Aug 12 '22 at 21:33
  • Why don’t add that `filter()` before `to()` or as the first operator in that flow? – Artem Bilan Aug 13 '22 at 00:35
  • If I do that then - if the condition doesn't match then that flow is not being executed but for some reason the program is getting stuck there. It's not able to come out of the gateway. – Somnath Mukherjee Aug 13 '22 at 07:14
  • See `discardChannel` on the filter and `bridge()` to reply back to the gateway. – Artem Bilan Aug 13 '22 at 12:17