My intention with this topology is to window incoming messages and then count them, and then send the count to another topic.

When I test this by sending a single key with one or more values to the input topic, I get inconsistent results. Sometimes the count is correct. Other times I send in a single message and see it at the first peek, but instead of a count of 1 I get some other value at the second peek and in the output topic. When I send in multiple messages, the count is usually right but sometimes off. I'm careful to send the messages within the one-minute window duration, so I don't think they're getting split into two windows.
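As a sanity check on the window-splitting assumption, here is a minimal plain-Java sketch of how a timestamp maps to a tumbling window. It assumes windows are aligned to the epoch (which is how Kafka Streams documents `TimeWindows`) and that assignment uses the record timestamp rather than the moment of sending. The class and method names (`WindowBoundaries`, `windowStart`, `sameWindow`) are hypothetical and are not part of the Kafka Streams API.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper, not part of Kafka Streams: models epoch-aligned tumbling windows.
final class WindowBoundaries {

    // Start (inclusive) of the epoch-aligned window containing the timestamp.
    // Assumes a non-negative timestamp in milliseconds.
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    // True when both timestamps fall into the same tumbling window.
    static boolean sameWindow(long firstMs, long secondMs, long windowSizeMs) {
        return windowStart(firstMs, windowSizeMs) == windowStart(secondMs, windowSizeMs);
    }

    public static void main(String[] args) {
        long size = Duration.ofMinutes(1).toMillis();
        long first = Instant.parse("2020-01-01T00:00:50Z").toEpochMilli();
        long second = first + 20_000; // sent 20 seconds later, well under one minute apart

        // The two records straddle the 00:01:00 boundary, so they land in different windows.
        System.out.println(sameWindow(first, second, size)); // prints "false"
    }
}
```

Under these assumptions, two messages sent less than a minute apart can still fall on either side of a minute boundary, so sending them close together does not by itself guarantee a single window.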

Is there a flaw in my topology?

    public static final String INPUT_TOPIC  = "test-topic";
    public static final String OUTPUT_TOPIC = "test-output-topic";

    public static void buildTopo(StreamsBuilder builder) {
        WindowBytesStoreSupplier store = Stores.persistentTimestampedWindowStore(
                "my-state-store",
                Duration.ofDays(1),
                Duration.ofMinutes(1),
                false);

        Materialized<String, Long, WindowStore<Bytes, byte[]>> materialized = Materialized
                .<String, Long>as(store)
                .withKeySerde(Serdes.String());

        Suppressed<Windowed> suppression = Suppressed
                .untilWindowCloses(Suppressed.BufferConfig.unbounded());

        TimeWindows window = TimeWindows
                .of(Duration.ofMinutes(1))
                .grace(Duration.ofSeconds(0));

        // windowedKey has a string plus the kafka time window
        builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String()))
                .peek((key, value) -> System.out.println("****key = " + key + " value= " + value))
                .groupByKey()
                .windowedBy(window)
                .count(materialized)
                .suppress(suppression)
                .toStream()
                .peek((key, value) -> System.out.println("key = " + key + " value= " + value))
                .map((key, value) -> new KeyValue<>(key.key(), value))
                .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));
    }
marathon