Flink: 1.13.2
I have a StreamTableEnvironment tableEnv
that reads streaming data from a KafkaSource.
From this tableEnv, I filter my data and convert the result back to a DataStream.
DataStream<Row> myStreamData = env.fromSource(source, WatermarkStrategy.noWatermarks(),"mySource");
tableEnv.createTemporaryView("myTable", myStreamData);
Table myTable = tableEnv.sqlQuery("select source_timestamp, id from myTable");
DataStream<Row> filteredDatastream = tableEnv.toChangelogStream(myTable);
In the source stream, source_timestamp is a java.sql.Timestamp and id is a String.
Later on, when I access the rows of filteredDatastream,
row.getFieldAs("source_timestamp")
returns a java.time.LocalDateTime instead of a java.sql.Timestamp.
System.out.println(filteredDatastream.getTransformation().getOutputType());
/*
Out: (
  `source_timestamp` TIMESTAMP(9), // Is a TIMESTAMP
  `trip_id` STRING
)
*/
filteredDatastream.map(
    row -> {
        System.out.println(row.getFieldAs("source_timestamp").getClass());
        /*
        Out: class java.time.LocalDateTime // (not java.sql.Timestamp)
        */
        return row;
    }
);
How can I make row.getFieldAs("source_timestamp")
return a java.sql.Timestamp
instead of a java.time.LocalDateTime
without having to convert the value afterward? As far as I can tell, java.sql.Timestamp
is supported by Flink's data type extraction.
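
For reference, the after-the-fact conversion I would like to avoid looks roughly like the sketch below. It uses only the JDK, so it runs without Flink; the class name TimestampConversion and the helper toSqlTimestamp are made up for illustration and are not part of any Flink API.

```java
import java.sql.Timestamp;
import java.time.LocalDateTime;

public class TimestampConversion {

    // Hypothetical helper (not a Flink API): normalizes whatever the row
    // field holds into a java.sql.Timestamp.
    public static Timestamp toSqlTimestamp(Object fieldValue) {
        if (fieldValue instanceof Timestamp) {
            // Already the type I want, nothing to do.
            return (Timestamp) fieldValue;
        }
        if (fieldValue instanceof LocalDateTime) {
            // Timestamp.valueOf keeps the same local date-time fields,
            // including the nanosecond part.
            return Timestamp.valueOf((LocalDateTime) fieldValue);
        }
        throw new IllegalArgumentException(
            "Unsupported timestamp type: "
                + (fieldValue == null ? "null" : fieldValue.getClass().getName()));
    }

    public static void main(String[] args) {
        // Stand-in for the value returned by row.getFieldAs("source_timestamp").
        LocalDateTime fromRow = LocalDateTime.of(2021, 8, 1, 12, 30, 15, 123456789);
        Timestamp converted = toSqlTimestamp(fromRow);
        System.out.println(converted.getClass());
        System.out.println(converted.toLocalDateTime().equals(fromRow));
    }
}
```

In the job this would mean calling toSqlTimestamp(row.getField("source_timestamp")) inside every operator that reads the field, which is exactly the extra step I am hoping to avoid.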