0

I have a windowed KTable working as expected, but it is outputting every time a new value is received. I found the .suppress operator and it does exactly what I want: output results only at the end of the time window. I have added a grace value to my TimeWindow, but cannot get .suppress to work with the windowed KTable. The answer to this question shows what .suppress should look like.

It appears to me when reading Apache's documentation, that untilWindowCloses is a method of the Suppressed interface, meaning I can't instantiate a Suppressed object, correct? I'm not sure how to implement an interface in this way (in the arguments for .suppress on a windowed KTable).

I feel like I'm missing something stupid, but I've searched and searched and can't figure it out. Any ideas?

TimeWindows window = TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ofSeconds(10));

final KTable<Windowed<String>, GenericRecord> joinedKTable = groupedStream
    .windowedBy(window)
    .reduce(new Reducer<GenericRecord>() {
        @Override
        public GenericRecord apply(GenericRecord aggValue, GenericRecord newValue) {
            //reduce code
        }
    })
    .suppress(Suppressed.untilWindowCloses(unbounded())); //need help here

I am using Eclipse and it's telling me "The method unbounded() is undefined." What am I doing wrong?

OneCricketeer
  • 179,855
  • 19
  • 132
  • 245
stufio
  • 3
  • 4
  • 1
    Note that there are bugs in `suppress()` in 2.1.x and 2.2.x and you should use `2.3.0` if you use `suppress()`. – Matthias J. Sax Sep 10 '19 at 14:51
  • Can you expound on what those are? I’m using Confluent’s platform on version 5.1.2. Not sure what version of KStreams that corresponds to. – stufio Sep 10 '19 at 15:33
  • That's AK 2.1.1 -- you should use CP 5.3.0. (cf https://docs.confluent.io/current/streams/upgrade-guide.html) – Matthias J. Sax Sep 10 '19 at 16:06

1 Answers1

2

You need to either statically import unbounded(), or qualify the reference.

import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;

or

.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
jamierocks
  • 685
  • 5
  • 16
  • Thank you for your response. I tried the second method and I get `References to interface static methods are allowed only at source level 1.8 or above`. I looked that up and it says that you need JDK 8. When I do `java -version` I get `openjdk version "1.8.0_191"`. What gives? – stufio Sep 09 '19 at 20:03
  • You'll need to find the option in Eclipse to set your source level appropriately. I haven't used Eclipse in some years, buts its available under Project Structure in IntelliJ. https://i.imgur.com/6e0yauW.png – jamierocks Sep 09 '19 at 20:09
  • Thanks for your help. I think I found it under Project > Properties > Java Compiler > Compiler compliance level. If I were just running this from the command line, would it use whatever JDK I had installed? – stufio Sep 09 '19 at 21:58