I have a use case similar to the one mentioned here: Spring Cloud Stream test-binder OutputDestination does not consume events

I need to validate values in messages produced by the application. This was possible in older versions of Spring Cloud Stream Kafka Binder, using MessageCollector.

Following the comments on the Stack Overflow post mentioned above, I tried subscribing to the SubscribableChannel instances to store the published/consumed messages in a queue for validation. But this seems to work only for input channels (it captures messages consumed by the application) and not for output channels (messages produced by the application).

Here's the code from my test app:

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.0.4</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>airplane</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>airplane</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>17</java.version>
        <spring-cloud.version>2022.0.1</spring-cloud.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.26</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-binder</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.spockframework</groupId>
            <artifactId>spock-core</artifactId>
            <version>2.4-M1-groovy-4.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.spockframework</groupId>
            <artifactId>spock-spring</artifactId>
            <version>2.4-M1-groovy-4.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.groovy</groupId>
            <artifactId>groovy-json</artifactId>
            <version>4.0.6</version>
            <scope>compile</scope>
        </dependency>


        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>

            <plugin>
                <groupId>org.codehaus.gmavenplus</groupId>
                <artifactId>gmavenplus-plugin</artifactId>
                <version>2.0.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>compileTests</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

AirplaneApplication.java

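// Imports were omitted in the original listing; MessageBuilder is assumed to be the spring-messaging one.
import java.util.function.Consumer;
import java.util.function.Function;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

@SpringBootApplication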
public class AirplaneApplication {

    public static void main(String[] args) {
        SpringApplication.run(AirplaneApplication.class, args);
    }

    Logger logger = LoggerFactory.getLogger(AirplaneApplication.class);

    @Bean("ArrivalEvent")
    public Consumer<Message<ArrivalEvent>> arrivalEventConsumer() {
        return msg -> {
            logger.info("ArrivalEvent consumed: {}", msg.getPayload());
        };
    }

    @Bean("LandEvent")
    public Function<Message<LandEvent>, Message<ArrivalEvent>> landEventProcessor() {
        return landEventMessage -> {
            ArrivalEvent arrivalEvent = new ArrivalEvent(landEventMessage.getPayload().getFlightId()+"-flight", landEventMessage.getPayload().getCurrentAirport());
            logger.info("Publishing ArrivalEvent: {}", arrivalEvent);
            return MessageBuilder.withPayload(arrivalEvent)
                    .setHeader("Type", ArrivalEvent.class.getSimpleName())
                    .setHeader("spring.cloud.stream.sendto.destination", "landEventProcessor-out-0")
                    .build();
        };
    }

    @Bean("FlightEvent")
    public Function<Message<FlightEvent>, Message<LandEvent>> flightEventProcessor() {
        return flightEventMessage -> {
            LandEvent landEvent = new LandEvent(flightEventMessage.getPayload().getFlightId()+"-flight", flightEventMessage.getPayload().getCurrentAirport());
            logger.info("Publishing LandEvent: {}", landEvent);
            return MessageBuilder.withPayload(landEvent)
                    .setHeader("Type", LandEvent.class.getSimpleName())
                    .setHeader("spring.cloud.stream.sendto.destination", "flightEventProcessor-out-0")
                    .build();
        };
    }

    @Bean("PlaneEvent")
    public Function<Message<PlaneEvent>, Message<FlightEvent>> planeEventProcessor() {
        return planeEventMessage -> {
            FlightEvent flightEvent = new FlightEvent(planeEventMessage.getPayload().getPlaneId()+"-flight", planeEventMessage.getPayload().getCurrentAirport());
            logger.info("Publishing Flight: {}", flightEvent);
            return MessageBuilder.withPayload(flightEvent)
                    .setHeader("Type", FlightEvent.class.getSimpleName())
                    .setHeader("spring.cloud.stream.sendto.destination", "planeEventProcessor-out-0")
                    .build();
        };
    }

}

application.properties

spring.application.name=airplane
server.port=8095

spring.cloud.stream.bindings.planeEventProducer-out-0.destination=plane-events
spring.cloud.stream.bindings.planeEventProcessor-out-0.destination=flight-events
spring.cloud.stream.bindings.flightEventProcessor-out-0.destination=land-events
spring.cloud.stream.bindings.landEventProcessor-out-0.destination=arrival-events

spring.cloud.stream.output-bindings=replayProducer;planeEventProducer;planeEventProcessor;flightEventProcessor;landEventProcessor

spring.cloud.stream.bindings.functionRouter-in-0.destination=plane-events,flight-events,land-events,arrival-events
spring.cloud.stream.bindings.functionRouter-in-0.group=airplane
spring.cloud.stream.bindings.functionRouter-in-0.consumer.concurrency=8

spring.cloud.stream.function.definition=functionRouter;PlaneEvent;FlightEvent;LandEvent
spring.cloud.stream.function.routing.enabled=true
spring.cloud.function.routing-expression=headers['Type']
spring.cloud.stream.kafka.binder.autoCreateTopics=true
spring.cloud.stream.kafka.binder.brokers=localhost:9092
spring.cloud.stream.kafka.binder.replication-factor=1
spring.cloud.stream.kafka.binder.min-partition-count=8
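
To make the routing setup above concrete, here is a minimal plain-Java model of what spring.cloud.function.routing-expression=headers['Type'] amounts to: the value of the Type header is used as the name of the function to invoke, which is why the beans are named PlaneEvent, FlightEvent and LandEvent. This is only a sketch of the idea, not Spring's RoutingFunction; HeaderRouterSketch and Msg are hypothetical names introduced for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of header-based function routing. All names here are
// hypothetical; this is NOT Spring's RoutingFunction.
class HeaderRouterSketch {

    // Minimal stand-in for a message: headers plus a String payload.
    static final class Msg {
        final Map<String, String> headers;
        final String payload;

        Msg(Map<String, String> headers, String payload) {
            this.headers = headers;
            this.payload = payload;
        }
    }

    // Registry keyed by function name, mirroring @Bean("PlaneEvent") and friends.
    private final Map<String, Function<Msg, Msg>> functions = new HashMap<>();

    void register(String name, Function<Msg, Msg> fn) {
        functions.put(name, fn);
    }

    // Models routing-expression=headers['Type']: the header value names the function.
    Msg route(Msg in) {
        String type = in.headers.get("Type");
        Function<Msg, Msg> fn = functions.get(type);
        if (fn == null) {
            throw new IllegalStateException("No function registered for Type=" + type);
        }
        return fn.apply(in);
    }

    // Routes one PlaneEvent-tagged message and returns the processor's output.
    static Msg demo() {
        HeaderRouterSketch router = new HeaderRouterSketch();
        router.register("PlaneEvent",
                m -> new Msg(Map.of("Type", "FlightEvent"), m.payload + "-flight"));
        return router.route(new Msg(Map.of("Type", "PlaneEvent"), "plane-1"));
    }

    public static void main(String[] args) {
        Msg out = demo();
        System.out.println(out.headers.get("Type") + " " + out.payload);
    }
}
```

In the real application the output message carries the next Type value, so feeding it back into the router selects the next processor in the chain.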

TestUtil.groovy

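// Imports were omitted in the original listing; the packages below are assumed.
import java.lang.reflect.Field
import java.util.stream.Collectors

import org.springframework.cloud.stream.binder.test.InputDestination
import org.springframework.cloud.stream.binder.test.OutputDestination
import org.springframework.integration.channel.PublishSubscribeChannel
import org.springframework.messaging.SubscribableChannel
import org.springframework.util.ReflectionUtils

class TestUtil {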
    static Map<String, SubscribableChannel> getInputChannels(InputDestination inputDestination) {
        Field chField = ReflectionUtils.findField(InputDestination.class, "channels")
        chField.setAccessible(true);
        List<SubscribableChannel> channels = chField.get(inputDestination) as List<SubscribableChannel>
        return channels.stream().collect(Collectors.toMap(x -> getChannelName(x).replace(".destination", ""), x -> x))
    }

    static Map<String, SubscribableChannel> getOutputChannels(OutputDestination outputDestination) {
        Field chField = ReflectionUtils.findField(OutputDestination.class, "channels")
        chField.setAccessible(true);
        List<SubscribableChannel> channels = chField.get(outputDestination) as List<SubscribableChannel>
        return channels.stream().collect(Collectors.toMap(x -> getChannelName(x).replace(".destination", ""), x -> x))
    }

    static String getChannelName(SubscribableChannel ch) {
        Field chNameField = ReflectionUtils.findField(PublishSubscribeChannel.class, "fullChannelName")
        chNameField.setAccessible(true)
        try {
            return chNameField.get(ch) as String
        } catch (IllegalAccessException e) {
            System.err.println(e)
            return ""
        }
    }
}

SampleTest.groovy

@SpringBootTest(classes = [AirplaneApplication.class])
@ActiveProfiles("test")
@Stepwise
@Title("Sample Test")
class SampleTest extends Specification {

    @Autowired
    StreamBridge streamBridge

    @Autowired
    OutputDestination outputDestination

    @Autowired
    InputDestination inputDestination

    class TestState {
        Map<String, SubscribableChannel> inputChannels
        Map<String, SubscribableChannel> outputChannels

        Map<String, Queue<Object>> inputMessages
        Map<String, Queue<Object>> outputMessages
    }

    TestState testState = new TestState()

    def initTestState(InputDestination inDest, OutputDestination outDest) {
        def inputChannels = TestUtil.getInputChannels(inDest)
        def outputChannels = TestUtil.getOutputChannels(outDest)
        getTestState().setInputChannels(inputChannels)
        getTestState().setOutputChannels(outputChannels)
        getTestState().setInputMessages(new HashMap<String, Queue<Object>>())
        getTestState().setOutputMessages(new HashMap<String, Queue<Object>>())

        getTestState().getInputChannels().keySet()
                .forEach(channelName -> getTestState().getInputMessages().put(channelName, new ConcurrentLinkedQueue<String>()))
        getTestState().getInputChannels().entrySet().stream()
                .forEach(inputChannelEntry -> {
                    def subscribableChannel = inputChannelEntry.getValue()
                    def channelName = inputChannelEntry.getKey()
                    subscribableChannel.subscribe(m -> { // add debug point 1 here
                        getTestState().getInputMessages().get(channelName).add(m)
                    })
                })

        getTestState().getOutputChannels().keySet()
                .forEach(channelName -> getTestState().getOutputMessages().put(channelName, new ConcurrentLinkedQueue<String>()))
        getTestState().getOutputChannels().entrySet().stream()
                .forEach(outputChannelEntry -> { // add debug point 2 here
                    def subscribableChannel = outputChannelEntry.getValue()
                    def channelName = outputChannelEntry.getKey()
                    subscribableChannel.subscribe(m -> {
                        getTestState().getOutputMessages().get(channelName).add(m)
                    })
                })
    }

    def "Sample Test"() {
        given: "A plane event is created"
        initTestState(inputDestination, outputDestination)

        when: "Plane Event is processed"
        Message<PlaneEvent> message = MessageBuilder.withPayload(new PlaneEvent(UUID.randomUUID().toString(), "CITY"))
                .setHeader("Type", PlaneEvent.class.getSimpleName())
                .build()
        streamBridge.send("functionRouter-in-0", message)

        then: "Flight event is raised"
//        def flightEventPayload = getTestState().getOutputMessages().get("flightEventProcessor")
        System.out.println() // add debug point 3 here
    }
}

As shown in the snippet above (SampleTest.groovy), I added three debug points to check whether the output channel subscription works. Execution never stopped at debug point 2, and at debug point 3 the output message queues are empty, although there are messages in the input message queues.

The test itself works, i.e. messages are published and consumed as expected (validated by logging).

2023-04-19T11:16:18.178+05:30  INFO 12084 --- [           main] c.example.airplane.AirplaneApplication   : Publishing Flight: FlightEvent(flightId=556150f2-22d5-42c7-b88f-3a2373d12ea4-flight, currentAirport=CITY)
2023-04-19T11:16:18.183+05:30  INFO 12084 --- [           main] o.s.c.s.m.DirectWithAttributesChannel    : Channel 'unknown.channel.name' has 1 subscriber(s).
2023-04-19T11:16:18.184+05:30  INFO 12084 --- [           main] figuration$FunctionToDestinationBinder$1 : Output message is sent to 'planeEventProcessor-out-0' destination
2023-04-19T11:16:18.198+05:30  INFO 12084 --- [           main] o.s.c.f.context.config.RoutingFunction   : Resolved function from provided [routing-expression]  headers['Type']
2023-04-19T11:16:18.199+05:30  INFO 12084 --- [           main] c.example.airplane.AirplaneApplication   : Publishing LandEvent: LandEvent(flightId=556150f2-22d5-42c7-b88f-3a2373d12ea4-flight-flight, currentAirport=CITY)
2023-04-19T11:16:18.200+05:30  INFO 12084 --- [           main] o.s.c.s.m.DirectWithAttributesChannel    : Channel 'unknown.channel.name' has 1 subscriber(s).
2023-04-19T11:16:18.201+05:30  INFO 12084 --- [           main] figuration$FunctionToDestinationBinder$1 : Output message is sent to 'flightEventProcessor-out-0' destination
2023-04-19T11:16:18.201+05:30  INFO 12084 --- [           main] o.s.c.f.context.config.RoutingFunction   : Resolved function from provided [routing-expression]  headers['Type']
2023-04-19T11:16:18.204+05:30  INFO 12084 --- [           main] c.example.airplane.AirplaneApplication   : Publishing ArrivalEvent: ArrivalEvent(flightId=556150f2-22d5-42c7-b88f-3a2373d12ea4-flight-flight-flight, currentAirport=CITY)
2023-04-19T11:16:18.206+05:30  INFO 12084 --- [           main] o.s.c.s.m.DirectWithAttributesChannel    : Channel 'unknown.channel.name' has 1 subscriber(s).
2023-04-19T11:16:18.206+05:30  INFO 12084 --- [           main] figuration$FunctionToDestinationBinder$1 : Output message is sent to 'landEventProcessor-out-0' destination
2023-04-19T11:16:18.207+05:30  INFO 12084 --- [           main] o.s.c.f.context.config.RoutingFunction   : Resolved function from provided [routing-expression]  headers['Type']
2023-04-19T11:16:18.208+05:30  INFO 12084 --- [           main] c.example.airplane.AirplaneApplication   : ArrivalEvent consumed: ArrivalEvent(flightId=556150f2-22d5-42c7-b88f-3a2373d12ea4-flight-flight-flight, currentAirport=CITY)

Am I missing something, or is there a different way of getting it to work?

If it helps, here's my test app: https://github.com/KarthikRIyer/spring-cloud-stream-function-routing

EDIT 1:

I made a few changes after which I started getting some messages in the output channel subscriber:

Instead of creating separate producer bindings for each processor/function, I specified destinations for the processors' output channels like so:

# spring.cloud.stream.bindings.planeEventProcessor-out-0.destination=flight-events
spring.cloud.stream.bindings.PlaneEvent-out-0.destination=flight-events

# spring.cloud.stream.bindings.flightEventProcessor-out-0.destination=land-events
spring.cloud.stream.bindings.FlightEvent-out-0.destination=land-events

# spring.cloud.stream.bindings.landEventProcessor-out-0.destination=arrival-events
spring.cloud.stream.bindings.LandEvent-out-0.destination=arrival-events
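
For context, Spring Cloud Stream names the bindings of a functional bean `<functionName>-in-<index>` and `<functionName>-out-<index>`, and with @Bean("PlaneEvent") the function name is the bean name PlaneEvent rather than the method name planeEventProcessor. The sketch below only spells out that naming convention; BindingNames is a hypothetical helper, not part of Spring Cloud Stream, and it does not claim to explain everything observed in the debugger.

```java
// Derives functional binding names following the <functionName>-in|out-<index> pattern.
// BindingNames is a hypothetical helper, not part of Spring Cloud Stream.
class BindingNames {

    // Name of the input binding at the given index.
    static String input(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    // Name of the output binding at the given index.
    static String output(String functionName, int index) {
        return functionName + "-out-" + index;
    }

    // Property key that sets the destination for a binding.
    static String destinationKey(String bindingName) {
        return "spring.cloud.stream.bindings." + bindingName + ".destination";
    }

    public static void main(String[] args) {
        // The bean is registered as "PlaneEvent", so that is the function name.
        System.out.println(destinationKey(output("PlaneEvent", 0)));
        System.out.println(input("functionRouter", 0));
    }
}
```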

And used them in spring.cloud.stream.sendto.destination like so:

@Bean("PlaneEvent")
public Function<Message<PlaneEvent>, Message<FlightEvent>> planeEventProcessor() {
    return planeEventMessage -> {
        FlightEvent flightEvent = new FlightEvent(planeEventMessage.getPayload().getPlaneId()+"-flight", planeEventMessage.getPayload().getCurrentAirport());
        logger.info("Publishing Flight: {}", flightEvent);
        return MessageBuilder.withPayload(flightEvent)
                .setHeader("Type", FlightEvent.class.getSimpleName())
                // .setHeader("spring.cloud.stream.sendto.destination", "planeEventProcessor-out-0")
                .setHeader("spring.cloud.stream.sendto.destination", "PlaneEvent-out-0")
                .build();
    };
}

At the same time, I changed the test to publish the workflow starter message through the planeEventProducer-out-0 binding, which has no bean definition but is declared in the properties (spring.cloud.stream.output-bindings=planeEventProducer;).

streamBridge.send("planeEventProducer-out-0", message)

This is what I see in the debugger now:

[Screenshot of the debugger view]

There is no message in the planeEventProducer channel. Why is that?

Brian Clozel
Karthik
  • If you expect your Kafka environment to get involved in the test, that will not work. Test binder strictly works on in-memory-based Spring Integration channels. – sobychacko Apr 19 '23 at 19:07
  • If you intend to write a full integration test, then you might want to look into `EmbeddedKafka` and test that way and use the test binder. – sobychacko Apr 19 '23 at 19:08
  • See this doc section we added recently: https://docs.spring.io/spring-cloud-stream/docs/4.0.3-SNAPSHOT/reference/html/spring-cloud-stream.html#_special_note_on_mixing_test_binder_and_regular_middleware_binder_for_testing – sobychacko Apr 19 '23 at 19:09
  • Yes, I understand that the test binder works on in-memory-based channels, and that is what I want to do. The only problem I'm facing is that I am unable to verify messages produced by the application, as I was able to with MessageCollector in earlier versions of the test binder. In the workaround I tried to implement, I do not see the output channel subscriber getting called. – Karthik Apr 20 '23 at 04:05
  • I do see that you have multiple bindings. `OutputDestination` has a `receive(long timeout, String bindingName)` method. I don't see you invoking it. What do you expect? Consider looking at how we're using it - https://github.com/spring-cloud/spring-cloud-stream/blob/main/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/ImplicitFunctionBindingTests.java – Oleg Zhurakousky Apr 20 '23 at 13:57
  • `outputDestination.receive()` gives me null. For example, when the app publishes FlightEvent, flightEventProcessor() will consume this event and publish LandEvent. When my app has consumers for a published message, I am unable to receive it in the outputDestination. My expectation is that `subscribableChannel.subscribe(m -> { ...` is called for output channels so that I can store these messages in a queue for validation. – Karthik Apr 21 '23 at 05:58
  • I tried a few more things, and made an edit to the question. Could you please check? – Karthik Apr 21 '23 at 06:20
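
To illustrate the `receive(long timeout, String bindingName)` call mentioned in the comments: assuming the test binder keeps produced messages in an in-memory queue per destination (an assumption about its internals, not something confirmed in this thread), a timed poll returns null when nothing is queued under the requested name, or when the message has already been taken. The sketch below is plain Java; OutputQueueSketch is a hypothetical stand-in, not the test binder's implementation.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Plain-Java model of "one in-memory queue per destination, polled with a timeout".
// OutputQueueSketch is hypothetical and is NOT the test binder's implementation.
class OutputQueueSketch {

    private final Map<String, BlockingQueue<String>> queues = new ConcurrentHashMap<>();

    // Stores a produced message under its destination name.
    void send(String destination, String message) {
        queues.computeIfAbsent(destination, d -> new LinkedBlockingQueue<>()).add(message);
    }

    // Returns the next message for the destination, or null if none arrives in time.
    String receive(long timeoutMillis, String destination) {
        try {
            return queues.computeIfAbsent(destination, d -> new LinkedBlockingQueue<>())
                    .poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return null;
        }
    }

    public static void main(String[] args) {
        OutputQueueSketch out = new OutputQueueSketch();
        out.send("flight-events", "FlightEvent(...)");
        System.out.println(out.receive(100, "flight-events")); // the queued message
        System.out.println(out.receive(100, "flight-events")); // null: already taken
        System.out.println(out.receive(100, "land-events"));   // null: nothing sent there
    }
}
```

If the real binder behaves similarly, it may be worth checking which name is passed to `receive` and whether anything else reads the message first.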

0 Answers