
We are trying to make parallel calls to different recipients using scatter-gather, and it works fine. However, the second recipient flow does not start until the first one is complete (traced in Zipkin). Is there a way to make all recipients async, similar to split-aggregate with an executor channel?

public IntegrationFlow flow1() {

        return flow -> flow
                .split().channel(c -> c.executor(Executors.newCachedThreadPool()))
                .scatterGather(
                        scatterer -> scatterer
                                .applySequence(true)
                                .recipientFlow(flow2())
                                .recipientFlow(flow3())
                                .recipientFlow(flow4())
                                .recipientFlow(flow5()),
                        gatherer -> gatherer
                                .outputProcessor(messageGroup -> {
                                    Object request = gatherResponse(messageGroup);
                                    return createResponse(request);
                                }))
                .aggregate();
    }

flow2(), flow3(), and flow4() are methods that return an IntegrationFlow.

Sample code for flow2():

public IntegrationFlow flow2() {
        return integrationFlowDefinition -> integrationFlowDefinition
                .enrichHeaders(
                        h -> h.header(TransportConstants.HEADER_CONTENT_TYPE, MediaType.APPLICATION_XML_VALUE))
                .transform(ele -> createRequest1(ele))                  
                .wireTap("asyncXMLLogging")
                .handle(wsGateway.applyAsHandler(endpoint1))
                .transform(
                        ele -> response2(ele));
    }
kiran reddy

1 Answer


This is indeed possible with the mentioned executor channel. All your recipient flows must start from an ExecutorChannel. In your case, you have to modify all of them to something like this:

public IntegrationFlow flow2() {
    return IntegrationFlows.from(MessageChannels.executor(taskExexecutor()))
            .enrichHeaders(
                    h -> h.header(TransportConstants.HEADER_CONTENT_TYPE, MediaType.APPLICATION_XML_VALUE))
            .transform(ele -> createRequest1(ele))                  
            .wireTap("asyncXMLLogging")
            .handle(wsGateway.applyAsHandler(endpoint1))
            .transform(
                    ele -> response2(ele))
            .get();
}

Pay attention to the IntegrationFlows.from(MessageChannels.executor(taskExexecutor())). That's exactly how you can make each sub-flow async.
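
To see why the starting channel matters: a direct channel invokes its subscriber on the sender's thread, so the router only reaches the next recipient after the previous one returns, while an executor channel hands the message to another thread and returns immediately. The following is a plain-Java sketch of that difference, not Spring Integration code; `ExecutorDispatchSketch`, `DIRECT`, and `dispatch` are hypothetical names used only for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecutorDispatchSketch {

    // Runs each task on the caller's thread, the way a direct channel does.
    static final Executor DIRECT = Runnable::run;

    /**
     * Hands one task per recipient to the executor, in order (as a
     * recipient-list router would), and records which thread handled it.
     */
    static Map<String, String> dispatch(Executor executor, List<String> recipients) {
        Map<String, String> handledBy = new ConcurrentHashMap<>();
        CountDownLatch done = new CountDownLatch(recipients.size());
        for (String recipient : recipients) {
            executor.execute(() -> {
                handledBy.put(recipient, Thread.currentThread().getName());
                done.countDown();
            });
        }
        try {
            // Wait for every recipient so the returned map is complete.
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("recipients did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return handledBy;
    }

    public static void main(String[] args) {
        List<String> recipients = List.of("flow2", "flow3", "flow4", "flow5");
        // Every recipient is handled by the calling thread, one after another.
        System.out.println("direct:   " + dispatch(DIRECT, recipients));
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            // Recipients are handled by pool threads, not by the caller.
            System.out.println("executor: " + dispatch(pool, recipients));
        } finally {
            pool.shutdown();
        }
    }
}
```

With `DIRECT`, every recipient reports the caller's thread name; with the pool, none of them do, which is the behavior the executor channel gives each sub-flow.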

UPDATE

For older Spring Integration versions, which lack the IntegrationFlow improvements for sub-flows, we can do it like this:

public IntegrationFlow flow2() {
    return integrationFlowDefinition -> integrationFlowDefinition
            .channel(c -> c.executor(Executors.newCachedThreadPool()))
            .enrichHeaders(
                    h -> h.header(TransportConstants.HEADER_CONTENT_TYPE, MediaType.APPLICATION_XML_VALUE))
            .transform(ele -> createRequest1(ele))                  
            .wireTap("asyncXMLLogging")
            .handle(wsGateway.applyAsHandler(endpoint1))
            .transform(
                    ele -> response2(ele));
}

This is similar to what you show in your comment.
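
Both variants need a `java.util.concurrent.Executor`. The `taskExexecutor()` call in the first snippet is not defined in this answer, and `Executors.newCachedThreadPool()` places no upper bound on the number of threads. As a minimal sketch of what such a method could return, here is a bounded pool with named threads; the class name, pool sizes, queue capacity, and thread-name prefix are all assumptions to tune for your workload.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class RecipientExecutorSketch {

    // Hypothetical factory method; all sizes below are illustrative assumptions.
    static ExecutorService taskExexecutor() {
        AtomicInteger counter = new AtomicInteger();
        // Named daemon threads make the recipient work easy to spot in traces.
        ThreadFactory namedThreads = runnable -> {
            Thread thread = new Thread(runnable, "recipient-" + counter.incrementAndGet());
            thread.setDaemon(true);
            return thread;
        };
        return new ThreadPoolExecutor(
                4,                      // core threads, e.g. one per recipient flow
                8,                      // maximum threads once the queue is full
                60, TimeUnit.SECONDS,   // idle timeout for threads above the core size
                new ArrayBlockingQueue<>(100),
                namedThreads,
                // When saturated, run the task on the submitting thread instead of rejecting it.
                new ThreadPoolExecutor.CallerRunsPolicy());
    }
}
```

In a Spring configuration class, a method like this would typically be declared as a `@Bean` so that one pool is shared rather than created on every call.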

Artem Bilan
  • While testing `IntegrationFlows.from(MessageChannels.executor(Executors.newCachedThreadPool()))`, I ran into another problem where `from` has a channel name, something like `IntegrationFlows.from("enricherChannel")` – kiran reddy Jul 25 '18 at 20:42
  • My bad: `IntegrationFlows.from("enricherChannel").channel(c -> c.executor(Executors.newCachedThreadPool()))` should work just fine. It gets the message into the flow and makes the rest of the flow async. Got it! – kiran reddy Jul 25 '18 at 20:48
  • Well, you don't need that extra channel. This one is fully enough for you: `IntegrationFlows.from(MessageChannels.executor("enricherChannel", Executors.newCachedThreadPool()))` – Artem Bilan Jul 25 '18 at 20:52
  • Getting an exception on flow2() after adding the `executor` and `from` combination: Caused by: java.lang.UnsupportedOperationException: null. Traced to: `@Override public void configure(IntegrationFlowDefinition<?> flow) { throw new UnsupportedOperationException(); }` – kiran reddy Jul 25 '18 at 21:31
  • This is interesting: I changed `.recipientFlow` to `recipient` and named the channel, and the application starts fine. But `.recipientFlow` still throws the unsupported exception – kiran reddy Jul 25 '18 at 22:04
  • Well, it looks like you don't use the latest Spring Integration. So, yes: in this case it would be better to go with the classical `recipient` based on the channel name from the sub-flow beans. Or you can do it like I show in the UPDATE of my answer. – Artem Bilan Jul 26 '18 at 00:22
  • What does the implementation of `taskExexecutor()` have to look like for this to work ? – Chris Jul 13 '21 at 09:15
  • I found I could replace `taskExexecutor()` with an injected instance of `java.util.concurrent.ExecutorService`. – Chris Jul 13 '21 at 09:32