I am investigating using Flink with a Kinesis stream as a source, and I would like to use event-time watermarking. I plan to run this on the AWS managed Flink platform (Kinesis Data Analytics).

Both the AWS documentation and the Flink documentation recommend using the FlinkKinesisConsumer.
To enable event time on this consumer, the recommendation is to implement a custom AssignerWithPeriodicWatermarks and set it on the consumer with setPeriodicWatermarkAssigner.

However, I also read in the Flink documentation that this API is deprecated and that WatermarkStrategy should be used instead.

My questions:

  • Is it possible to use a WatermarkStrategy on the Kinesis consumer, or must it be applied after a non-source operation on the DataStream itself (discouraged in the Flink docs)?
  • If it is not possible and it must be applied after a non-source operation, what does this mean? Why is it discouraged? How does it affect the performance of the workload?
  • Or is it recommended to continue using the deprecated API?
  • Or is there another Kinesis consumer for Flink that can be recommended?

Thanks in advance for any suggestions

Alexis

Peter Csala
Bombington
  • The `WatermarkStrategy` contains both the `TimestampAssigner` and the `WatermarkGenerator`, and they work hand in hand. So, where did you read that the assigner API is deprecated? – Peter Csala May 09 '22 at 14:03
  • Hi @PeterCsala - it's from the flink docs at the bottom of this page: https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/event-time/generating_watermarks/ The Kinesis consumer takes an AssignerWithPeriodicWatermarks - this is deprecated – Bombington May 10 '22 at 07:09
  • Should also add that kinesis docs also stipulate adding the method `setStreamTimeCharacteristic()` on the StreamExecutionEnvironment - which is also deprecated. – Bombington May 10 '22 at 07:12
  • Which version of Flink are you using in KDA? 1.11? – Peter Csala May 10 '22 at 07:27
  • The latest version, 1.13. The same methods and docs show these things as deprecated in Flink 1.13. – Bombington May 10 '22 at 07:51
  • [Before 1.12 the default](https://nightlies.apache.org/flink/flink-docs-release-1.11/dev/table/streaming/time_attributes.html#introduction-to-time-attributes) TimeCharacteristic was ProcessingTime. In [1.12](https://flink.apache.org/news/2020/12/10/release-1.12.0.html#important-changes) this was changed to EventTime, which is why `setStreamTimeCharacteristic` became deprecated. – Peter Csala May 10 '22 at 07:55
  • Ah cool. So that explains that. But then the question still remains: how do I implement a watermark strategy on a Kinesis consumer without using deprecated methods? – Bombington May 10 '22 at 08:03
  • As far as I can see from the [github source code](https://github.com/awslabs/amazon-kinesis-connector-flink) of the connector, it is compatible with 1.11. The [`FlinkKinesisConsumer`](https://github.com/awslabs/amazon-kinesis-connector-flink/blob/master/amazon-kinesis-connector-flink/src/main/java/software/amazon/kinesis/connectors/flink/FlinkKinesisConsumer.java#L259) does not expose a method that accepts a `WatermarkStrategy`. So I think it is fine to use the `setPeriodicWatermarkAssigner` method. – Peter Csala May 10 '22 at 08:15
  • Yeah. But I think it then also relies (according to the docs and examples on the Flink github) on `setStreamTimeCharacteristic`. So to use a watermark strategy with a Kinesis consumer, I believe the only way is to set it not on the source itself but on the stream, and let Flink create a new stream from it and apply the watermark strategy there. Thanks for all your help here @PeterCsala – Bombington May 10 '22 at 08:21
  • Yepp, calling `assignTimestampsAndWatermarks` on a `DataStream` is also a viable option. – Peter Csala May 10 '22 at 08:25

1 Answer

It's not possible to set a WatermarkStrategy directly on FlinkKinesisConsumer (except, as you pointed out, by using the deprecated AssignerWithPeriodicWatermarks interface), nor on any other implementation of SourceFunction, but you can assign the watermarks as soon as you get the DataStream.

Judging from the comments I think you've already figured it out, but this is what I'm doing.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;

// Set up
var env = StreamExecutionEnvironment.getExecutionEnvironment();
var consumerProperties = ...;
var deserializer = ...;
var kinesisConsumer = new FlinkKinesisConsumer<MyDataType>(kinesisStreamName, deserializer, consumerProperties);

// Pick a watermark strategy.
// Use some timestamp field from your event object.
// Note: in Java the explicit type argument goes before the method name.
var strategy = WatermarkStrategy.<MyDataType>forMonotonousTimestamps()
                                .withTimestampAssigner((event, timestamp) -> event.getTimestamp());

// Start reading from Kinesis
var dataStream = env.addSource(kinesisConsumer)
                    .assignTimestampsAndWatermarks(strategy);

You'll have to tweak this to your use case. Obviously replace MyDataType with whatever is actually in your stream, but also note that forMonotonousTimestamps assumes timestamps never decrease, so it may not suit your data. If events can arrive out of order, forBoundedOutOfOrderness may work better.
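
To make the difference between the two strategies concrete, here is a minimal plain-Java sketch of the idea (it does not use Flink, and `WatermarkSketch`, `watermark` and `countLate` are hypothetical names made up for illustration). It assumes a simplified model in which the watermark is recomputed after every event as "highest timestamp seen, minus the bound, minus 1 ms". As far as I understand, Flink's built-in bounded-out-of-orderness generator computes the same value but only emits it periodically, and forMonotonousTimestamps behaves like a bound of zero.

```java
import java.util.List;

final class WatermarkSketch {

    // Simplified model: the watermark trails the highest timestamp seen so far
    // by the allowed out-of-orderness bound (minus 1 ms).
    static long watermark(long maxTimestampSeen, long boundMillis) {
        return maxTimestampSeen - boundMillis - 1;
    }

    // Counts events that arrive with a timestamp at or below the current
    // watermark, i.e. events that would be considered late in this model.
    static int countLate(List<Long> timestamps, long boundMillis) {
        // Start low enough that the first watermark does not underflow.
        long max = Long.MIN_VALUE + boundMillis + 1;
        int late = 0;
        for (long ts : timestamps) {
            if (ts <= watermark(max, boundMillis)) {
                late++;
            }
            max = Math.max(max, ts);
        }
        return late;
    }

    public static void main(String[] args) {
        // Hypothetical event timestamps in ms, slightly out of order.
        List<Long> timestamps = List.of(1000L, 2000L, 1500L, 3000L, 2800L, 4000L);
        System.out.println("late with bound 0 ms:    " + countLate(timestamps, 0));
        System.out.println("late with bound 1000 ms: " + countLate(timestamps, 1000));
    }
}
```

With these made-up timestamps, a bound of zero marks the two out-of-order events as late, while a bound of one second tolerates both. In the Flink snippet above, the corresponding change would be swapping the strategy for something like `WatermarkStrategy.<MyDataType>forBoundedOutOfOrderness(Duration.ofSeconds(5))`, where the five-second bound is only a placeholder to tune against how late your Kinesis records actually arrive.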

Jake