
I'm trying to extract a few nested fields in PyFlink from JSON data received from Kafka. The JSON record schema is as follows: each record has a Result object, within which there's an array of objects called data. I'm trying to extract the value field from the first array element, i.e. data[0].value.

{
  "ID": "some-id",
  "Result": {
    "data": [
      {
        "value": 65537,
        ...
      }
    ]
  }
}
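In plain Python terms, the extraction I'm after is just this (using the sample record above, with the elided fields dropped):

```python
# The sample record as a Python dict, same shape as the JSON above
# (extra fields elided).
record = {
    "ID": "some-id",
    "Result": {
        "data": [
            {"value": 65537},
        ],
    },
}

# What I want in the output topic: ID plus data[0].value.
extracted = {"ID": record["ID"], "value": record["Result"]["data"][0]["value"]}
# → {'ID': 'some-id', 'value': 65537}
```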

I'm using the Table API to read from a Kafka topic and write the extracted fields to another topic.

The source DDL is as follows:

source_ddl = """
    CREATE TABLE InTable (
        `ID` STRING,
        `Timestamp` TIMESTAMP(3),
        `Result` ROW(
            `data` ROW(`value` BIGINT) ARRAY),
        WATERMARK FOR `Timestamp` AS `Timestamp`
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'in-topic',
        'properties.bootstrap.servers' = 'localhost:9092',
        'properties.group.id' = 'my-group-id',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'json'
    )
"""

The corresponding sink DDL is:

sink_ddl = """
    CREATE TABLE OutTable (
        `ID` STRING,
        `value` BIGINT
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'out-topic',
        'properties.bootstrap.servers' = 'localhost:9092',
        'properties.group.id' = 'my-group-id',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'json'
    )
"""

Here's the code snippet for extracting the value field from the first element of the array:

t_env.execute_sql(source_ddl)
t_env.execute_sql(sink_ddl)

table = t_env.from_path('InTable')
table \
    .select(
        table.ID,
        table.Result.data.at(1).value) \
    .execute_insert('OutTable') \
    .wait()

When I execute this, I see the following error in the execute_insert step:

py4j.protocol.Py4JJavaError: An error occurred while calling o57.executeInsert.
: scala.MatchError: ITEM($9.data, 1) (of class org.apache.calcite.rex.RexCall)

However, if I don't extract the embedded value but rather the entire array element, i.e. table.Result.data.at(1), and modify the sink_ddl accordingly, I'm able to get the entire row properly.
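For reference, here's a sketch of the whole-row variant that works; the table and field names (OutRowTable, data0) are placeholders of my own, not from my actual code:

```python
# Hypothetical sink for the whole-row variant; connector options mirror
# the original sink DDL (placeholder table/field names).
row_sink_ddl = """
    CREATE TABLE OutRowTable (
        `ID` STRING,
        `data0` ROW<`value` BIGINT>
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'out-topic',
        'properties.bootstrap.servers' = 'localhost:9092',
        'format' = 'json'
    )
"""

# Selecting the whole array element (instead of reaching into it) succeeds;
# requires a running Kafka broker, hence commented out here:
# t_env.execute_sql(row_sink_ddl)
# table.select(table.ID, table.Result.data.at(1).alias('data0')) \
#     .execute_insert('OutRowTable') \
#     .wait()
```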

Any idea what I'm missing? Thanks for any pointers!

Edit: This is probably a bug in Flink, and it is being tracked at https://issues.apache.org/jira/browse/FLINK-22082.
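In the meantime, one workaround I'm considering (an untested assumption on my part) is to express the projection in SQL rather than through the Table API expression DSL, in case the failure is specific to the expression translation path:

```python
# Possible workaround sketch (assumption, untested): do the nested
# extraction in SQL. Note Flink SQL array indexing is 1-based, so
# data[1] here is the first element (data[0] in JSON terms).
insert_sql = """
    INSERT INTO OutTable
    SELECT `ID`, `Result`.`data`[1].`value`
    FROM InTable
"""

# Requires the tables and a Kafka broker, hence commented out here:
# t_env.execute_sql(insert_sql).wait()
```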

sumeetkm