
I'm reading from a Kafka stream, creating a table environment, calculating an average, and writing the data back to Kafka (the `SIMPLECUSTOMER` table).

This worked in Flink 1.12.5, but fails on both Flink 1.13.2 and Flink 1.14.0.

customerId is read as RAW('org.apache.avro.util.Utf8', '...'), as defined in the Avro-generated Java class. While writing back to the sink, I get the error below.

    org.apache.flink.table.api.ValidationException: Column types of query result and sink
    for registered table 'default_catalog.default_database.SIMPLECUSTOMER' do not match.

    Cause: Incompatible types for sink column 'customerId' at position 0.

    Query schema: [customerId: RAW('org.apache.avro.util.Utf8', '...'), age: INT NOT NULL]
    Sink schema:  [customerId: STRING, age: INT]

Sink Table Schema:

    TableResult sinkTable =
        tableEnv.executeSql(
        "CREATE TABLE SIMPLECUSTOMER (\n"
            + "  `customerId` STRING, \n"
            + "  `age` INT NOT NULL,\n"
            + "   PRIMARY KEY (customerId) NOT ENFORCED\n"
            + ") WITH (\n"
            + "  'connector' = 'upsert-kafka',\n"
            + "  'topic' = 'simple-customer',\n"
            + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
            + "  'properties.group.id' = 'testGroup',\n"
            + "  'value.format' = 'avro',\n"
            + "  'key.format' = 'raw')");

Here is my sink code:

    TableResult table3 =
        tableEnv.executeSql(
            "insert into SIMPLECUSTOMER select customerId, avg(age) as age from customer group by customerId");

I tried casting it to STRING, but that didn't work either:

    TableResult table3 =
        tableEnv.executeSql(
            "insert into SIMPLECUSTOMER select CAST(customerId as STRING), avg(age) as age from customer group by customerId");

    Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Cast function cannot convert value of type RAW('org.apache.avro.util.Utf8', '...') to type VARCHAR(2147483647)
        at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
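Since the planner rejects CAST from RAW, one workaround I'm considering (unverified) is a scalar UDF that accepts the RAW value as a plain Object and calls toString() on it; org.apache.avro.util.Utf8 implements CharSequence, so toString() yields a java.lang.String. Below is a plain-Java sketch of just that conversion — the class name `RawToString` is hypothetical, and in Flink the `eval` parameter would additionally need a `@DataTypeHint(inputGroup = InputGroup.ANY)` annotation on a registered `ScalarFunction` for the planner to accept the RAW input:

```java
// Sketch only: mirrors the conversion a ScalarFunction's eval(Object) could do.
public class RawToString {
    public static String eval(Object raw) {
        // Utf8.toString() returns the decoded java.lang.String; pass nulls through.
        return raw == null ? null : raw.toString();
    }

    public static void main(String[] args) {
        // Stand-in for org.apache.avro.util.Utf8 (also a CharSequence).
        CharSequence utf8Like = new StringBuilder("alice");
        System.out.println(eval(utf8Like));
    }
}
```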

  • If you are able to change the Avro schema slightly, this could be an option to make the handling in Flink easier: https://stackoverflow.com/questions/49974241/avro-schema-what-is-avro-java-string-string – twalthr Oct 09 '21 at 20:03
  • Try to set the property `'value.format'` to `'avro-confluent'` instead of `'avro'`. Then you also need to provide `'value.avro-confluent.schema-registry.url'`. – peterschrott Oct 18 '21 at 14:29
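Following the second comment, this is the variant of the sink DDL I could try. The schema-registry URL is a placeholder for my environment, and I haven't verified that this resolves the RAW issue:

```java
TableResult sinkTable =
    tableEnv.executeSql(
        "CREATE TABLE SIMPLECUSTOMER (\n"
            + "  `customerId` STRING, \n"
            + "  `age` INT NOT NULL,\n"
            + "   PRIMARY KEY (customerId) NOT ENFORCED\n"
            + ") WITH (\n"
            + "  'connector' = 'upsert-kafka',\n"
            + "  'topic' = 'simple-customer',\n"
            + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
            + "  'properties.group.id' = 'testGroup',\n"
            // changed from 'avro' per the comment above
            + "  'value.format' = 'avro-confluent',\n"
            // placeholder URL for a local Confluent Schema Registry
            + "  'value.avro-confluent.schema-registry.url' = 'http://localhost:8081',\n"
            + "  'key.format' = 'raw')");
```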

0 Answers