I'm trying to build a recipe for asynchronous orchestration using Spring Integration gateways (both inbound and outbound). After seeing an example here, I tried using scatter-gather like this:

@Configuration
public class IntegrationComponents {

    @Value("${rest.endpoint.base}")
    private String endpointBase;

    @Bean
    public HttpRequestHandlingMessagingGateway inboundGateway() {
        return Http.inboundGateway("/test-inbound-gateway-resource")
            .requestMapping(mapping -> mapping.methods(HttpMethod.POST))
            .requestTimeout(3000)
            .replyTimeout(3000)
            .get();
    }

    @Bean
    public HttpRequestExecutingMessageHandler outboundGateway1() {
        return Http.outboundGateway(endpointBase + "/test-resource-1")
            .httpMethod(HttpMethod.POST)
            .expectedResponseType(String.class)
            .get();
    }

    @Bean
    public HttpRequestExecutingMessageHandler outboundGateway2() {
        return Http.outboundGateway(endpointBase + "/test-resource-2")
            .httpMethod(HttpMethod.POST)
            .expectedResponseType(String.class)
            .get();
    }

    @Bean
    public StandardIntegrationFlow integrationFlow() {
        ExecutorService executor = Executors.newCachedThreadPool();

        IntegrationFlow flow1 = IntegrationFlows.from(MessageChannels.executor(executor))
            .handle(outboundGateway1())
            .get();

        IntegrationFlow flow2 = IntegrationFlows.from(MessageChannels.executor(executor))
            .handle(outboundGateway2())
            .get();

        return IntegrationFlows
            .from(inboundGateway())
            .transform(String.class, String::toUpperCase)
            .channel(MessageChannels.executor(executor))
            .scatterGather(
                    scatterer -> scatterer
                        .applySequence(true)
                        .recipientFlow(flow1)
                        .recipientFlow(flow2),
                    gatherer -> gatherer
                        .outputProcessor(messageGroup -> {
                            List<Message<?>> list = new ArrayList<>(messageGroup.getMessages());

                            String payload1 = (String) list.get(0).getPayload();
                            String payload2 = (String) list.get(1).getPayload();

                            return MessageBuilder.withPayload(payload1 + "+" + payload2).build();
                        }))
            .get();
    }
}

This executes, but my payloads are swapped, because in this case outboundGateway1 takes longer to execute than outboundGateway2. Payload 2 comes first, then payload 1.

Is there a way to tell scatter-gather to define/maintain order when sending to the output processor?

On a similar note, maybe split/aggregate and/or using a router is a better pattern here? But if so, what would that look like?

I tried the following split/route/aggregate, but it failed saying "The 'currentComponent' (org.springframework.integration.router.RecipientListRouter@b016b4e) is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'. This is the end of the integration flow.":

@Configuration
public class IntegrationComponents {

    @Value("${rest.endpoint.base}")
    private String endpointBase;

    @Bean
    public HttpRequestHandlingMessagingGateway inboundGateway() {
        return Http.inboundGateway("/test-inbound-gateway-resource")
            .requestMapping(mapping -> mapping.methods(HttpMethod.POST))
            .requestTimeout(3000)
            .replyTimeout(3000)
            .get();
    }

    @Bean
    public HttpRequestExecutingMessageHandler outboundGateway1() {
        return Http.outboundGateway(endpointBase + "/test-resource-1")
            .httpMethod(HttpMethod.POST)
            .expectedResponseType(String.class)
            .get();
    }

    @Bean
    public HttpRequestExecutingMessageHandler outboundGateway2() {
        return Http.outboundGateway(endpointBase + "/test-resource-2")
            .httpMethod(HttpMethod.POST)
            .expectedResponseType(String.class)
            .get();
    }

    @Bean
    public StandardIntegrationFlow integrationFlow() {
        ExecutorService executor = Executors.newCachedThreadPool();

        IntegrationFlow flow1 = IntegrationFlows.from(MessageChannels.executor(executor))
            .handle(outboundGateway1())
            .get();

        IntegrationFlow flow2 = IntegrationFlows.from(MessageChannels.executor(executor))
            .handle(outboundGateway2())
            .get();

        return IntegrationFlows
            .from(inboundGateway())
            .transform(String.class, String::toUpperCase)
            .split()
            .channel(MessageChannels.executor(executor))
            .routeToRecipients(r -> r
                .recipientFlow(flow1)
                .recipientFlow(flow2))
            .aggregate()
            .get();
    }
}
sparty02

1 Answer

Can you not simply Collections.sort() the list in the output processor, using a comparator on the sequence number? Each message will have an IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER header, since you set applySequence(true).
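
A minimal sketch of that sort, using only the JDK so it runs on its own. The `Reply` class is a hypothetical stand-in for Spring's `Message`, and the header key `"sequenceNumber"` is assumed to match the value of `IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER`. In the real output processor the same comparison would presumably read the header from each `Message`, for example through `new IntegrationMessageHeaderAccessor(message).getSequenceNumber()`.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class SequenceSortSketch {

    // Assumed header key; in Spring Integration this would be
    // IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER.
    public static final String SEQUENCE_NUMBER = "sequenceNumber";

    // Hypothetical stand-in for a Message: a payload plus a header map.
    public static final class Reply {
        final String payload;
        final Map<String, Object> headers;

        public Reply(String payload, int sequenceNumber) {
            this.payload = payload;
            this.headers = Map.of(SEQUENCE_NUMBER, sequenceNumber);
        }
    }

    // Mirrors the output processor: copy the group, sort it by sequence
    // number, then join the payloads in that order.
    public static String combineInSequenceOrder(List<Reply> replies) {
        List<Reply> sorted = new ArrayList<>(replies);
        sorted.sort(Comparator.comparingInt(
                (Reply r) -> (Integer) r.headers.get(SEQUENCE_NUMBER)));
        return sorted.stream()
                .map(r -> r.payload)
                .collect(Collectors.joining("+"));
    }

    public static void main(String[] args) {
        // Replies arrive in completion order: the faster call (2) first.
        List<Reply> arrivalOrder = List.of(
                new Reply("payload2", 2),
                new Reply("payload1", 1));
        System.out.println(combineInSequenceOrder(arrivalOrder));
    }
}
```

Sorting a copy rather than the group's own collection keeps the sketch free of side effects. The sequence numbers follow the order in which the recipients were declared, so the first recipient's reply ends up first regardless of which call finished sooner.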

Gary Russell
  • You use a `MessageChannels.executor(executor)` in each `.recipientFlow(flow1)`. That's why the faster call returns to the aggregator earlier. I would say that your architecture is good. The only thing you are missing is a sort in the `.outputProcessor(messageGroup -> )`, which is called only once all replies have arrived at the aggregator and triggered the `releaseStrategy`. So please follow Gary's advice and sort them however you need. We can't do that by default because not every use case needs a sorted result. – Artem Bilan Oct 01 '18 at 15:51
  • I knew there was a simple answer! This worked great. – sparty02 Oct 01 '18 at 19:21