I'm trying to have a WebFlux Boot app make calls to a separate Spring Cloud Stream (SCS) Boot app. The most applicable previous question I found was this one. Is there a way to create a request/response MessagingGateway that doesn't use the deprecated annotations (@Input, @Output, @EnableBinding)?

I want to use the newer functional style in both apps if possible. So far this Maven project is my best (and only) working result. I've tried all kinds of techniques, including making the gateway return WebFlux message types directly. If needed, I can dig those up, but I figured it would be easier to push what I have that works rather than clutter this question with my graveyard.

Thanks, Glenn

1 Answer

Here is a sample of how to configure a request-reply gateway with Spring Cloud Stream:

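import java.util.function.Consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.dsl.HeaderEnricherSpec;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.support.channel.HeaderChannelRegistry;
import org.springframework.messaging.Message;

@SpringBootApplication
public class SpringCloudStreamRequestReplyApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringCloudStreamRequestReplyApplication.class, args);
    }

    // Request side: the gateway call becomes a message sent to the "requests" binding.
    @Bean
    IntegrationFlow requestFlow(StreamBridge streamBridge) {
        return IntegrationFlows.from(UpperCaseGateway.class)
                // Replace the in-memory reply channel header with a string id that can be sent over the binder.
                .enrichHeaders(HeaderEnricherSpec::headerChannelsToString)
                .handle(m -> streamBridge.send("requests", m))
                .get();
    }

    // Reply side: a Consumer bean named "replies" that receives messages from the binder.
    @Bean
    IntegrationFlow repliesFlow(HeaderChannelRegistry channelRegistry) {
        return IntegrationFlows.from(MessageConsumer.class, gateway -> gateway.beanName("replies"))
                // Throw if the reply channel id from the header is no longer known to the registry.
                .filter(Message.class,
                        m -> channelRegistry.channelNameToChannel(m.getHeaders().getReplyChannel().toString()) != null,
                        filterEndpointSpec -> filterEndpointSpec.throwExceptionOnRejection(true))
                .get();
    }

    // Request-reply gateway called by application code.
    public interface UpperCaseGateway {

        String toUpperCase(String payload);

    }

    // One-way consumer that Spring Cloud Stream binds as a function.
    public interface MessageConsumer extends Consumer<Message<String>> {

    }

}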

In this case, calling UpperCaseGateway.toUpperCase() sends a request through the Spring Cloud Stream binder to the requests destination.

The replies gateway, declared for the one-way MessageConsumer interface, acts as a Spring Cloud Stream functional binding configured with this property:

spring.cloud.stream.bindings.replies-in-0.destination=replies

At this point we correlate replies with their requests and rely on the built-in reply mechanism of Spring Integration.
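To make the correlation step more concrete, here is a minimal plain-Java sketch of the idea behind headerChannelsToString and the registry lookup in the filter. It does not use Spring at all: ReplyCorrelationSketch, register and onReply are hypothetical names invented for this illustration, and a CompletableFuture stands in for the gateway's reply channel. The real HeaderChannelRegistry also deals with expiring entries, which this sketch ignores.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for the header channel registry idea.
public class ReplyCorrelationSketch {

    // Maps a string id (safe to send in a message header) to the caller waiting for a reply.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Similar in spirit to headerChannelsToString(): keep the reply target locally, hand out a string id.
    public String register(CompletableFuture<String> replyTarget) {
        String id = UUID.randomUUID().toString();
        pending.put(id, replyTarget);
        return id;
    }

    // Similar in spirit to the filter plus reply: reject unknown ids, otherwise complete the waiting caller.
    public void onReply(String replyChannelHeader, String payload) {
        CompletableFuture<String> target = pending.remove(replyChannelHeader);
        if (target == null) {
            throw new IllegalStateException("No pending request for reply channel " + replyChannelHeader);
        }
        target.complete(payload);
    }

    public static void main(String[] args) {
        ReplyCorrelationSketch registry = new ReplyCorrelationSketch();
        CompletableFuture<String> reply = new CompletableFuture<>();
        String header = registry.register(reply);

        // The remote side is expected to echo the header back together with its reply payload.
        registry.onReply(header, "HELLO");
        System.out.println(reply.join()); // prints "HELLO"
    }
}
```

The point of the sketch is only that the string id is the one piece of state that has to survive the round trip; everything else stays on the calling side.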

The other side (I call it the "server") has to work with a Message so that it carries the required correlation headers. It must receive requests from the requests destination and send replies to the replies destination.
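As a rough illustration of what the server has to do, here is a plain-Java sketch that transforms the payload and copies the request headers onto the reply. It is not the actual server code: Msg is a hypothetical stand-in for org.springframework.messaging.Message<String>, UpperCaseServerSketch and UPPERCASE are names made up for this example, and the header value is a placeholder.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class UpperCaseServerSketch {

    // Hypothetical stand-in for a Spring Message<String>: a payload plus immutable headers.
    public static final class Msg {
        public final String payload;
        public final Map<String, Object> headers;

        public Msg(String payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = Collections.unmodifiableMap(new HashMap<>(headers));
        }
    }

    // Upper-case the payload and carry every request header (including replyChannel) over to the reply.
    public static final Function<Msg, Msg> UPPERCASE =
            request -> new Msg(request.payload.toUpperCase(), request.headers);

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("replyChannel", "some-registry-id"); // placeholder id, assumed for illustration
        Msg reply = UPPERCASE.apply(new Msg("hello", headers));
        System.out.println(reply.payload + " " + reply.headers.get("replyChannel"));
        // prints "HELLO some-registry-id"
    }
}
```

In a real Spring Cloud Stream application this would presumably be a Function<Message<String>, Message<String>> bean, with its input binding pointed at the requests destination and its output binding at the replies destination, following the same naming pattern as the replies-in-0 property above.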

Artem Bilan
  • I'm just seeing this now. Thanks. I'll give this a whirl and report back. – Glenn Thompson Aug 25 '22 at 14:45
  • So far I'm unable to get the example material above working within the application I pushed to github. I'm still trying and I will post and update the project if I get it to work. thanks – Glenn Thompson Aug 26 '22 at 22:20
  • I'm going to accept Artem's response. I had a couple of things obfuscating what I did wrong. I'll push a cleaned-up version of my sample app tomorrow if anyone is following along. Thanks Artem. – Glenn Thompson Aug 30 '22 at 23:36
  • I think I’ll push my local sample to official Spring Cloud Stream sample repo. – Artem Bilan Aug 31 '22 at 02:09
  • Sounds good. I updated and pushed to my repo. Feel free to use or critique it. Thanks Again – Glenn Thompson Aug 31 '22 at 14:26