I have the following processor bean method signature:
@Bean
public BiFunction<KStream<String, MyClass>, KStream<String, String>, KStream<String, MyClass>[]> myStream() {
    return (inputStream1, inputStream2) -> {
        inputStream2
            .peek((k, v) -> {
                log.debug(...);
            });
        return inputStream1
            .mapValues(...)
            .branch((k, v) -> true, (k, v) -> true);
    };
}
The relevant properties:
spring.cloud.stream.function.definition: ...;myStream
spring.cloud.stream.bindings:
  myStream-in-0:
    destination: inp0
  myStream-in-1:
    destination: inp1
  myStream-out-0:
    destination: out0
  myStream-out-1:
    destination: out1
I am using the Spring Cloud Stream Kafka Streams binder from Hoxton.SR4 (spring-cloud-stream-binder-kafka-streams:jar:3.0.4.RELEASE) with embedded Kafka 2.5.0.
I am testing my topology using embedded Kafka:
@RunWith(SpringRunner.class)
@SpringBootTest(
    properties = "spring.cloud.stream.kafka.binder.brokers=${spring.embedded.kafka.brokers}"
)
@EmbeddedKafka(partitions = 1,
    topics = {
        "inp0", "inp1", "out0", "out1"
    },
    brokerPropertiesLocation = "kafka.properties"
)
@Slf4j
public class MyApplicationTests {

    @Test
    public void embeddedKafkaTest() throws IOException, InterruptedException {
        Consumer<String, MyClass> out0Consumer = createConsumer("out0ConsumerGroup");
        Consumer<String, MyClass> out1Consumer = createConsumer("out1ConsumerGroup");
        this.embeddedKafka.consumeFromAnEmbeddedTopic(out0Consumer, "out0");
        this.embeddedKafka.consumeFromAnEmbeddedTopic(out1Consumer, "out1");
        latch = new CountDownLatch(1);
        // ... publish ...
        latch.await(15, TimeUnit.SECONDS);
        ConsumerRecords<String, MyClass> out0 = KafkaTestUtils.getRecords(out0Consumer);
        assertThat(out0.count(), is(greaterThanOrEqualTo(1)));
        ConsumerRecords<String, MyClass> out1 = KafkaTestUtils.getRecords(out1Consumer);
        assertThat(out1.count(), is(greaterThanOrEqualTo(1)));
    }

    private <K, V> Consumer<K, V> createConsumer(String groupName) {
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(groupName, "true", this.embeddedKafka);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new DefaultKafkaConsumerFactory<K, V>(consumerProps).createConsumer();
    }
}
My tests show that messages from myStream reach the "out0" topic as expected, but the "out1" topic remains empty, so the unit test fails on the second assertion.
I've tried a couple of things, but it looks like nothing is being produced to the second output topic, while output to the first output topic is produced correctly.
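One thing that may be relevant: as far as I understand the Kafka Streams documentation, branch evaluates its predicates in order and places each record into exactly one output stream, the one for the first predicate that matches. The sketch below is plain Java, not Kafka Streams; routeToFirstMatch is a hypothetical stand-in I wrote to illustrate that assumed first-match behaviour, using BiPredicate in place of the Kafka Streams Predicate type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

public class FirstMatchRoutingSketch {

    // Hypothetical stand-in for branch-style routing (NOT the Kafka Streams API):
    // returns the index of the first predicate that accepts the record,
    // or -1 if no predicate matches.
    public static <K, V> int routeToFirstMatch(K key, V value,
                                               List<BiPredicate<K, V>> predicates) {
        for (int i = 0; i < predicates.size(); i++) {
            if (predicates.get(i).test(key, value)) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Two always-true predicates, as in my branch(...) call.
        List<BiPredicate<String, String>> alwaysTrue = new ArrayList<>();
        alwaysTrue.add((k, v) -> true);
        alwaysTrue.add((k, v) -> true);
        System.out.println(routeToFirstMatch("key", "value", alwaysTrue)); // prints 0

        // A selective first predicate lets some records fall through to the second.
        List<BiPredicate<String, String>> selective = new ArrayList<>();
        selective.add((k, v) -> v.startsWith("a"));
        selective.add((k, v) -> true);
        System.out.println(routeToFirstMatch("key", "bravo", selective)); // prints 1
    }
}
```

If branch really works this way, the second predicate in my bean would never see a record, because the first one already accepts everything.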
Can you see any mistakes in my setup?
And one more thing: the return statement in the myStream bean method definition shows a compiler warning:
Unchecked generics array creation for varargs parameter
But as far as I can tell, that is how the Spring Cloud Stream Kafka Streams binder 3.x API requires the return type to be defined. Is that right?
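For reference, the same kind of warning can be reproduced without Kafka at all. This is a minimal sketch, assuming the warning comes from passing lambdas to a method whose varargs parameter has a generic element type; countPredicates and callWithTwoLambdas are hypothetical names of mine, and BiPredicate stands in for the Kafka Streams Predicate type.

```java
import java.util.function.BiPredicate;

public class VarargsWarningSketch {

    // Hypothetical method with a generic varargs parameter, shaped like a
    // branch(Predicate<K, V>... predicates) call. It is NOT the Kafka Streams API.
    // It is deliberately left without @SafeVarargs, to mimic library code
    // that the caller cannot annotate.
    static <K, V> int countPredicates(BiPredicate<K, V>... predicates) {
        return predicates.length;
    }

    // Without this annotation, javac -Xlint:unchecked reports an
    // "unchecked generic array creation for varargs parameter" warning
    // at the call below, because the compiler has to create a
    // BiPredicate<String, String>[] behind the scenes.
    @SuppressWarnings("unchecked")
    public static int callWithTwoLambdas() {
        return VarargsWarningSketch.<String, String>countPredicates(
                (k, v) -> true,
                (k, v) -> v.isEmpty());
    }

    public static void main(String[] args) {
        System.out.println(callWithTwoLambdas()); // prints 2
    }
}
```

In this sketch the warning is tied to the varargs call rather than to the array return type, so putting @SuppressWarnings("unchecked") on the calling method silences it; I assume the same would apply to my bean method, but I have not confirmed that.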