
I'm encountering an issue similar to "Flink EventTime Processing Watermark is always coming as -9223372036854725808". However, the suggested solutions (setting the parallelism and disabling checkpointing) have no effect. In this example, I'm simply streaming 1000 events one second apart and comparing each event's timestamp to ctx.timerService().currentWatermark():

>>> v=(61538659200000,0), watermark=-9223372036854775808
>>> v=(61538659201000,1), watermark=-9223372036854775808
>>> v=(61538660198000,998), watermark=-9223372036854775808
>>> v=(61538660199000,999), watermark=-9223372036854775808


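import java.util.ArrayList;
import java.util.Date;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class WatermarkExample
{
    public static void main(final String[] args)
        throws Exception
    {
        new WatermarkExample().watermarks();
    }

    public void watermarks()
        throws Exception
    {
        final var env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        // Deprecated since Flink 1.12, where event time is already the default.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setMaxParallelism(1);

        // Deprecated Date(int, int, int): the year is offset by 1900 and the month
        // is 0-based, so this is 1 Feb 3920 (local time). That is why the
        // timestamps in the output above are so large.
        final long startMs = new Date(2020, 1, 1).getTime();
        final var events = new ArrayList<Tuple2<Long, Integer>>();
        for (var ii = 0; ii < 1000; ++ii) {
            // One event per second of event time.
            events.add(new Tuple2<Long, Integer>(startMs + ii * 1000L, ii));
        }

        env.fromCollection(events)
            .assignTimestampsAndWatermarks(
                WatermarkStrategy.<Tuple2<Long, Integer>>forMonotonousTimestamps()
                    .withTimestampAssigner((event, ts) -> event.f0))
            .setParallelism(1)
            .keyBy(row -> row.f1 % 2)
            .process(new ProcessFunction<Tuple2<Long, Integer>, String>()
            {
                @Override
                public void processElement(
                    final Tuple2<Long, Integer> value,
                    final Context ctx,
                    final Collector<String> out)
                    throws Exception
                {
                    // Print the event next to the operator's current watermark.
                    out.collect("v=" + value + ", watermark=" + ctx.timerService().currentWatermark());
                }
            })
            .setParallelism(1)
            .print()
            .setParallelism(1);
        final var result = env.execute();
        System.out.println(result);
    }
}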
flink182

2 Answers


forMonotonousTimestamps is a periodic watermark generator that only generates watermarks when triggered by a timer. By default this timer fires every 200 msec (this is the autoWatermarkInterval). Your job doesn't run long enough for this timer to fire.
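To make the timing argument concrete, here is a plain-Java model of that behavior. It is not Flink's own code: `PeriodicGeneratorModel` is a hypothetical class that only mimics the onEvent/onPeriodicEmit split of a periodic generator. Each event merely updates the highest timestamp seen, and nothing becomes visible downstream until the periodic timer calls onPeriodicEmit. Until then the watermark stays at its initial value, Long.MIN_VALUE (-9223372036854775808). The "max timestamp minus 1" rule is an assumption about how the built-in monotonous generator computes its watermark. In a real job the timer period is set with env.getConfig().setAutoWatermarkInterval(millis), but a shorter interval only helps if the job runs long enough for the timer to fire at all.

```java
/**
 * Hypothetical, simplified model of a periodic watermark generator.
 * Not Flink code: it only mimics the onEvent / onPeriodicEmit split.
 */
public class PeriodicGeneratorModel {
    // Watermark that downstream operators currently see; starts at Long.MIN_VALUE.
    private long publishedWatermark = Long.MIN_VALUE;
    // Highest event timestamp seen so far (Long.MIN_VALUE = nothing seen yet).
    private long maxTimestamp = Long.MIN_VALUE;

    /** Called for every record: only remembers the timestamp, emits nothing. */
    public void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /** Called by the periodic timer (every autoWatermarkInterval ms in Flink). */
    public void onPeriodicEmit() {
        if (maxTimestamp != Long.MIN_VALUE) {
            // Assumed "max - 1" convention for monotonously increasing timestamps.
            publishedWatermark = maxTimestamp - 1;
        }
    }

    public long currentWatermark() {
        return publishedWatermark;
    }

    public static void main(String[] args) {
        long startMs = 61538659200000L; // first timestamp from the question's output
        PeriodicGeneratorModel gen = new PeriodicGeneratorModel();
        // The whole bounded input is consumed before the timer ever fires...
        for (int i = 0; i < 1000; i++) {
            gen.onEvent(startMs + i * 1000L);
        }
        System.out.println("before timer: " + gen.currentWatermark());
        // ...so only a later periodic emit would publish a real watermark.
        gen.onPeriodicEmit();
        System.out.println("after timer:  " + gen.currentWatermark());
    }
}
```

Running main prints -9223372036854775808 before the simulated timer fires and 61538660198999 (the last timestamp minus 1) afterwards.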

Bounded sources do emit a final watermark, MAX_WATERMARK (timestamp Long.MAX_VALUE), when they reach the end of their input, just before the job shuts down. You're not seeing it in your job's output because no events follow it, so processElement is never called after it arrives.
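One way to make that final watermark visible, sketched under the assumption that a keyed stream is fine for the test, is to register an event-time timer for every element. Event-time timers fire once the watermark passes them, so in a job this short they would be expected to fire when MAX_WATERMARK arrives, and onTimer can print the watermark at that moment. `WatermarkProbe` is a hypothetical name; it would take the place of the anonymous ProcessFunction in the question.

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/**
 * Hypothetical probe: registers an event-time timer per element so that the
 * end-of-input watermark from the bounded source becomes observable.
 */
public class WatermarkProbe
        extends KeyedProcessFunction<Integer, Tuple2<Long, Integer>, String> {

    @Override
    public void processElement(
            Tuple2<Long, Integer> value, Context ctx, Collector<String> out) {
        out.collect("v=" + value + ", watermark=" + ctx.timerService().currentWatermark());
        // Fires once the watermark passes this event's timestamp.
        ctx.timerService().registerEventTimeTimer(value.f0);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        // If no periodic watermark was emitted earlier, this runs when the final
        // watermark arrives, and the printed watermark should be Long.MAX_VALUE.
        out.collect("timer=" + timestamp + ", watermark=" + ctx.timerService().currentWatermark());
    }
}
```

It would be wired in as .keyBy(row -> row.f1 % 2).process(new WatermarkProbe()); the key type Integer matches what that KeySelector returns.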

If you want to generate a watermark with every event, you can implement a custom watermark strategy whose WatermarkGenerator emits a watermark in its onEvent method (see the Flink documentation on writing WatermarkGenerators). This is usually a bad idea in production, as you'll waste CPU cycles and network bandwidth on the extra watermarks, but it is sometimes helpful for testing.
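A minimal sketch of such a generator, assuming the WatermarkGenerator / WatermarkStrategy.forGenerator API introduced in Flink 1.11. `PerEventWatermarks` and `strategy()` are hypothetical names, and emitting the maximum timestamp minus 1 is a choice made here to mirror what the built-in monotonous generator is assumed to do.

```java
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;

/** Hypothetical generator that emits a watermark after every event (testing only). */
public class PerEventWatermarks<T> implements WatermarkGenerator<T> {
    private long maxTimestamp = Long.MIN_VALUE;

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        // Emit immediately instead of waiting for the periodic timer.
        output.emitWatermark(new Watermark(maxTimestamp - 1));
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // Nothing to do: watermarks are already emitted in onEvent.
    }

    /** Strategy for the question's Tuple2<Long, Integer> events. */
    public static WatermarkStrategy<Tuple2<Long, Integer>> strategy() {
        return WatermarkStrategy
                .<Tuple2<Long, Integer>>forGenerator(ctx -> new PerEventWatermarks<>())
                .withTimestampAssigner((event, ts) -> event.f0);
    }
}
```

In the question's job this would replace the forMonotonousTimestamps() strategy: .assignTimestampsAndWatermarks(PerEventWatermarks.strategy()).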

David Anderson
  • Ah, thanks! What would be the performance implications of increasing the timer frequency to be on every event (or 'as fast as possible')? – flink182 Aug 11 '21 at 14:11
  • I tried adding a "Thread.sleep(100)" to the process function, but I still don't see the watermarks moving forward. – flink182 Aug 11 '21 at 14:15
  • Not surprised. You'd have to put a sleep in the source to see any effect. The source (fromCollection in this case) and watermarking are probably running to completion before the process function receives its first event. You could experiment with setting the network buffer timeout to 0 -- that should be enough to change the behavior, but once the process function sleeps, I'm not sure what will happen next. – David Anderson Aug 11 '21 at 14:56
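A sketch of the experiment described in that last comment, assuming the legacy SourceFunction API (available in the Flink versions of that time, deprecated in recent releases). `SlowSource` is a hypothetical source that sleeps between events so the job runs long enough for the 200 ms periodic watermark timer to fire. Combined with env.setBufferTimeout(0), which flushes network buffers after every record, the process function may start seeing watermarks other than Long.MIN_VALUE, though the exact values depend on timing.

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

/**
 * Hypothetical slow source: emits one event every delayMs milliseconds of
 * wall-clock time, with event timestamps one second apart starting at startMs.
 */
public class SlowSource implements SourceFunction<Tuple2<Long, Integer>> {
    private final int count;
    private final long delayMs;
    private final long startMs;
    private volatile boolean running = true;

    public SlowSource(int count, long delayMs, long startMs) {
        this.count = count;
        this.delayMs = delayMs;
        this.startMs = startMs;
    }

    @Override
    public void run(SourceContext<Tuple2<Long, Integer>> ctx) throws Exception {
        for (int i = 0; i < count && running; i++) {
            // Emit under the checkpoint lock, as the legacy source contract requires.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(Tuple2.of(startMs + i * 1000L, i));
            }
            // Slow down the source so periodic watermarks get a chance to fire.
            Thread.sleep(delayMs);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
```

To try it, call env.setBufferTimeout(0) and replace env.fromCollection(events) with env.addSource(new SlowSource(1000, 50, startMs)); the 50 ms delay is an arbitrary example value.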

According to the Javadoc of WatermarkStrategy#withIdleness in the Flink source:

/**
 * Creates a new enriched {@link WatermarkStrategy} that also does idleness detection in the
 * created {@link WatermarkGenerator}.
 *
 * <p>Add an idle timeout to the watermark strategy. If no records flow in a partition of a
 * stream for that amount of time, then that partition is considered "idle" and will not hold
 * back the progress of watermarks in downstream operators.
 *
 * <p>Idleness can be important if some partitions have little data and might not have events
 * during some periods. Without idleness, these streams can stall the overall event time
 * progress of the application.
 */
default WatermarkStrategy<T> withIdleness(Duration idleTimeout) ...

So you could try WatermarkStrategy.forMonotonousTimestamps().withIdleness(...).
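Applied to the question's strategy, that would look roughly like the sketch below. `IdlenessStrategy` is a hypothetical wrapper class and the one-second timeout is an arbitrary example value. Going by the Javadoc quoted above, idleness decides whether a quiet partition holds back downstream watermarks; it does not by itself make the periodic generator emit watermarks more often.

```java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;

/** Hypothetical helper building the question's strategy with an idle timeout. */
public class IdlenessStrategy {
    public static WatermarkStrategy<Tuple2<Long, Integer>> create() {
        return WatermarkStrategy
                .<Tuple2<Long, Integer>>forMonotonousTimestamps()
                // Partitions without records for 1 s are marked idle and stop
                // holding back watermarks in downstream operators.
                .withIdleness(Duration.ofSeconds(1))
                // Use the event's first field as its event-time timestamp.
                .withTimestampAssigner((event, ts) -> event.f0);
    }
}
```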

pkyo