
I am using Flink 1.13. I am trying to convert table results to a DataStream in the following way, but I keep getting an error.

import java.time.Duration;
import java.time.LocalDateTime;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class HybridTrial {
  public static class Address {
    public String street;
    public String houseNumber;

    public Address() {}

    public Address(String street, String houseNumber) {
      this.street = street;
      this.houseNumber = houseNumber;
    }
  }

  public static class User {
    public String name;

    public Integer score;

    public LocalDateTime event_time;

    public Address address;

    // default constructor for DataStream API
    public User() {}

    // fully assigning constructor for Table API
    public User(String name, Integer score, LocalDateTime event_time, Address address) {
      this.name = name;
      this.score = score;
      this.event_time = event_time;
      this.address = address;
    }
  }

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<User> dataStream =
        env.fromElements(
                new User("Alice", 4, LocalDateTime.now(), new Address()),
                new User("Bob", 6, LocalDateTime.now(), new Address("NBC", "204")),
                new User("Alice", 10, LocalDateTime.now(), new Address("ABC", "1033")))
            .assignTimestampsAndWatermarks(
                WatermarkStrategy.<User>forBoundedOutOfOrderness(Duration.ofSeconds(60)));

    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    Table table =
        tableEnv.fromDataStream(
            dataStream, Schema.newBuilder().build());

    table.printSchema();

    Table t = table.select($("*"));

    DataStream<User> dsRow = tableEnv.toDataStream(t,User.class);
    dsRow.print();

    env.execute();
  }
}

The error I got is:

Exception in thread "main" org.apache.flink.table.api.ValidationException: Column types of query result and sink for registered table 'default_catalog.default_database.Unregistered_DataStream_Sink_1' do not match.
Cause: Incompatible types for sink column 'event_time' at position 2.

Query schema: [name: STRING, score: INT, event_time: RAW('java.time.LocalDateTime', '...'), address: flinkSqlExperiments.HybridTrial$Address<`street` STRING, `houseNumber` STRING>]
Sink schema:  [name: STRING, score: INT, event_time: TIMESTAMP(9), address: flinkSqlExperiments.HybridTrial$Address<`street` STRING, `houseNumber` STRING>]
    at org.apache.flink.table.planner.connectors.DynamicSinkUtils.createSchemaMismatchException(DynamicSinkUtils.java:437)
    at org.apache.flink.table.planner.connectors.DynamicSinkUtils.validateSchemaAndApplyImplicitCast(DynamicSinkUtils.java:256)
    at org.apache.flink.table.planner.connectors.DynamicSinkUtils.convertSinkToRel(DynamicSinkUtils.java:198)
    at org.apache.flink.table.planner.connectors.DynamicSinkUtils.convertExternalToRel(DynamicSinkUtils.java:143)

I also tried a custom conversion from DataStream to table, but I still run into an error when converting from the table back to a DataStream. I am stuck, so any help is appreciated.

voidMainReturn
2 Answers

The automatic, reflection-based type extraction in the DataStream API is not as powerful as the one in the Table API. This is partly due to state backwards-compatibility concerns in the DataStream API.

The event_time field is a GenericType in the DataStream API, which results in RAW in the Table API. You have the following options:

  • Give a proper TypeInformation in fromElements
  • Override the type using a DataType in fromDataStream
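Both options can be sketched roughly as follows. This is a sketch based on the classes from the question, not verified against a running 1.13 job; in particular, the TIMESTAMP(3) precision in option 2 is an assumption, and you may need to adjust it.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;

// Option 1: give fromElements an explicit TypeInformation so that
// event_time is extracted as LOCAL_DATE_TIME instead of a GenericType.
Map<String, TypeInformation<?>> fields = new HashMap<>();
fields.put("name", Types.STRING);
fields.put("score", Types.INT);
fields.put("event_time", Types.LOCAL_DATE_TIME);
fields.put("address", Types.POJO(Address.class));

DataStream<User> dataStream =
    env.fromElements(
            new User("Alice", 4, LocalDateTime.now(), new Address()),
            new User("Bob", 6, LocalDateTime.now(), new Address("NBC", "204")))
        .returns(Types.POJO(User.class, fields));

// Option 2: keep the stream as-is and declare the column's data type
// explicitly when converting to a table.
Table table =
    tableEnv.fromDataStream(
        dataStream,
        Schema.newBuilder()
            .column("name", DataTypes.STRING())
            .column("score", DataTypes.INT())
            .column("event_time", DataTypes.TIMESTAMP(3))
            .column("address", DataTypes.of(Address.class))
            .build());
```

With either option, event_time is no longer RAW in the query schema, so the sink-side validation in toDataStream should no longer see incompatible types.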
twalthr

I solved my issue by registering the POJO using the method below:

env.getConfig().registerPojoType(YourClass.class);

You can use any user-defined DTO and register it as a POJO.
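For context, the registration goes on the execution config before the pipeline is built. A minimal sketch, assuming the User and Address classes from the question:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Register the DTOs up front so Flink serializes them as POJOs instead of
// falling back to generic (Kryo) serialization.
env.getConfig().registerPojoType(User.class);
env.getConfig().registerPojoType(Address.class);
```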