
I'm trying to implement a Flink job that reads a Kafka stream and aggregates events per session, but for some reason getResult() is never called. I can see that createAccumulator() and add() are called, and I expect getResult() to be called as well so that I can write the aggregated message to the sink.

        source.keyBy(new KeySelector<GenericRecord, String>() {
                    @Override
                    public String getKey(GenericRecord record) {
                        return record.get("id").toString();
                    }})
                .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<GenericRecord>() {
                    private static final long serialVersionUID = -4834111073247835189L;
                    private final long maxTimeLag = 300000L;

                    @Nullable
                    @Override
                    public Watermark checkAndGetNextWatermark(GenericRecord lastElement, long extractedTimestamp) {
                        return new Watermark(extractedTimestamp - maxTimeLag);
                    }

                    @Override
                    public long extractTimestamp(GenericRecord element, long previousElementTimestamp) {
                        long ts = 1000 * (long) element.get("timestamp");
                        return ts;
                    }
                })
                .map(new ReduceAttributesMap())
                .keyBy(new KeySelector<Tuple2<String, String>, String>() {
                    @Override
                    public String getKey(Tuple2<String, String> e) {
                        return e.f0;
                    }
                })
                .window(EventTimeSessionWindows.withGap(Time.minutes(5)))
                .aggregate(new EventAggregation())
                .addSink(...)

What could be the issue? Did I misconfigure something? I appreciate your help!

Sparkle8

1 Answer


AggregateFunction#getResult() is only called when the window fires. In your case, a session window only fires once the watermark has passed the end of the session, that is, once event time has advanced 5 minutes beyond the last event for that key. Can you confirm in your data that this case is actually happening?
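
To make the firing condition concrete, here is a small plain-Java model of it. It is not Flink code: the class and method names are made up for illustration, and it assumes the default event-time trigger, which fires once the watermark reaches the last millisecond covered by the window. The gap and lag values are taken from the question.

```java
/** Plain-Java model of when an event-time session window fires. Illustration only, not Flink code. */
final class SessionFiringModel {

    /** Watermark as computed by the question's assigner: event timestamp minus the allowed lag. */
    static long watermarkFor(long eventTimestampMs, long maxLagMs) {
        return eventTimestampMs - maxLagMs;
    }

    /**
     * A session whose last event is at lastEventMs ends (exclusively) at lastEventMs + gapMs.
     * The window fires once the watermark reaches its last included millisecond.
     */
    static boolean fires(long lastEventMs, long gapMs, long watermarkMs) {
        return watermarkMs >= lastEventMs + gapMs - 1;
    }

    public static void main(String[] args) {
        long gap = 5 * 60_000L; // 5-minute session gap, as in the question
        long lag = 300_000L;    // maxTimeLag from the question's watermark assigner
        long lastEvent = 0L;    // timestamp of the last event in the session

        // A later event at minute 9 only moves the watermark to minute 4: the session stays open.
        System.out.println(fires(lastEvent, gap, watermarkFor(9 * 60_000L, lag)));  // false
        // A later event at minute 15 moves the watermark to minute 10: the session fires.
        System.out.println(fires(lastEvent, gap, watermarkFor(15 * 60_000L, lag))); // true
    }
}
```

Note that in a real job the watermark seen by the window operator is the minimum over all of its parallel inputs, so a single idle input can hold every window open even when other inputs keep receiving events.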

You can try reducing the gap of the session window to see the effect more easily. Furthermore, your watermark assigner looks suspicious. You probably want to use BoundedOutOfOrdernessTimestampExtractor. Lastly, can you double-check that your timestamp extraction is working as expected? Is the timestamp stored as seconds since 1970?
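
As a rough illustration of the difference between the two watermark strategies, the following plain-Java sketch compares them on an out-of-order stream. It is a model, not Flink code: the class name, method names, and sample timestamps are hypothetical, and it only captures the general idea of a bounded-out-of-orderness watermark (largest timestamp seen so far minus a fixed bound).

```java
/** Plain-Java model of two watermark strategies. Illustration only, not Flink code. */
final class WatermarkModel {
    private final long maxLagMs;
    private long maxTimestampSeen = Long.MIN_VALUE;

    WatermarkModel(long maxLagMs) {
        this.maxLagMs = maxLagMs;
    }

    /** Per-element candidate, as in the question: the current element's timestamp minus the lag. */
    long perElementCandidate(long eventTimestampMs) {
        return eventTimestampMs - maxLagMs;
    }

    /** Bounded-out-of-orderness idea: the largest timestamp seen so far minus the lag. */
    long boundedCandidate(long eventTimestampMs) {
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestampMs);
        return maxTimestampSeen - maxLagMs;
    }

    public static void main(String[] args) {
        WatermarkModel model = new WatermarkModel(30_000L);
        long[] timestamps = {100_000L, 50_000L, 120_000L}; // the second event arrives out of order
        for (long ts : timestamps) {
            System.out.println(model.perElementCandidate(ts) + " vs " + model.boundedCandidate(ts));
        }
    }
}
```

For the out-of-order event, the per-element candidate drops to 20000 while the bounded candidate stays at 70000. Flink does not move a watermark backwards, so the lower candidate would simply be ignored, but tracking the maximum makes that behaviour explicit and avoids proposing a new watermark for every element.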

Arvid Heise
  • Arvid, I've reduced the gap of the session window to 1 minute, but AggregateFunction#getResult() is still not called. I get events from the source for a given key after ~15 minutes. – Sparkle8 Dec 01 '19 at 20:27
  • My requirement is this: video watch events arrive every ~15 minutes, and I need to aggregate each one as soon as I receive it and post the result to the destination. For example, if a user is watching a two-hour movie, I get events at 15-minute intervals (0, 15, 30, ..., 120), and whenever I get an event I need to aggregate the watched percentage so far and write it to the sink (0%, 12.5%, 25%, ..., 100%). That is why I want to keep the window gap at a higher value for aggregation, while at the same time expecting getResult() to return the aggregated value each time an event occurs (that is, after each AggregateFunction#add()). – Sparkle8 Dec 01 '19 at 20:44
  • Did you double-check that your watermark assignment is working as expected? See the second half of my post. – Arvid Heise Dec 02 '19 at 07:11
  • Hi Arvid, watermark creation looks good, but getResult() is only called when the window time has elapsed. What I'm looking for is this: whenever a new element is added to the window, I want to emit the aggregated value so that I can write it to the sink. Other than getResult(), is there any other way to emit the aggregated value when a new event arrives? Thanks for looking into it, Arvid! – Sparkle8 Dec 02 '19 at 20:12
  • I don't quite understand. Do you want to emit intermediate results? Why do you need the window at all? When would you want to reset the results? – Arvid Heise Dec 03 '19 at 20:47
  • Yes, I want to emit intermediate results whenever a new message arrives in a window. I need the window because I want to aggregate data within a session and reset the results when the gap between events is greater than x minutes. I figured out a solution using a custom EventTimeTrigger, but I have yet to test it with real session data. – Sparkle8 Dec 05 '19 at 06:21
  • Yes, you need to set a [trigger](https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#triggers). I'd try out `CountTrigger` with a `maxCount` of 1. – Arvid Heise Dec 07 '19 at 18:54
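
The behaviour the thread converges on (emit an updated result for every event, reset when the session gap is exceeded) can be sketched in plain Java as below. This is a model of the intended semantics, not Flink code: the class, its fields, and the 20-minute gap are hypothetical, and it assumes the events for one key arrive in timestamp order. In the Flink job, the suggestion from the last comment would presumably amount to adding `.trigger(CountTrigger.of(1))` between `.window(...)` and `.aggregate(...)`; that has not been tested against the question's job.

```java
/** Plain-Java model: emit a running result per event, reset after a session gap. Not Flink code. */
final class PerEventSessionModel {
    private final long gapMs;
    private final double durationMinutes;
    private boolean sessionOpen = false;
    private long lastEventMs;
    private double maxPositionMinutes;

    PerEventSessionModel(long gapMs, double durationMinutes) {
        this.gapMs = gapMs;
        this.durationMinutes = durationMinutes;
    }

    /**
     * Adds one event and immediately returns the watched percentage for the current session,
     * comparable to calling add() and then getResult() for every element.
     */
    double onEvent(long timestampMs, double positionMinutes) {
        // A pause longer than the gap starts a new session (boundary handling is a modelling choice).
        if (!sessionOpen || timestampMs - lastEventMs > gapMs) {
            sessionOpen = true;
            maxPositionMinutes = 0.0;
        }
        lastEventMs = timestampMs;
        maxPositionMinutes = Math.max(maxPositionMinutes, positionMinutes);
        return 100.0 * maxPositionMinutes / durationMinutes;
    }

    public static void main(String[] args) {
        // Two-hour movie, 20-minute session gap (assumed value, larger than the 15-minute event interval).
        PerEventSessionModel session = new PerEventSessionModel(20 * 60_000L, 120.0);
        System.out.println(session.onEvent(0L, 0.0));            // 0.0
        System.out.println(session.onEvent(15 * 60_000L, 15.0)); // 12.5
        System.out.println(session.onEvent(30 * 60_000L, 30.0)); // 25.0
        // 30 minutes of silence exceed the gap, so the result starts over.
        System.out.println(session.onEvent(60 * 60_000L, 15.0)); // 12.5
    }
}
```

The session gap has to be larger than the interval between events (about 15 minutes here); otherwise every event would start a new session and the running percentage would never accumulate.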