There are several details about the example Micronaut/Kafka Streams application which I don't understand. Here is the example class from the documentation (original link: https://micronaut-projects.github.io/micronaut-kafka/latest/guide/#kafkaStreams).

My questions are:

  • Why are we returning only the source stream?
  • If we have multiple source KStream objects, e.g. to do a join, do we need to make them Beans too?
  • Do we also need to make each source KTable a Bean?
  • What happens if we don't make a source KStream or KTable a Bean? We currently have at least one project that does this, with no apparent problems.

import io.micronaut.configuration.kafka.streams.ConfiguredStreamBuilder;
import io.micronaut.context.annotation.Factory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

import javax.inject.Named;
import javax.inject.Singleton;
import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;

@Factory
public class WordCountStream {

    public static final String STREAM_WORD_COUNT = "word-count";
    public static final String INPUT = "streams-plaintext-input"; 
    public static final String OUTPUT = "streams-wordcount-output"; 
    public static final String WORD_COUNT_STORE = "word-count-store";


    @Singleton
    @Named(STREAM_WORD_COUNT)
    KStream<String, String> wordCountStream(ConfiguredStreamBuilder builder) { 
        // set default serdes
        Properties props = builder.getConfiguration();
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KStream<String, String> source = builder
                .stream(INPUT);

        KTable<String, Long> groupedByWord = source
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .groupBy((key, word) -> word, Grouped.with(Serdes.String(), Serdes.String()))
                //Store the result in a store for lookup later
                .count(Materialized.as(WORD_COUNT_STORE)); 

        groupedByWord
                //convert to stream
                .toStream()
                //send to output using specific serdes
                .to(OUTPUT, Produced.with(Serdes.String(), Serdes.Long()));

        return source;
    }

}
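To make clear what the WordCountStream topology computes, here is a minimal plain-Java sketch with no Kafka dependency. WordCountModel and countWords are hypothetical names. The sketch applies the same split as the example's flatMapValues and tallies words the way groupBy(...).count(...) does, but it only shows final totals. The real topology keeps the counts in the word-count-store state store and forwards count updates to streams-wordcount-output as records arrive.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java model (no Kafka dependency) of the word-count topology:
// flatMapValues -> groupBy(word) -> count(). Only final totals are shown.
public class WordCountModel {

    // Same value mapper as the example: lower-case, then split on non-word characters.
    static List<String> splitWords(String value) {
        return Arrays.asList(value.toLowerCase().split("\\W+"));
    }

    static Map<String, Long> countWords(List<String> lines) {
        // Stands in for the "word-count-store" state store; TreeMap gives sorted output.
        Map<String, Long> counts = new TreeMap<>();
        for (String line : lines) {
            for (String word : splitWords(line)) {   // flatMapValues
                counts.merge(word, 1L, Long::sum);   // groupBy(word) + count()
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // prints {hello=2, kafka=1, streams=1}
        System.out.println(countWords(Arrays.asList("Hello Kafka", "hello, Streams!")));
    }
}
```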

Edit: Here's a version of our service with multiple streams, edited to remove identifying info.

@Factory
public class TopologyCopy {
    private static class DataOut {}
    private static class DataInOne {}
    private static class DataInTwo {}
    private static class DataInThree {}

    @Singleton
    @Named("data")
    KStream<Integer, DataInOne> dataStream(ConfiguredStreamBuilder builder) {
        Properties props = builder.getConfiguration();
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);

        KStream<Integer, DataInOne> dataOneStream = builder.stream("data-one",
                Consumed.with(TextualIntSerde.INSTANCE, new JsonSerde<>(DataInOne.class)));
        KStream<Integer, DataInTwo> dataTwoStream = builder.stream("data-two",
                Consumed.with(TextualIntSerde.INSTANCE, new JsonSerde<>(DataInTwo.class)));
        GlobalKTable<Integer, DataInThree> signalTable = builder.globalTable("data-three",
                Consumed.with(TextualIntSerde.INSTANCE, new JsonSerde<>(DataInThree.class)),
                Materialized.as("data-three-store"));
        KTable<Integer, DataInTwo> dataTwoTable = dataTwoStream
                .groupByKey()
                .aggregate(() -> null, (key, device, storedDevice) -> device,
                        Materialized.with(TextualIntSerde.INSTANCE, new JsonSerde<>(DataInTwo.class)));

        dataOneStream
                .transformValues(() -> /* MAGIC */)
                .join(dataTwoTable, (data1, data2) -> /* MAGIC */)
                .selectKey((something, msg) -> /* MAGIC */)
                .to("topic-out", Produced.with(Serdes.UUID(), new JsonSerde<>(OutMessage.class)));

        return dataOneStream;
    }

}
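The core of TopologyCopy is a KStream-KTable inner join: dataTwoTable keeps only the latest data-two value per key (the aggregator simply returns the new value), and each data-one record is joined with whatever value the table holds for its key at that moment. The sketch below models just that step in plain Java, with String values in place of the DataIn* classes and the transformValues/selectKey steps left out. StreamTableJoinModel, Event and run are hypothetical names, and it assumes records arrive in a single global order, whereas Kafka Streams orders records by timestamp per task.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical plain-Java model of the stream-table inner join in TopologyCopy.
// Assumption: a single, already-ordered sequence of input records.
public class StreamTableJoinModel {

    // One input record: source topic (names from the example), key, value.
    static final class Event {
        final String topic;
        final int key;
        final String value;

        Event(String topic, int key, String value) {
            this.topic = topic;
            this.key = key;
            this.value = value;
        }
    }

    static List<String> run(List<Event> events, BiFunction<String, String, String> joiner) {
        // Stands in for dataTwoTable: latest "data-two" value per key, which is what
        // aggregate(() -> null, (key, device, storedDevice) -> device) keeps.
        Map<Integer, String> table = new HashMap<>();
        List<String> joined = new ArrayList<>();
        for (Event e : events) {
            if (e.topic.equals("data-two")) {
                table.put(e.key, e.value);          // table update: emits no join output
            } else if (e.topic.equals("data-one")) {
                String current = table.get(e.key);
                if (current != null) {              // inner join: unmatched stream records are dropped
                    joined.add(joiner.apply(e.value, current));
                }
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event("data-one", 1, "a"),  // no data-two value for key 1 yet: dropped
                new Event("data-two", 1, "x"),
                new Event("data-one", 1, "b"),  // joins with "x"
                new Event("data-two", 1, "y"),  // replaces "x"
                new Event("data-one", 1, "c"),  // joins with "y"
                new Event("data-one", 2, "d")); // no data-two value for key 2: dropped
        System.out.println(run(events, (one, two) -> one + "+" + two)); // [b+x, c+y]
    }
}
```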

Sparky
  • I don't think it matters how many are used in the method, but the `@Singleton` for the `KStream` is indeed important for the lifecycle of the variable – OneCricketeer Nov 19 '20 at 16:56
  • The reason I'm asking is that I can't see any reason to return a `KStream` that wouldn't be better served by a `@PostConstruct` method with the right context. Some experimenting suggests that everything necessary to run the streams is set on the `ConfiguredStreamBuilder` (at least when running against the `TopologyTestDriver`, not in Micronaut itself). Because the returned KStream is never used anywhere obvious, returning it seems misleading. Yet it _does_ get used somewhere in the micronaut-kafka source; I just blew my stack before I worked out why. – Sparky Nov 19 '20 at 18:20
  • I've never used Micronaut, so I'm not sure about that. I'm just saying that the singleton method does get run, so all the streams/tables defined within it get executed. However, if you needed to reference those local variables somewhere else, then moving them into their own singleton methods would make sense.
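The observation in these comments, that everything needed to run the streams ends up on the ConfiguredStreamBuilder, can be illustrated with a toy model in plain Java. Builder, Handle and BuilderSideEffectModel are hypothetical stand-ins, not Micronaut or Kafka APIs. In the model, every source registered on the shared builder becomes part of the result of build(), whether or not the factory method returns it. The sketch does not model Micronaut's bean wiring; the idea that the returned KStream bean matters mainly because it forces the factory method to run before the streams are built is an assumption here, not something the question confirms.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (hypothetical classes, not Micronaut or Kafka APIs): a factory method
// registers nodes on a shared builder as a side effect; the result is assembled
// from the builder afterwards, so the returned handle itself is not needed.
public class BuilderSideEffectModel {

    // Hypothetical stand-in for ConfiguredStreamBuilder: it only records node names.
    static final class Builder {
        private final List<String> nodes = new ArrayList<>();

        Handle stream(String topic) {
            nodes.add("source:" + topic);
            return new Handle(this);
        }

        List<String> build() {
            return new ArrayList<>(nodes); // snapshot of everything registered so far
        }
    }

    // Hypothetical stand-in for a KStream: it can only add a sink to its builder.
    static final class Handle {
        private final Builder builder;

        Handle(Builder builder) {
            this.builder = builder;
        }

        void to(String topic) {
            builder.nodes.add("sink:" + topic);
        }
    }

    // Same shape as dataStream(): two sources are registered, only one is returned.
    static Handle dataStream(Builder builder) {
        Handle one = builder.stream("data-one");
        builder.stream("data-two"); // never returned, yet still recorded on the builder
        one.to("topic-out");
        return one;
    }

    public static void main(String[] args) {
        Builder builder = new Builder();
        dataStream(builder);                 // the returned handle is ignored
        System.out.println(builder.build()); // [source:data-one, source:data-two, sink:topic-out]
    }
}
```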
