I have a use case similar to the one mentioned here: Spring Cloud Stream test-binder OutputDestination does not consume events
I need to validate values in messages produced by the application. This was possible in older versions of Spring Cloud Stream Kafka Binder, using MessageCollector
.
According to comments on the stackoverflow post mentioned above, I tried using SubscribableChannel
to store the published/consumed messages in a queue for validation. But this seems to work only for InputChannel
s (captures messages consumed by the application) and not for OutputChannel
s (capture messages produced by the application).
Here's the code from my test app:
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.0.4</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>airplane</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>airplane</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>17</java.version>
<spring-cloud.version>2022.0.1</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.26</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-binder</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.spockframework</groupId>
<artifactId>spock-core</artifactId>
<version>2.4-M1-groovy-4.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.spockframework</groupId>
<artifactId>spock-spring</artifactId>
<version>2.4-M1-groovy-4.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.groovy</groupId>
<artifactId>groovy-json</artifactId>
<version>4.0.6</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-binder</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.codehaus.gmavenplus</groupId>
<artifactId>gmavenplus-plugin</artifactId>
<version>2.0.0</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compileTests</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
AirplaneApplication.java
@SpringBootApplication
public class AirplaneApplication {
public static void main(String[] args) {
SpringApplication.run(AirplaneApplication.class, args);
}
Logger logger = LoggerFactory.getLogger(AirplaneApplication.class);
@Bean("ArrivalEvent")
public Consumer<Message<ArrivalEvent>> arrivalEventConsumer() {
return msg -> {
logger.info("ArrivalEvent consumed: {}", msg.getPayload());
};
}
@Bean("LandEvent")
public Function<Message<LandEvent>, Message<ArrivalEvent>> landEventProcessor() {
return landEventMessage -> {
ArrivalEvent arrivalEvent = new ArrivalEvent(landEventMessage.getPayload().getFlightId()+"-flight", landEventMessage.getPayload().getCurrentAirport());
logger.info("Publishing ArrivalEvent: {}", arrivalEvent);
return MessageBuilder.withPayload(arrivalEvent)
.setHeader("Type", ArrivalEvent.class.getSimpleName())
.setHeader("spring.cloud.stream.sendto.destination", "landEventProcessor-out-0")
.build();
};
}
@Bean("FlightEvent")
public Function<Message<FlightEvent>, Message<LandEvent>> flightEventProcessor() {
return flightEventMessage -> {
LandEvent landEvent = new LandEvent(flightEventMessage.getPayload().getFlightId()+"-flight", flightEventMessage.getPayload().getCurrentAirport());
logger.info("Publishing LandEvent: {}", landEvent);
return MessageBuilder.withPayload(landEvent)
.setHeader("Type", LandEvent.class.getSimpleName())
.setHeader("spring.cloud.stream.sendto.destination", "flightEventProcessor-out-0")
.build();
};
}
@Bean("PlaneEvent")
public Function<Message<PlaneEvent>, Message<FlightEvent>> planeEventProcessor() {
return planeEventMessage -> {
FlightEvent flightEvent = new FlightEvent(planeEventMessage.getPayload().getPlaneId()+"-flight", planeEventMessage.getPayload().getCurrentAirport());
logger.info("Publishing Flight: {}", flightEvent);
return MessageBuilder.withPayload(flightEvent)
.setHeader("Type", FlightEvent.class.getSimpleName())
.setHeader("spring.cloud.stream.sendto.destination", "planeEventProcessor-out-0")
.build();
};
}
}
application.properties
spring.application.name=airplane
server.port=8095
spring.cloud.stream.bindings.planeEventProducer-out-0.destination=plane-events
spring.cloud.stream.bindings.planeEventProcessor-out-0.destination=flight-events
spring.cloud.stream.bindings.flightEventProcessor-out-0.destination=land-events
spring.cloud.stream.bindings.landEventProcessor-out-0.destination=arrival-events
spring.cloud.stream.output-bindings=replayProducer;planeEventProducer;planeEventProcessor;flightEventProcessor;landEventProcessor
spring.cloud.stream.bindings.functionRouter-in-0.destination=plane-events,flight-events,land-events,arrival-events
spring.cloud.stream.bindings.functionRouter-in-0.group=airplane
spring.cloud.stream.bindings.functionRouter-in-0.consumer.concurrency=8
spring.cloud.stream.function.definition=functionRouter;PlaneEvent;FlightEvent;LandEvent
spring.cloud.stream.function.routing.enabled=true
spring.cloud.function.routing-expression=headers['Type']
spring.cloud.stream.kafka.binder.autoCreateTopics=true
spring.cloud.stream.kafka.binder.brokers=localhost:9092
spring.cloud.stream.kafka.binder.replication-factor=1
spring.cloud.stream.kafka.binder.min-partition-count=8
TestUtil.groovy
class TestUtil {
static Map<String, SubscribableChannel> getInputChannels(InputDestination inputDestination) {
Field chField = ReflectionUtils.findField(InputDestination.class, "channels")
chField.setAccessible(true);
List<SubscribableChannel> channels = chField.get(inputDestination) as List<SubscribableChannel>
return channels.stream().collect(Collectors.toMap(x -> getChannelName(x).replace(".destination", ""), x -> x))
}
static Map<String, SubscribableChannel> getOutputChannels(OutputDestination outputDestination) {
Field chField = ReflectionUtils.findField(OutputDestination.class, "channels")
chField.setAccessible(true);
List<SubscribableChannel> channels = chField.get(outputDestination) as List<SubscribableChannel>
return channels.stream().collect(Collectors.toMap(x -> getChannelName(x).replace(".destination", ""), x -> x))
}
static String getChannelName(SubscribableChannel ch) {
Field chNameField = ReflectionUtils.findField(PublishSubscribeChannel.class, "fullChannelName")
chNameField.setAccessible(true)
try {
return chNameField.get(ch) as String
} catch (IllegalAccessException e) {
System.err.println(e)
return ""
}
}
}
SampleTest.groovy
@SpringBootTest(classes = [AirplaneApplication.class])
@ActiveProfiles("test")
@Stepwise
@Title("Sample Test")
class SampleTest extends Specification {
@Autowired
StreamBridge streamBridge
@Autowired
OutputDestination outputDestination
@Autowired
InputDestination inputDestination
class TestState {
Map<String, SubscribableChannel> inputChannels
Map<String, SubscribableChannel> outputChannels
Map<String, Queue<Object>> inputMessages
Map<String, Queue<Object>> outputMessages
}
TestState testState = new TestState()
def initTestState(InputDestination inDest, OutputDestination outDest) {
def inputChannels = TestUtil.getInputChannels(inDest)
def outputChannels = TestUtil.getOutputChannels(outDest)
getTestState().setInputChannels(inputChannels)
getTestState().setOutputChannels(outputChannels)
getTestState().setInputMessages(new HashMap<String, Queue<Object>>())
getTestState().setOutputMessages(new HashMap<String, Queue<Object>>())
getTestState().getInputChannels().keySet()
.forEach(channelName -> getTestState().getInputMessages().put(channelName, new ConcurrentLinkedQueue<String>()))
getTestState().getInputChannels().entrySet().stream()
.forEach(inputChannelEntry -> {
def subscribableChannel = inputChannelEntry.getValue()
def channelName = inputChannelEntry.getKey()
subscribableChannel.subscribe(m -> { // add debug point 1 here
getTestState().getInputMessages().get(channelName).add(m)
})
})
getTestState().getOutputChannels().keySet()
.forEach(channelName -> getTestState().getOutputMessages().put(channelName, new ConcurrentLinkedQueue<String>()))
getTestState().getOutputChannels().entrySet().stream()
.forEach(outputChannelEntry -> { // add debug point 2 here
def subscribableChannel = outputChannelEntry.getValue()
def channelName = outputChannelEntry.getKey()
subscribableChannel.subscribe(m -> {
getTestState().getOutputMessages().get(channelName).add(m)
})
})
}
def "Sample Test"() {
given: "A plane event is created"
initTestState(inputDestination, outputDestination)
when: "Plane Event is processed"
Message<PlaneEvent> message = MessageBuilder.withPayload(new PlaneEvent(UUID.randomUUID().toString(), "CITY"))
.setHeader("Type", PlaneEvent.class.getSimpleName())
.build()
streamBridge.send("functionRouter-in-0", message)
then: "Flight event is raised"
// def flightEventPayload = getTestState().getOutputMessages().get("flightEventProcessor")
System.out.println() // add debug point 3 here
}
}
As shown in the above snippet (SampleTest.groovy
), I added 3 debug points to check if the OutputChannel
subscribe works, and execution never stopped at debug point 2
. And at debug point 3
the output message queues are empty, but there are messages in the input message queues.
The test itself works, i.e. messages are published and consumed as expected (validated by logging).
2023-04-19T11:16:18.178+05:30 INFO 12084 --- [ main] c.example.airplane.AirplaneApplication : Publishing Flight: FlightEvent(flightId=556150f2-22d5-42c7-b88f-3a2373d12ea4-flight, currentAirport=CITY)
2023-04-19T11:16:18.183+05:30 INFO 12084 --- [ main] o.s.c.s.m.DirectWithAttributesChannel : Channel 'unknown.channel.name' has 1 subscriber(s).
2023-04-19T11:16:18.184+05:30 INFO 12084 --- [ main] figuration$FunctionToDestinationBinder$1 : Output message is sent to 'planeEventProcessor-out-0' destination
2023-04-19T11:16:18.198+05:30 INFO 12084 --- [ main] o.s.c.f.context.config.RoutingFunction : Resolved function from provided [routing-expression] headers['Type']
2023-04-19T11:16:18.199+05:30 INFO 12084 --- [ main] c.example.airplane.AirplaneApplication : Publishing LandEvent: LandEvent(flightId=556150f2-22d5-42c7-b88f-3a2373d12ea4-flight-flight, currentAirport=CITY)
2023-04-19T11:16:18.200+05:30 INFO 12084 --- [ main] o.s.c.s.m.DirectWithAttributesChannel : Channel 'unknown.channel.name' has 1 subscriber(s).
2023-04-19T11:16:18.201+05:30 INFO 12084 --- [ main] figuration$FunctionToDestinationBinder$1 : Output message is sent to 'flightEventProcessor-out-0' destination
2023-04-19T11:16:18.201+05:30 INFO 12084 --- [ main] o.s.c.f.context.config.RoutingFunction : Resolved function from provided [routing-expression] headers['Type']
2023-04-19T11:16:18.204+05:30 INFO 12084 --- [ main] c.example.airplane.AirplaneApplication : Publishing ArrivalEvent: ArrivalEvent(flightId=556150f2-22d5-42c7-b88f-3a2373d12ea4-flight-flight-flight, currentAirport=CITY)
2023-04-19T11:16:18.206+05:30 INFO 12084 --- [ main] o.s.c.s.m.DirectWithAttributesChannel : Channel 'unknown.channel.name' has 1 subscriber(s).
2023-04-19T11:16:18.206+05:30 INFO 12084 --- [ main] figuration$FunctionToDestinationBinder$1 : Output message is sent to 'landEventProcessor-out-0' destination
2023-04-19T11:16:18.207+05:30 INFO 12084 --- [ main] o.s.c.f.context.config.RoutingFunction : Resolved function from provided [routing-expression] headers['Type']
2023-04-19T11:16:18.208+05:30 INFO 12084 --- [ main] c.example.airplane.AirplaneApplication : ArrivalEvent consumed: ArrivalEvent(flightId=556150f2-22d5-42c7-b88f-3a2373d12ea4-flight-flight-flight, currentAirport=CITY)
Am I missing something, or is there a different way of getting it to work?
If it helps, here's my test app: https://github.com/KarthikRIyer/spring-cloud-stream-function-routing
EDIT 1:
I made a few changes after which I started getting some messages in the output channel subscriber:
Instead of creating separate producer bindings for each processor/function I specified destinations for output channels for the processor like so:
# spring.cloud.stream.bindings.planeEventProcessor-out-0.destination=flight-events
spring.cloud.stream.bindings.PlaneEvent-out-0.destination=flight-events
# spring.cloud.stream.bindings.flightEventProcessor-out-0.destination=land-events
spring.cloud.stream.bindings.FlightEvent-out-0.destination=land-events
# spring.cloud.stream.bindings.landEventProcessor-out-0.destination=arrival-events
spring.cloud.stream.bindings.LandEvent-out-0.destination=arrival-events
And used them in spring.cloud.stream.sendto.destination
like so:
@Bean("PlaneEvent")
public Function<Message<PlaneEvent>, Message<FlightEvent>> planeEventProcessor() {
return planeEventMessage -> {
FlightEvent flightEvent = new FlightEvent(planeEventMessage.getPayload().getPlaneId()+"-flight", planeEventMessage.getPayload().getCurrentAirport());
logger.info("Publishing Flight: {}", flightEvent);
return MessageBuilder.withPayload(flightEvent)
.setHeader("Type", FlightEvent.class.getSimpleName())
// .setHeader("spring.cloud.stream.sendto.destination", "planeEventProcessor-out-0")
.setHeader("spring.cloud.stream.sendto.destination", "PlaneEvent-out-0")
.build();
};
}
At the same time I changed the test to publish the workflow starter message through planeEventProducer-out-0
channel which does not have a bean definition, but is specified in the properties spring.cloud.stream.output-bindings=planeEventProducer;
.
streamBridge.send("planeEventProducer-out-0", message)
This is what I see in the debugger now:
There is no message in the planeEventProducer
channel. Why is that?