
I am trying to use a process function to do some processing on a set of events. I am using event time and a keyed stream. The issue I am facing is that the watermark value always comes out as -9223372036854775808. I have put in print statements to debug, and they show this:

timestamp------1583128014000 extractedTimestamp 1583128014000 currentwatermark-----9223372036854775808

timestamp------1583128048000 extractedTimestamp 1583128048000 currentwatermark-----9223372036854775808

timestamp------1583128089000 extractedTimestamp 1583128089000 currentwatermark-----9223372036854775808

So timestamp and extractedTimestamp change, but the watermark is never updated. As a result, no record gets into the queue, since context.timestamp() is never less than the watermark.

        DataStream<GenericRecord> dataStream = env.addSource(searchConsumer).name("search_list_keyless");
        DataStream<GenericRecord> dataStreamWithWaterMark = dataStream.assignTimestampsAndWatermarks(new SessionAssigner());

        try {
            dataStreamWithWaterMark.keyBy((KeySelector<GenericRecord, String>) record -> {
                StringBuilder builder = new StringBuilder();
                builder.append(record.get("session_id"));
                builder.append(record.get("user_id"));
                return builder.toString();
            }).process(new MatchFunction()).print();
        } catch (Exception e) {
            e.printStackTrace();
        }
        env.execute("start session process");
    }

    public static class SessionAssigner implements AssignerWithPunctuatedWatermarks<GenericRecord> {
        @Override
        public long extractTimestamp(GenericRecord record, long previousElementTimestamp) {
            long timestamp = (long) record.get("event_ts");
            System.out.println("timestamp------" + timestamp);
            return timestamp;
        }

        @Override
        public Watermark checkAndGetNextWatermark(GenericRecord record, long extractedTimestamp) {
            // simply emit a watermark with every event
            System.out.println("extractedTimestamp " + extractedTimestamp);
            return new Watermark(extractedTimestamp - 30000);
        }
    }
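As a point of comparison: the assigner above emits `extractedTimestamp - 30000` for every record, so an out-of-order record proposes a lower watermark than the previous one, and Flink's punctuated operator only forwards a watermark that is greater than the last one it emitted. The plain-Java sketch below (no Flink dependency; `MaxTimestampWatermark` is a hypothetical name) simulates the common alternative of tracking the largest timestamp seen so far and subtracting the allowed out-of-orderness. It uses the timestamps from the log plus one assumed late event.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: a "max timestamp minus bound" watermark.
class MaxTimestampWatermark {
    private final long maxOutOfOrdernessMs;
    // Largest event timestamp seen so far.
    private long maxTimestamp = Long.MIN_VALUE;
    // Last watermark emitted; Long.MIN_VALUE means "no watermark yet".
    private long currentWatermark = Long.MIN_VALUE;

    MaxTimestampWatermark(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Feeds one event timestamp and returns the watermark after it (never decreases). */
    long onEvent(long timestamp) {
        maxTimestamp = Math.max(maxTimestamp, timestamp);
        long candidate = maxTimestamp - maxOutOfOrdernessMs;
        // Like Flink's punctuated operator, only move the watermark forward.
        if (candidate > currentWatermark) {
            currentWatermark = candidate;
        }
        return currentWatermark;
    }

    /** Runs a sequence of timestamps and collects the watermark after each one. */
    static List<Long> run(long maxOutOfOrdernessMs, long... timestamps) {
        MaxTimestampWatermark gen = new MaxTimestampWatermark(maxOutOfOrdernessMs);
        List<Long> watermarks = new ArrayList<>();
        for (long ts : timestamps) {
            watermarks.add(gen.onEvent(ts));
        }
        return watermarks;
    }

    public static void main(String[] args) {
        // Timestamps from the question's log, followed by one assumed out-of-order event.
        System.out.println(run(30_000L, 1583128014000L, 1583128048000L, 1583128089000L, 1583128060000L));
    }
}
```

Running `main` should print `[1583127984000, 1583128018000, 1583128059000, 1583128059000]`: the late event does not pull the watermark back. In Flink 1.11 and later, `WatermarkStrategy.forBoundedOutOfOrderness(Duration)` offers a periodic version of this idea (it subtracts one extra millisecond), which this sketch does not model.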

This is the code for the process function:

public class MatchFunction extends KeyedProcessFunction<String, GenericRecord, Object> {

    private ValueState<Tuple2<Long, PriorityQueue<GenericRecord>>> queueState = null;

    @Override
    public void open(Configuration config) throws Exception {
        System.out.println("open");
        ValueStateDescriptor<Tuple2<Long, PriorityQueue<GenericRecord>>> descriptor = new ValueStateDescriptor<>(
                "sorted-events",
                TypeInformation.of(new TypeHint<Tuple2<Long, PriorityQueue<GenericRecord>>>() {}));
        queueState = getRuntimeContext().getState(descriptor);
    }
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Object> out) throws Exception {
        Tuple2<Long, PriorityQueue<GenericRecord>> tuple = queueState.value();

        PriorityQueue<GenericRecord> records = tuple.f1;

    }

    @Override
    public void processElement(GenericRecord record, Context context, Collector<Object> collector) throws Exception {

        TimerService timerService = context.timerService();
        System.out.println("currentwatermark----"+ timerService.currentWatermark());
        if (context.timestamp() > timerService.currentWatermark()) {

            Tuple2<Long, PriorityQueue<GenericRecord>> queueval = queueState.value();
            PriorityQueue<GenericRecord> queue = queueval.f1;
            long startTime = queueval.f0;
            System.out.println("starttime----"+ startTime);

            if (queue == null) {
                queue = new PriorityQueue<>(10, new TimeStampComprator());
                startTime = (long) record.get("event_ts");
            }
            queueState.update(new Tuple2<>(startTime, queue));
            timerService.registerEventTimeTimer(startTime + 5 * 60 * 1000);
        }
    }

}
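One thing worth checking in `processElement`: keyed `ValueState` returns `null` until something has been stored for the current key, so `queueState.value().f1` fails on the first record of every key, and the record itself is never added to the queue. The plain-Java sketch below models the intended per-key logic under stated assumptions: a `HashMap` stands in for Flink's keyed state, a list of timestamps stands in for `registerEventTimeTimer`, events are reduced to their timestamps, and `SessionBuffer` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical stand-in for MatchFunction's per-key logic; not Flink code.
class SessionBuffer {
    static final long DELAY_MS = 5 * 60 * 1000L;

    // Per-key state: start time of the buffer plus event timestamps in ascending order.
    static final class State {
        final long startTime;
        final PriorityQueue<Long> queue = new PriorityQueue<>();
        State(long startTime) { this.startTime = startTime; }
    }

    // Plays the role of ValueState: get() returns null for a key never seen before.
    private final Map<String, State> state = new HashMap<>();
    // Plays the role of registered event-time timers.
    final List<Long> timers = new ArrayList<>();

    void processElement(String key, long eventTs, long currentWatermark) {
        // Same condition as the original: ignore records at or behind the watermark.
        if (eventTs <= currentWatermark) {
            return;
        }
        State s = state.get(key);
        if (s == null) {
            // First record for this key: initialise state instead of dereferencing null.
            s = new State(eventTs);
            state.put(key, s);
            timers.add(s.startTime + DELAY_MS);
        }
        s.queue.add(eventTs); // buffer the record, which the original code never does
    }

    int bufferedCount(String key) {
        State s = state.get(key);
        return s == null ? 0 : s.queue.size();
    }

    public static void main(String[] args) {
        SessionBuffer buf = new SessionBuffer();
        buf.processElement("s1u1", 1583128014000L, Long.MIN_VALUE);
        buf.processElement("s1u1", 1583128048000L, 1583127984000L);
        System.out.println(buf.bufferedCount("s1u1") + " buffered, timers " + buf.timers);
    }
}
```

Running `main` should print `2 buffered, timers [1583128314000]`. Registering the timer only for the first record is a choice made in this sketch; Flink keeps at most one timer per key and timestamp, so re-registering `startTime + 5 * 60 * 1000` on every record, as the original does, also ends up with a single timer.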
David Anderson
Anuj jain

1 Answer


Here's a possible explanation for what you've shared:

The TimestampsAndPunctuatedWatermarksOperator calls extractTimestamp and forwards the record before it calls checkAndGetNextWatermark for that record, so any watermark derived from a record is emitted after the record itself. This means that the first time processElement in your MatchFunction is called in each task (parallel instance), the current watermark will still be Long.MIN_VALUE (which is -9223372036854775808).

If your parallelism is large enough, that could explain seeing

currentwatermark-----9223372036854775808

several times.
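The ordering described above can be simulated without Flink. In the hypothetical sketch below, `observedWatermarks` forwards each record first and only then advances the watermark, mirroring the operator's order of calls, and records the watermark a downstream `processElement` would observe. It assumes a single parallel instance and the question's 30-second lag.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of one parallel instance:
// punctuated assigner -> keyed process function.
class WatermarkOrderingSim {
    /** Returns the watermark each record would observe in processElement. */
    static List<Long> observedWatermarks(long lagMs, long... timestamps) {
        long currentWatermark = Long.MIN_VALUE; // nothing emitted yet
        List<Long> observed = new ArrayList<>();
        for (long ts : timestamps) {
            // 1. The record is forwarded first, so downstream still sees the old watermark.
            observed.add(currentWatermark);
            // 2. Only then is the next watermark computed; it is emitted after the record
            //    and only if it advances.
            long next = ts - lagMs;
            if (next > currentWatermark) {
                currentWatermark = next;
            }
        }
        return observed;
    }

    public static void main(String[] args) {
        System.out.println(observedWatermarks(30_000L, 1583128014000L, 1583128048000L, 1583128089000L));
    }
}
```

This should print `[-9223372036854775808, 1583127984000, 1583128018000]`. Only the first record sees Long.MIN_VALUE, and `"currentwatermark----" + Long.MIN_VALUE` prints exactly the `currentwatermark-----9223372036854775808` line from the question. In a single-instance pipeline, seeing that value on every record points to something other than this ordering.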

David Anderson
  • Currently I am running locally with parallelism 1 only, so ideally this should not happen. How can I fix it? – Anuj jain Mar 02 '20 at 15:44
  • Have you set the time characteristic to event time? – David Anderson Mar 02 '20 at 15:48
  • Yes, I have done that: env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); – Anuj jain Mar 02 '20 at 15:51
  • I have figured out the issue, but I am not able to understand why it is happening. I have posted the details on the user mailing list thread. Can you please help me understand? – Anuj jain Mar 03 '20 at 09:22
  • Please help me resolve the doubts I raised on the mailing list. – Anuj jain Mar 04 '20 at 17:40
  • I don't see why checkpointing would be a factor. If you can share a complete, reproducible example (maybe on github), that would help. – David Anderson Mar 04 '20 at 20:22
  • Now it's working with checkpointing enabled as well; I am also not sure what was wrong before and have to recheck. Please help me with my other question: how can I collect all the events in a session and trigger processing after 30 minutes of inactivity? I want to process all the events in the queue once the session has been inactive for 30 minutes. – Anuj jain Mar 08 '20 at 06:36
  • https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows – David Anderson Mar 08 '20 at 13:07
  • I have the same confusion: if we don’t set the watermark ourselves, does the system set it automatically when using event time? Also, with the new 1.14 API, you can set the watermark with omitwatermarkxxx(). I see a lot of examples on the internet that try to explain how to use watermarks; they keep saying that if we have out-of-order events, we set the watermark based on the max event timestamp we have seen minus maxOutOfOrderness (which is 3 in this case), like [ 5 2 ]W(3) 6 3 2 1], so 2 here is a late event, so will us still – Ricc Jan 30 '22 at 10:27
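On the follow-up about collecting all events of a session and processing them after 30 minutes of inactivity: the linked documentation describes gap-based session windows, for example `EventTimeSessionWindows.withGap(Time.minutes(30))` in the DataStream API of that era. The plain-Java sketch below only illustrates the grouping rule such a window applies, assuming the input timestamps are already sorted: each event keeps the session open for `gap` more milliseconds, so a new session starts when the next event arrives more than `gap` after the previous one. The class name `SessionGrouping` is hypothetical, and whether an event exactly `gap` later joins the old session is a detail of Flink's window merging; this sketch keeps it in the same session.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of gap-based session grouping; not Flink's implementation.
class SessionGrouping {
    /** Splits sorted event timestamps into sessions separated by inactivity longer than gapMs. */
    static List<List<Long>> sessions(List<Long> sortedTimestamps, long gapMs) {
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long previous = 0L;
        for (long ts : sortedTimestamps) {
            // Close the current session when the inactivity period exceeds the gap.
            if (!current.isEmpty() && ts - previous > gapMs) {
                result.add(current);
                current = new ArrayList<>();
            }
            current.add(ts);
            previous = ts;
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }

    public static void main(String[] args) {
        long min = 60_000L;
        // Assumed sample: events at 0, 10, 25, 70 and 80 minutes.
        List<Long> ts = Arrays.asList(0L, 10 * min, 25 * min, 70 * min, 80 * min);
        System.out.println(sessions(ts, 30 * min));
    }
}
```

With a 30-minute gap this should print `[[0, 600000, 1500000], [4200000, 4800000]]`: the 45-minute pause between minute 25 and minute 70 closes the first session. In Flink itself, an event-time session window fires only once the watermark passes the end of the session, which is one more reason the watermark has to advance.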