I'm trying to understand how to use withTimestampAssigner() inside a WatermarkStrategy with a Kafka source. The "time" that I need to use is inside the message payload.

To do this I have the following code:

FlinkKafkaConsumer<Event> kafkaData =
        new FlinkKafkaConsumer("CorID_0", new EventDeserializationSchema(), p);
kafkaData.assignTimestampsAndWatermarks(
        WatermarkStrategy
        .forMonotonousTimestamps()
                .withTimestampAssigner(Event, Event.time))

DataStream<Event> stream = env.addSource(kafkaData);

Where EventDeserializationSchema() is this:

public class EventDeserializationSchema implements DeserializationSchema<Event> {

    private static final long serialVersionUID = 1L;
    
    private static final CsvSchema schema = CsvSchema.builder()
            .addColumn("firstName")
            .addColumn("lastName")
            .addColumn("age", CsvSchema.ColumnType.NUMBER)
            .addColumn("time")
            .build();

    private static final ObjectMapper mapper = new CsvMapper();

    @Override
    public Event deserialize(byte[] message) throws IOException {
        return mapper.readerFor(Event.class).with(schema).readValue(message);
    }

    @Override
    public boolean isEndOfStream(Event nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Event> getProducedType() {
        
        return TypeInformation.of(Event.class);
    }
}

And Event:

import java.io.Serializable;

public class Event implements Serializable {
    public String firstName;
    public String lastName;
    private int age;
    public String time;

    public Event() {
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public String getTime() {
        return time;
    }

    public void setTime(String time) {
        this.time = time;
    }
}

What I'm trying to understand is how to provide the time to withTimestampAssigner():

.withTimestampAssigner(???))

The value should be Event.time, but from the Flink documentation page I don't quite get how to pass it.

I have been searching, and what I found confused me a bit because I don't understand whether the solution in my case is straightforward or whether I need the additional context. All the examples I found either use .forBoundedOutOfOrderness() or target earlier versions of Flink where the implementation was different, like this one:

kafka flink timestamp Event time and watermark

Thanks!

SimAzz

1 Answer

If the source (e.g., FlinkKafkaConsumer) isn't providing the timestamps you want to work with, then you need to provide a TimestampAssigner. This is a function that takes an event and the previously assigned timestamp (if any) as input, and returns the timestamp. In your case that can look something like this:

FlinkKafkaConsumer<Event> kafkaData =
        new FlinkKafkaConsumer<>("CorID_0", new EventDeserializationSchema(), p);

WatermarkStrategy<Event> wmStrategy = 
        WatermarkStrategy
          .<Event>forMonotonousTimestamps()
          .withTimestampAssigner((event, timestamp) -> event.getTime());

DataStream<Event> stream = env.addSource(
        kafkaData.assignTimestampsAndWatermarks(wmStrategy));

(Note: this won't quite work, since your getTime() method returns a String. You'll need to parse the string and return a long -- typically it will be a long representing milliseconds since the epoch.)
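The parsing step mentioned in the note could look roughly like the sketch below. It assumes the "time" column holds an ISO-8601 UTC instant such as "2020-11-02T14:00:00Z"; the question does not say what the format actually is, so the parsing has to be adapted to the real payload. The EventTimes class and its toEpochMillis method are hypothetical names introduced only for this illustration.

```java
import java.time.Instant;

// Hypothetical helper, not part of Flink or of the original question.
class EventTimes {

    // Assumption: the payload's "time" field is an ISO-8601 UTC instant,
    // e.g. "2020-11-02T14:00:00Z". Instant.parse throws a
    // DateTimeParseException for any other format.
    static long toEpochMillis(String time) {
        return Instant.parse(time).toEpochMilli();
    }

    public static void main(String[] args) {
        // Milliseconds since 1970-01-01T00:00:00Z
        System.out.println(toEpochMillis("2020-11-02T14:00:00Z")); // prints 1604325600000
    }
}
```

With a helper like this, the assigner in the strategy above would become .withTimestampAssigner((event, timestamp) -> EventTimes.toEpochMillis(event.getTime())).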

The cases involving a TimestampAssignerSupplier.Context or a WatermarkGeneratorSupplier.Context are for situations where you need access to lower-level APIs to do something more custom. That's not necessary in this case.

David Anderson
  • Hi David, many thanks for the answer. The example helped me a lot. The only bit I'm missing is this: the time indicates when the specific event happened. I understand that I need to parse the string and return a long, and I'm trying to work out how to do it, but I don't understand the part where you wrote: "typically it will be a long representing milliseconds since the epoch". Could you please elaborate? Thanks!! – SimAzz Nov 02 '20 at 14:00
  • I've been thinking about your answer and I now get what you meant. Basically I need to feed Flink the UNIX time (https://currentmillis.com/). I found the following quite useful: https://stackoverflow.com/questions/7784421/getting-unix-timestamp-from-date The last question is whether getTime() needs to be divided by 1000. – SimAzz Nov 02 '20 at 18:09
  • Glad you figured it out. For the most part, Flink's event time is unit-less. But some of the helper methods, e.g. for setting window durations, follow the convention that the units of time are milliseconds, so you'll have an easier time of it if you follow that pattern. – David Anderson Nov 03 '20 at 13:32
  • Thanks David, your help is always precious! ;-) – SimAzz Nov 03 '20 at 17:09
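
On the divide-by-1000 question from the comments: assuming getTime() there means java.util.Date#getTime (as in the linked question), the value is already in milliseconds since the epoch, so dividing by 1000 would turn it into seconds and break the millisecond convention described above. A minimal sketch of the two directions follows; EpochUnits and its methods are hypothetical names used only for illustration.

```java
import java.time.Instant;
import java.util.Date;

// Hypothetical helper illustrating the units involved; not a Flink API.
class EpochUnits {

    // java.util.Date#getTime already returns milliseconds since the epoch,
    // so no scaling is needed before handing the value to a TimestampAssigner.
    static long dateToMillis(Date date) {
        return date.getTime();
    }

    // A UNIX timestamp expressed in seconds has to be scaled up instead.
    // multiplyExact throws ArithmeticException on overflow rather than wrapping.
    static long secondsToMillis(long epochSeconds) {
        return Math.multiplyExact(epochSeconds, 1000L);
    }

    public static void main(String[] args) {
        Date date = Date.from(Instant.parse("2020-11-02T14:00:00Z"));
        // Both lines print the same millisecond value.
        System.out.println(dateToMillis(date));
        System.out.println(secondsToMillis(1604325600L));
    }
}
```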