There are several details about the example Micronaut/Kafka Streams application that I don't understand. The example class from the documentation is reproduced below (original link: https://micronaut-projects.github.io/micronaut-kafka/latest/guide/#kafkaStreams).

My questions are:

- Why are we returning only the source stream?
- If we have multiple source `KStream` objects, e.g. to do a join, do we need to also make them Beans? (A sketch of what I mean follows this list.)
- Do we also need to make each source `KTable` a Bean?
- What happens if we don't make a source `KStream` or `KTable` a Bean? We currently have at least one project that does this, with no apparent problems.
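To make the second question concrete, here is a minimal sketch of what I mean by registering each source stream as its own bean. The class name, bean names, and topic names ("left-input", "right-input") are invented for illustration, it assumes default String serdes are configured elsewhere, and it is not code from the documentation:

import io.micronaut.configuration.kafka.streams.ConfiguredStreamBuilder;
import io.micronaut.context.annotation.Factory;
import org.apache.kafka.streams.kstream.KStream;
import javax.inject.Named;
import javax.inject.Singleton;

@Factory
public class JoinTopologyExample {

    // Invented topic names, purely for illustration
    public static final String LEFT_INPUT = "left-input";
    public static final String RIGHT_INPUT = "right-input";

    @Singleton
    @Named("left-source")
    KStream<String, String> leftSource(ConfiguredStreamBuilder builder) {
        // First source stream, registered as its own named bean
        return builder.stream(LEFT_INPUT);
    }

    @Singleton
    @Named("right-source")
    KStream<String, String> rightSource(ConfiguredStreamBuilder builder) {
        // Second source stream, e.g. the other side of a join: does this
        // one also have to be a bean, or can it be built inline inside a
        // single factory method (as in our edit below)?
        return builder.stream(RIGHT_INPUT);
    }
}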
import io.micronaut.configuration.kafka.streams.ConfiguredStreamBuilder;
import io.micronaut.context.annotation.Factory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import javax.inject.Named;
import javax.inject.Singleton;
import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
@Factory
public class WordCountStream {

    public static final String STREAM_WORD_COUNT = "word-count";
    public static final String INPUT = "streams-plaintext-input";
    public static final String OUTPUT = "streams-wordcount-output";
    public static final String WORD_COUNT_STORE = "word-count-store";

    @Singleton
    @Named(STREAM_WORD_COUNT)
    KStream<String, String> wordCountStream(ConfiguredStreamBuilder builder) {
        // set default serdes
        Properties props = builder.getConfiguration();
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KStream<String, String> source = builder.stream(INPUT);

        KTable<String, Long> groupedByWord = source
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .groupBy((key, word) -> word, Grouped.with(Serdes.String(), Serdes.String()))
                // Store the result in a store for lookup later
                .count(Materialized.as(WORD_COUNT_STORE));

        groupedByWord
                // convert to stream
                .toStream()
                // send to output using specific serdes
                .to(OUTPUT, Produced.with(Serdes.String(), Serdes.Long()));

        return source;
    }
}
Edit: Here's a version of our service with multiple streams, edited to remove identifying info.
@Factory
public class TopologyCopy {

    private static class DataOut {}
    private static class DataInOne {}
    private static class DataInTwo {}
    private static class DataInThree {}

    @Singleton
    @Named("data")
    KStream<Integer, DataInOne> dataStream(ConfiguredStreamBuilder builder) {
        Properties props = builder.getConfiguration();
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);

        KStream<Integer, DataInOne> dataOneStream = builder.stream("data-one",
                Consumed.with(TextualIntSerde.INSTANCE, new JsonSerde<>(DataInOne.class)));
        KStream<Integer, DataInTwo> dataTwoStream = builder.stream("data-two",
                Consumed.with(TextualIntSerde.INSTANCE, new JsonSerde<>(DataInTwo.class)));
        GlobalKTable<Integer, DataInThree> signalTable = builder.globalTable("data-three",
                Consumed.with(TextualIntSerde.INSTANCE, new JsonSerde<>(DataInThree.class)),
                Materialized.as("data-three-store"));

        KTable<Integer, DataInTwo> dataTwoTable = dataTwoStream
                .groupByKey()
                .aggregate(() -> null, (key, device, storedDevice) -> device,
                        Materialized.with(TextualIntSerde.INSTANCE, new JsonSerde<>(DataInTwo.class)));

        dataOneStream
                .transformValues(() -> /* MAGIC */)
                .join(dataTwoTable, (data1, data2) -> /* MAGIC */)
                .selectKey((something, msg) -> /* MAGIC */)
                .to("topic-out", Produced.with(Serdes.UUID(), new JsonSerde<>(DataOut.class)));

        return dataOneStream;
    }
}