
What are the ways to dedup the key in a windowed KTable?

I added a transform() based on the solutions provided in other threads, but I still see the same count. Can somebody tell me what's wrong with this?

```java
final KTable<Windowed<String>, Long> aggregated = feeds
        // re-key every record by its user id
        .selectKey((k, v) -> v.getUserId().toString())
        .transform(() -> new Transformer<String, AvroMessage, KeyValue<String, AvroMessage>>() {
            private ProcessorContext context;

            @Override
            public void init(ProcessorContext context) {
                this.context = context;
            }

            @Override
            public KeyValue<String, AvroMessage> transform(String key, AvroMessage value) {
                // note: this list only ever holds the current record's key
                Collection<String> list = Arrays.asList(key);

                // Get collection without duplicate i.e. distinct only
                List<String> distinctElements = list.stream().distinct().collect(Collectors.toList());

                // List.toString() wraps the key in brackets, e.g. "[227338224]"
                key = distinctElements.toString();
                // the value itself is forwarded unchanged
                return new KeyValue<>(key, value);
            }

            @Override
            public KeyValue<String, AvroMessage> punctuate(long timestamp) {
                return null;
            }

            @Override
            public void close() {
            }
        })
        // no need to specify explicit serdes because the resulting key and value types match our default serde settings
        .groupByKey()
        .count(TimeWindows.of(windowSizeMs), STATE_STORE);
```
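The transform() body never compares one record with another: `Arrays.asList(key)` builds a one-element list from the current key, so `distinct()` has nothing to remove, and `toString()` only wraps the key in brackets. The standalone snippet below copies that logic outside Kafka Streams (the class `TransformKeyDemo` is a hypothetical name used only for illustration). Two records for the same user still end up with identical keys, which is consistent with `count()` reporting 2.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper that reproduces the key handling inside transform().
class TransformKeyDemo {
    static String keyAfterTransform(String key) {
        // One-element list: only the current record's key is ever in it
        Collection<String> list = Arrays.asList(key);
        // distinct() cannot drop anything from a single element
        List<String> distinctElements = list.stream().distinct().collect(Collectors.toList());
        // List.toString() yields "[<key>]"
        return distinctElements.toString();
    }
}
```

Calling `keyAfterTransform("227338224")` twice returns `"[227338224]"` both times, so both records still land in the same group.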

Output:

[KSTREAM-AGGREGATE-0000000002]: [227338224@1517605200000] , (2<-null)

Expected:

[KSTREAM-AGGREGATE-0000000002]: [227338224@1517605200000] , (1<-null)
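A windowed count counts every record per (key, window start), so two records from user 227338224 in the window starting at 1517605200000 give 2. Getting 1 requires dropping repeated records before counting, which means remembering which keys were already seen in each window (inside Kafka Streams that memory would have to live in a state store attached to the transformer). The plain-Java model below illustrates only that logic and uses no Kafka classes. It assumes epoch-aligned tumbling windows, and the class name `WindowedDedupCounter` is hypothetical. Its `seen` set is never expired, unlike a real store with retention.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical plain-Java model of "dedup per window, then count".
class WindowedDedupCounter {
    private final long windowSizeMs;
    // "key@windowStart" entries already counted (never expired in this sketch)
    private final Set<String> seen = new HashSet<>();
    // Counts without dedup, i.e. what a plain windowed count() produces
    final Map<String, Long> rawCounts = new HashMap<>();
    // Counts with dedup: each key contributes at most once per window
    final Map<String, Long> dedupCounts = new HashMap<>();

    WindowedDedupCounter(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Builds "key@windowStart" for the tumbling window containing the timestamp
    String windowedKey(String key, long timestampMs) {
        long start = timestampMs - (timestampMs % windowSizeMs);
        return key + "@" + start;
    }

    void process(String key, long timestampMs) {
        String wk = windowedKey(key, timestampMs);
        rawCounts.merge(wk, 1L, Long::sum);
        // Set.add returns false if this key was already seen in this window
        if (seen.add(wk)) {
            dedupCounts.merge(wk, 1L, Long::sum);
        }
    }
}
```

With a one-hour window (an assumed value; the question does not give `windowSizeMs`), two records for `"227338224"` at 1517605201000 and 1517605205000 give a raw count of 2 but a deduplicated count of 1 for `"227338224@1517605200000"`. A third record an hour later starts a new window and is counted again.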
user8617180
  • Possible duplicate of [How to send final kafka-streams aggregation result of a time windowed KTable?](https://stackoverflow.com/questions/38935904/how-to-send-final-kafka-streams-aggregation-result-of-a-time-windowed-ktable) – Matthias J. Sax Feb 09 '18 at 01:37
  • Another duplicate: https://stackoverflow.com/questions/44921281/how-to-write-only-final-output-of-kstreams-windowed-operation – Matthias J. Sax Feb 09 '18 at 01:38
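The linked questions address a related symptom: a windowed KTable emits an update for every incoming record, so downstream consumers can see 1 and later 2 for the same window. The model below sketches the "final result only" idea: counts are buffered per window and emitted once, after stream time (the largest timestamp seen so far) passes the window end. It is plain Java with a hypothetical class name, ignores grace periods, and simply drops records for windows that have already closed. Later Kafka Streams releases (2.1 and newer) expose this behavior through `KTable#suppress(Suppressed.untilWindowCloses(...))`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model: emit each tumbling window's count once, when the window closes.
class FinalResultEmitter {
    private final long windowSizeMs;
    private long streamTime = Long.MIN_VALUE;
    // windowStart -> (key -> count) for windows that are still open
    private final TreeMap<Long, TreeMap<String, Long>> open = new TreeMap<>();
    // Final results, formatted as "key@windowStart=count"
    final List<String> emitted = new ArrayList<>();

    FinalResultEmitter(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    void process(String key, long timestampMs) {
        streamTime = Math.max(streamTime, timestampMs);
        long start = timestampMs - (timestampMs % windowSizeMs);
        if (start + windowSizeMs <= streamTime) {
            return; // late record for an already-closed window: dropped in this sketch
        }
        open.computeIfAbsent(start, s -> new TreeMap<>()).merge(key, 1L, Long::sum);
        // Close (and emit) every window that ends at or before the current stream time
        while (!open.isEmpty() && open.firstKey() + windowSizeMs <= streamTime) {
            Map.Entry<Long, TreeMap<String, Long>> closed = open.pollFirstEntry();
            for (Map.Entry<String, Long> e : closed.getValue().entrySet()) {
                emitted.add(e.getKey() + "@" + closed.getKey() + "=" + e.getValue());
            }
        }
    }
}
```

With a one-hour window (again an assumed size), two records for `"227338224"` inside the window produce no output at all; only when a record with a timestamp past 1517608800000 arrives is the single line `227338224@1517605200000=2` emitted, instead of one update per record.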
