Flink: 1.13.2

I have a StreamTableEnvironment tableEnv that reads streaming data from a KafkaSource. Through this tableEnv, I filter the data and convert the result back to a DataStream.

DataStream<Row> myStreamData = env.fromSource(source, WatermarkStrategy.noWatermarks(), "mySource");

tableEnv.createTemporaryView("myTable", myStreamData);
Table myTable = tableEnv.sqlQuery("SELECT source_timestamp, id FROM myTable");
DataStream<Row> filteredDatastream = tableEnv.toChangelogStream(myTable);

In the source stream, source_timestamp is a java.sql.Timestamp and id is a String.

Later on, I read rows from filteredDatastream and run into the following issue: row.getFieldAs("source_timestamp") returns a LocalDateTime instead of a Timestamp.

System.out.println(filteredDatastream.getTransformation().getOutputType());
/* 
Out: (
  `source_timestamp` TIMESTAMP(9), // Is a TIMESTAMP 
  `trip_id` STRING
)
*/

filteredDatastream.map(
        row -> {
            System.out.println(row.getFieldAs("source_timestamp").getClass());
            /*
            Out: class java.time.LocalDateTime // (not java.sql.Timestamp)
            */
            return row;
        }
);

How can I make row.getFieldAs("source_timestamp") return a java.sql.Timestamp instead of a java.time.LocalDateTime, without having to convert it afterward? As far as I can tell, java.sql.Timestamp is supported by Flink's data type extraction.

  • Is there a reason you want `java.sql.Timestamp`? That's a terrible class to use compared to `LocalDateTime`. – Kayaman Mar 22 '22 at 08:45
  • It's because I would like to write my timestamps to a JDBC sink, and inside my JdbcStatementBuilder I need `source_timestamp` as a `java.sql.Timestamp`. – Benjamin Rémiche Mar 22 '22 at 12:06
  • If you mean that you want to use `setTimestamp()`, you can also use `setObject()` with `LocalDateTime`. So you really don't **need** the `Timestamp`. – Kayaman Mar 22 '22 at 12:38
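
If a `java.sql.Timestamp` is still needed at the sink, the conversion discussed in the comments above can be sketched with JDK classes alone. This is only an illustration, not a Flink API: the class `TimestampConversion` and the helper `toSqlTimestamp` are hypothetical names, and the sample value stands in for what `row.getFieldAs("source_timestamp")` returns.

```java
import java.sql.Timestamp;
import java.time.LocalDateTime;

public class TimestampConversion {

    // Hypothetical helper (not part of Flink): converts the LocalDateTime
    // obtained from a TIMESTAMP column into a java.sql.Timestamp.
    // A null input is passed through as null.
    static Timestamp toSqlTimestamp(LocalDateTime value) {
        return value == null ? null : Timestamp.valueOf(value);
    }

    public static void main(String[] args) {
        // Stand-in for row.getFieldAs("source_timestamp"); the value is made up.
        LocalDateTime fromRow = LocalDateTime.of(2022, 6, 15, 12, 0, 0, 123_456_789);

        Timestamp ts = toSqlTimestamp(fromRow);

        // Timestamp.valueOf keeps the nanosecond part of the LocalDateTime.
        System.out.println(ts.getClass().getName());
        System.out.println(ts.getNanos());
    }
}
```

Inside a JdbcStatementBuilder, the result of such a helper could be passed to `setTimestamp()`. Alternatively, as the last comment suggests, `setObject()` can take the `LocalDateTime` directly, assuming the JDBC driver in use supports the java.time types.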
