
I'm writing a Kafka Streams app. It does the following steps: 1) consume the input data, 2) dedupe the records based on a new key within a 1-hour window, 3) reselect the key, 4) count the keys within a 1-hour window, 5) send the results downstream.

I'm new to Kafka Streams. My understanding is that, in order to keep the window at 1 hour, I should set commit.interval.ms to 1 hour as well. Is this the right thing to do?

Once I deployed my app with real traffic, it kept sending messages continuously, whereas I expected it to only send out a batch of messages every hour.

Any help is appreciated!!

My config:

commit.interval.ms = 3600000
request.timeout.ms = 600000
retries = 20
retry.backoff.ms = 1000    
cache.max.bytes.buffering = 10485760
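
(For reference, roughly how I wire these up; the application id and bootstrap servers below are placeholders, not my real values:)

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-dedupe-count-app");   // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");        // placeholder
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 3600000);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10485760);
// client-level settings are passed through to the embedded clients
props.put("request.timeout.ms", 600000);
props.put("retries", 20);
props.put("retry.backoff.ms", 1000);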

// dedupe by new key per window (1 hr)
stream = inputStream
        .selectKey(...)
        .groupByKey()
        .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(60)))
        // only keep the latest event for each customized key
        .reduce((event1, event2) -> event2)
        .toStream()
        .groupByKey()
        .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(60)))
        .reduce((event1, event2) -> {
            long count1 = event1.getCount();
            long count2 = event2.getCount();
            event2.setCount(count1 + count2);
            return event2;
        })
        .toStream()
        .to(OUTPUT_TOPIC);
thinktwice
  • Based on your question, I think this link may be useful as well: https://stackoverflow.com/questions/53946013/how-to-output-result-of-windowed-aggregation-only-when-window-is-finished – aishwarya kumar Jul 01 '19 at 20:54

2 Answers


I'm new to Kafka Streams. My understanding is that, in order to keep the window at 1 hour, I should set commit.interval.ms to 1 hour as well. Is this the right thing to do?

The commit interval has nothing to do with your processing logic.

You might want to look into the `suppress()` operator; there is also a blog post on this topic that might help.

Kafka Streams' processing model is continuous, and it sends continuous result updates by default. That's why you basically get one output message per input message: each input message modifies the result, and the updated result is emitted downstream.
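
For example, a rough sketch of applying `suppress()` to the dedupe window from the question (the re-keying logic is a placeholder, and `OUTPUT_TOPIC` is taken from the question; needs `org.apache.kafka.streams.kstream.TimeWindows`, `org.apache.kafka.streams.kstream.Suppressed`, `org.apache.kafka.streams.KeyValue`, and `java.time.Duration`):

// emit one record per key per 1-hour window, and only after the window closes
inputStream
        .selectKey((key, event) -> extractCustomKey(event))   // placeholder re-keying logic
        .groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofMinutes(60))
                .grace(Duration.ofSeconds(15)))                // without grace(), the default is 24h
        .reduce((event1, event2) -> event2)                    // keep only the latest event per key
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
        .toStream()
        .map((windowedKey, event) -> KeyValue.pair(windowedKey.key(), event))  // unwrap the windowed key
        .to(OUTPUT_TOPIC);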

Matthias J. Sax
  • Hi, without `suppress()`, is there any other way to send only one message per key per time window? I don't know if my cluster has been upgraded to a newer version that supports the `suppress()` function. Thanks a lot! – thinktwice Jul 08 '19 at 16:23
  • You can upgrade Kafka Streams independently of the Kafka cluster; both are forward/backward compatible: https://docs.confluent.io/current/streams/upgrade-guide.html#compatibility -- Besides this, you can always write a custom processor. – Matthias J. Sax Jul 08 '19 at 17:35
  • Thanks Matthias! I have tried `suppress` but keep getting tons of `Unsubscribed all topics or patterns and assigned partitions`. I searched for that output but didn't find any good answer. Do you have any insight into what might be wrong? My code is `groupByKey().windowedBy(TimeWindows.of(Duration.ofMinutes(60))).reduce(...).suppress(untilWindowCloses(Suppressed.BufferConfig.unbounded()))` Thanks a lot!! – thinktwice Jul 08 '19 at 21:45
  • `Unsubscribed all topics or patterns and assigned partitions` -- this should be logged by the restore consumer (the log line should contain `-restore`) and should not be an issue. The only thing you should do is specify the "grace period" on the `TimeWindows` -- otherwise it defaults to 24h and you won't get any result earlier than that... – Matthias J. Sax Jul 09 '19 at 02:03
  • Thanks! By adding a "grace period", my app is able to generate correct messages! Just one more question: does `suppress` cache all internal messages in memory, or does it only keep the latest one? Since I need to work with a 1-hour time window at ~2k messages per sec, I'm worried about the memory usage. Thanks! – thinktwice Jul 09 '19 at 18:30
  • I also found that `suppress` provides a `spillToDiskWhenFull` config to address the memory concern, but it seems that `spillToDiskWhenFull` is not released yet? – thinktwice Jul 09 '19 at 21:49
  • `suppress()` maintains only one record per key per window. Hence, the size of the window does not matter, only how many windows start within the grace period. -- And yes, "spill to disk" is not implemented yet. – Matthias J. Sax Jul 10 '19 at 05:57
  • My Kafka Streams app gets killed by the system due to a memory issue (`SIGKILL 9`). I used to have only 2 instances and have now added another 4. From the log, it seems like each instance is consuming all partitions, which seems wrong to me (they all have the same application id; only the restore consumer has `group.id=null`). I'm trying to print the partition assignment for each instance. Does Kafka Streams provide any support for that, like Kafka's `kafkaConsumer.assignment()`? Thanks a lot!! – thinktwice Jul 11 '19 at 20:12
  • You can use `KafkaStreams#localThreadsMetadata()` to get information about which partitions are assigned (see the sketch below the comments). – Matthias J. Sax Jul 12 '19 at 19:54
  • Thanks Matthias! `KafkaStreams#localThreadsMetadata()` is exactly what I was looking for!! A follow-up question on `windowedBy` and `suppress`: I know `suppress` only keeps the latest record per key per window, but does `windowedBy` keep all records from the same time window in memory? My app is like `groupByKey(...).windowedBy(TimeWindows.of(Duration.ofMinutes(60)).grace(Duration.ofSeconds(15))).reduce(...).suppress(untilWindowCloses(Suppressed.BufferConfig.unbounded()))`. From the log, it runs into memory issues after 2~3 days. Thanks! – thinktwice Jul 15 '19 at 22:40
  • `windowedBy()` is just a logical operation and the corresponding `reduce()` keeps only the current result per key per window (not each input record). – Matthias J. Sax Jul 15 '19 at 22:48
  • I see. So basically `windowedBy` should not lead to any memory issues? – thinktwice Jul 16 '19 at 02:16
  • The operator itself not -- it's similar to `groupBy()`. Only grace period and retention time affect your storage requirements (`reduce()` itself uses RocksDB by default and spills to disk -- `suppress()` is in-memory only atm). – Matthias J. Sax Jul 16 '19 at 18:14
  • Thanks a lot Matthias! My app is running as expected but is getting rebalanced pretty often. I'm not sure what the right approach to testing the configs is, so I posted another question. Do you mind taking a look at it? https://stackoverflow.com/questions/57100510/kafka-streams-app-is-always-rebalancing-and-getting-error-the-coordinator-is-no Thanks!! – thinktwice Jul 18 '19 at 18:20
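
A rough sketch of printing the assignment mentioned in the comments above, assuming `streams` is the running `KafkaStreams` instance (uses `org.apache.kafka.streams.processor.ThreadMetadata` and `TaskMetadata`):

// print which partitions each stream thread of this instance is currently assigned
for (ThreadMetadata thread : streams.localThreadsMetadata()) {
    System.out.println("thread: " + thread.threadName());
    for (TaskMetadata task : thread.activeTasks()) {
        System.out.println("  task " + task.taskId() + " -> " + task.topicPartitions());
    }
}
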
  1. I would recommend using the exactly-once guarantee provided by recent versions of Kafka; with it, you won't have to worry about de-duplicating messages. https://www.baeldung.com/kafka-exactly-once
  2. Tune the producer config, specifically buffer.memory and linger.ms (you can also check batch.size). See https://kafka.apache.org/documentation/#producerconfigs for more information. A rough sketch of both points is below.
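
A rough sketch of both points for a Streams application (the values are placeholders, not recommendations; uses `org.apache.kafka.streams.StreamsConfig` and `org.apache.kafka.clients.producer.ProducerConfig`):

Properties props = new Properties();
// 1) exactly-once processing (requires brokers on 0.11 or newer)
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
// 2) batching knobs, forwarded to the embedded producer
props.put(StreamsConfig.producerPrefix(ProducerConfig.BUFFER_MEMORY_CONFIG), 33554432L);
props.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), 100);
props.put(StreamsConfig.producerPrefix(ProducerConfig.BATCH_SIZE_CONFIG), 65536);
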
JR ibkr
  • This is not really an answer to the question. – Matthias J. Sax Jul 05 '19 at 21:12
  • Writing an app just to dedupe a topic sounds repetitive to me; that's why I recommended using exactly-once delivery. And I agree, the second point was not helpful at all. I will edit my answer. – JR ibkr Jul 05 '19 at 21:30
  • The problem in the question is not about duplicates in the input, but it's a question about Kafka Streams' processing model that by default emits more than one record per window. – Matthias J. Sax Jul 05 '19 at 21:32