I'm reading from a kafka stream, creating a Table environment and calculating an average and writing the data back to kafka [SIMPLECUSTOMER].
This worked in Flink 1.12.5. I'm using Flink 1.13.2 and Flink 1.14.0
customerId is read as RAW('org.apache.avro.util.Utf8', '...') as defined in Avro Generated Java class. While writing back to the sink, I'm getting the below error.
org.apache.flink.table.api.ValidationException: Column types of query result and sink for registered table 'default_catalog.default_database.SIMPLECUSTOMER' do not match.
Cause: Incompatible types for sink column 'customerId' at position 0.
Query schema: [customerId: RAW('org.apache.avro.util.Utf8', '...'), age: INT NOT NULL] Sink schema: [customerId: STRING, age: INT]
Sink Table Schema:
TableResult sinkTable =
tableEnv.executeSql(
"CREATE TABLE SIMPLECUSTOMER (\n"
+ " `customerId` STRING, \n"
+ " `age` INT NOT NULL,\n"
+ " PRIMARY KEY (customerId) NOT ENFORCED\n"
+ ") WITH (\n"
+ " 'connector' = 'upsert-kafka',\n"
+ " 'topic' = 'simple-customer',\n"
+ " 'properties.bootstrap.servers' = 'localhost:9092',\n"
+ " 'properties.group.id' = 'testGroup',\n"
+ " 'value.format' = 'avro',\n"
+ " 'key.format' = 'raw')");
Here is my sink code
TableResult table3 =
tableEnv.executeSql(
"insert into SIMPLECUSTOMER select customerId, avg(age) as age from customer group by customerId ");
Tried Casting it to string, that didn't work either
TableResult table3 =
tableEnv.executeSql(
"insert into SIMPLECUSTOMER select CAST(customerId as STRING), avg(age) as age from customer group by customerId ");
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Cast function cannot convert value of type RAW('org.apache.avro.util.Utf8', '...') to type VARCHAR(2147483647) at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)