I'm trying to implement a Flink job that reads a Kafka stream and aggregates events per session. For some reason getResult() is never called. I can see that createAccumulator() and add() are called, and I expect getResult() to be called as well so that I can sink the aggregated message to the destination.
    source.keyBy(new KeySelector<GenericRecord, String>() {
            @Override
            public String getKey(GenericRecord record) {
                return record.get("id").toString();
            }
        })
        .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<GenericRecord>() {
            private static final long serialVersionUID = -4834111073247835189L;
            private final long maxTimeLag = 300000L;

            @Nullable
            @Override
            public Watermark checkAndGetNextWatermark(GenericRecord lastElement, long extractedTimestamp) {
                return new Watermark(extractedTimestamp - maxTimeLag);
            }

            @Override
            public long extractTimestamp(GenericRecord element, long previousElementTimestamp) {
                long ts = 1000 * (long) element.get("timestamp");
                return ts;
            }
        })
        .map(new ReduceAttributesMap())
        .keyBy(new KeySelector<Tuple2<String, String>, String>() {
            @Override
            public String getKey(Tuple2<String, String> e) {
                return e.f0;
            }
        })
        .window(EventTimeSessionWindows.withGap(Time.minutes(5)))
        .aggregate(new EventAggregation())
        .addSink(...)
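For reference, this is how I understand the condition under which the session window should fire and getResult() should run, written as a minimal plain-Java sketch with no Flink dependency. The class and method names are made up for illustration. It assumes a single parallel input (with several inputs the operator uses the minimum watermark across them) and only mirrors the 5-minute gap and the 300000 ms lag from the snippet above.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper: models only the watermark arithmetic, not Flink itself.
final class SessionWindowFiring {
    // Mirrors EventTimeSessionWindows.withGap(Time.minutes(5)).
    static final long GAP_MS = TimeUnit.MINUTES.toMillis(5);
    // Mirrors maxTimeLag in the watermark assigner.
    static final long MAX_TIME_LAG_MS = 300_000L;

    // Watermark produced for an element, as in checkAndGetNextWatermark().
    static long watermarkFor(long eventTimestampMs) {
        return eventTimestampMs - MAX_TIME_LAG_MS;
    }

    // A session whose last element has this timestamp ends at last + gap;
    // the event-time trigger fires once the watermark reaches end - 1.
    static long firingWatermark(long lastElementTimestampMs) {
        return lastElementTimestampMs + GAP_MS - 1;
    }

    // True if an element with the newest timestamp pushes the watermark
    // far enough to close the session that ended at lastElementTimestampMs.
    static boolean fires(long lastElementTimestampMs, long newestEventTimestampMs) {
        return watermarkFor(newestEventTimestampMs) >= firingWatermark(lastElementTimestampMs);
    }

    public static void main(String[] args) {
        long last = 1_000_000L;
        System.out.println(fires(last, last));
        System.out.println(fires(last, last + GAP_MS));
        System.out.println(fires(last, last + GAP_MS + MAX_TIME_LAG_MS));
    }
}
```

If this model is right, the window only closes after some later element arrives whose timestamp is roughly gap plus lag (about 10 minutes) past the last element of the session. Without such an element the watermark never advances far enough.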
What could be the issue? Did I misconfigure something? I'd appreciate your help!