!!Test environment
A producer who sends a message with the key 'A' once a second.
Kafka Streams (Java Code) tumbling window every 10 seconds.
10 messages per 10 seconds should be received and duplicates occur at regular intervals as shown below.
This must be resolved.
!!Current Results
(timestemp, key, Number of messages per tumbling window)
(1544079700000L, 'A', 10)
(1544079710000L, 'A', 10)
(1544079720000L, 'A', 10)
(1544079730000L, 'A', 9)
(1544079730000L, 'A', 10)
(1544079740000L, 'A', 10)
(1544079750000L, 'A', 10)
(1544079760000L, 'A', 9)
(1544079760000L, 'A', 10)
(1544079770000L, 'A', 10)
(1544079780000L, 'A', 10)
(1544079790000L, 'A', 9)
!! TumblingWindowKafkaStream.java (Use Kafka Streams) `
ackage io.github.timothyrenner.kstreamex.tumblingwindow;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import java.util.Properties;
import java.util.Random;
/** Demonstrates tumbling windows.
*
* @author Timothy Renner
*/
public class TumblingWindowKafkaStream {
/** Runs the streams program, writing to the "long-counts-all" topic.
*
* @param args Not used.
*/
public static void main(String[] args) throws Exception {
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG,
"tumbling-window-kafka-streams");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9090,localhost:9091,localhost:9092");
config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG,
"localhost:2181");
config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
Serdes.ByteArray().getClass().getName());
config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
Serdes.Long().getClass().getName());
KStreamBuilder builder = new KStreamBuilder();
KStream<byte[], byte[]> longs = builder.stream(
Serdes.ByteArray(), Serdes.ByteArray(), "t2");
// The tumbling windows will clear every ten seconds.
KTable<Windowed<byte[]>, Long> longCounts =
longs.groupByKey()
.count(TimeWindows.of(10000L)
.until(10000L),
"t2-counts");
// Write to topics.
longCounts.toStream((k,v) -> k.key())
.to(Serdes.ByteArray(),
Serdes.Long(),
"t2-counts-all");
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
} // Close main.
} // Close TumblingWindowKafkaStream.
`