
I have the following processor bean method signature:

@Bean
public BiFunction<KStream<String, MyClass>, KStream<String, String>, KStream<String, MyClass>[]> myStream() {
    return (inputStream1, inputStream2) -> {

        inputStream2
            .peek((k, v) -> {
                log.debug(...);
            });

        return inputStream1
            .mapValues(...)
            .branch((k,v) -> true, (k,v) -> true);

    };
}

The relevant properties:

spring.cloud.stream.function.definition: ...;myStream
spring.cloud.stream.bindings:
  myStream-in-0:
    destination: inp0
  myStream-in-1:
    destination: inp1
  myStream-out-0:
    destination: out0
  myStream-out-1:
    destination: out1

Spring Cloud version is Hoxton.SR4 (spring-cloud-stream-binder-kafka-streams 3.0.4.RELEASE), embedded Kafka version 2.5.0.

I am testing my topology using embedded Kafka:

@RunWith(SpringRunner.class)
@SpringBootTest(
    properties = "spring.cloud.stream.kafka.binder.brokers=${spring.embedded.kafka.brokers}"
)
@EmbeddedKafka(partitions = 1,
        topics = {
            "inp0", "inp1", "out0", "out1"
        },
        brokerPropertiesLocation = "kafka.properties"
)
@Slf4j
public class MyApplicationTests {

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    private CountDownLatch latch;

    @Test
    public void embeddedKafkaTest() throws IOException, InterruptedException {
        Consumer<String, MyClass> out0Consumer = createConsumer("out0ConsumerGroup");
        Consumer<String, MyClass> out1Consumer = createConsumer("out1ConsumerGroup");

        this.embeddedKafka.consumeFromAnEmbeddedTopic(out0Consumer, "out0");
        this.embeddedKafka.consumeFromAnEmbeddedTopic(out1Consumer, "out1");

        latch = new CountDownLatch(1);
        // ... publish ...
        latch.await(15, TimeUnit.SECONDS);

        ConsumerRecords<String, MyClass> out0 = KafkaTestUtils.getRecords(out0Consumer);
        assertThat(out0.count(), is(greaterThanOrEqualTo(1)));

        ConsumerRecords<String, MyClass> out1 = KafkaTestUtils.getRecords(out1Consumer);
        assertThat(out1.count(), is(greaterThanOrEqualTo(1)));

    }

    private <K, V> Consumer<K, V> createConsumer(String groupName) {
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(groupName, "true", this.embeddedKafka);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new DefaultKafkaConsumerFactory<K, V>(consumerProps).createConsumer();
    }
}

My tests show that the messages from myStream land in the "out0" topic as expected, but the "out1" topic remains empty and the test fails on the second assertion.

I've tried a couple of things, but it looks like output to the second topic is simply never produced (output to the first topic works fine).

Can you see any mistakes in my setup?

And one more thing: the return statement in the myStream bean method definition shows a compiler warning:

Unchecked generics array creation for varargs parameter

But it looks like that's how the Spring Cloud Kafka Stream 3.x API requires the return type to be defined?

– Sergey Shcherbakov

1 Answer


You are passing two predicates to the branch method, and both of them always evaluate to true. Each record is tested against the predicates in order and routed to the branch of the first predicate that matches, so with your code the first predicate always wins and all data goes to the first output binding; the second branch never receives anything. See the javadoc for more details. You should use predicates that actually distinguish records (for example, checking certain conditions on the key or value). A record that fails the first predicate and matches the second will then be produced to the second output topic.
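
For illustration, a sketch of the branching step with mutually exclusive predicates; the isUrgent() accessor on MyClass is a hypothetical example, and the mapValues call is a placeholder for your real mapping:

return inputStream1
        .mapValues(v -> v) // placeholder for your real mapping
        .branch(
                (k, v) -> v.isUrgent(), // matching records go to myStream-out-0
                (k, v) -> true          // everything else goes to myStream-out-1
        );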

With respect to that compiler warning, I think you can safely ignore it, as the API ensures that the predicate objects passed into the branch invocation have the proper type. The warning appears because branch takes a generic varargs parameter, so the compiler must create a generic array at the call site. See this thread for details on that compiler warning.
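
If you want to silence it, one plain-Java option (not anything binder-specific) is to suppress the unchecked warning on the bean method:

@SuppressWarnings("unchecked")
@Bean
public BiFunction<KStream<String, MyClass>, KStream<String, String>, KStream<String, MyClass>[]> myStream() {
    // ... same body as above ...
}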

– sobychacko
  • Thank you! That explains the issue. Can you also recommend an approach if I want to copy the output to two different topics? – Sergey Shcherbakov Jul 09 '20 at 22:46
  • It seems that I've got it by myself, by simply creating an array of two KStreams manually: return new KStream[] { resKStream, resKStream }; – Sergey Shcherbakov Jul 09 '20 at 22:50
  • Currently, the binder doesn't support that, so a solution might be to make the bean a `BiConsumer` and programmatically invoke the `to` method (see the sketch below). With that said, please elaborate your use case (maybe on the binder GitHub repo), and we could potentially think about some out-of-the-box solutions from the binder. – sobychacko Jul 09 '20 at 22:51
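
For reference, here is a minimal sketch of the BiConsumer approach suggested in the last comment. The mapValues call is a placeholder, and the literal topic names match the bindings above; note that with a BiConsumer there are no output bindings, so the topics are addressed directly via to() and the serdes must be resolvable for them:

@Bean
public BiConsumer<KStream<String, MyClass>, KStream<String, String>> myStream() {
    return (inputStream1, inputStream2) -> {
        inputStream2.peek((k, v) -> log.debug("key={} value={}", k, v));

        KStream<String, MyClass> result = inputStream1.mapValues(v -> v); // placeholder mapping

        // Calling to() twice adds two sink nodes fed by the same stream,
        // effectively copying every record to both topics.
        result.to("out0");
        result.to("out1");
    };
}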