I'm trying to extract a few nested fields in PyFlink from JSON data received from Kafka. The JSON record schema is as follows. Basically, each record has a `Result` object within which there's an array of objects called `data`. I'm trying to extract the `value` field from the first array element, i.e. `data[0]`.
```json
{
  "ID": "some-id",
  "Result": {
    "data": [
      {
        "value": 65537,
        ...
        ...
      }
    ]
  }
}
```
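For reference, here is the same access path in plain Python using the standard `json` module, on a trimmed copy of the record above (the elided `...` fields are simply left out). Python lists are 0-based, whereas Flink's `Expression.at()` is documented as 1-based, so `at(1)` in the Table API should correspond to `data[0]` here.

```python
import json

# Trimmed sample record: only the fields needed for this lookup.
raw = '{"ID": "some-id", "Result": {"data": [{"value": 65537}]}}'
record = json.loads(raw)

# 0-based indexing in Python; the Flink equivalent uses at(1).
first_value = record["Result"]["data"][0]["value"]
print(first_value)
```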
I'm using the Table API to read from a Kafka topic and write the extracted fields to another topic.
The source DDL is as follows:
```python
source_ddl = """
    CREATE TABLE InTable (
        `ID` STRING,
        `Timestamp` TIMESTAMP(3),
        `Result` ROW(
            `data` ROW(`value` BIGINT) ARRAY),
        WATERMARK FOR `Timestamp` AS `Timestamp`
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'in-topic',
        'properties.bootstrap.servers' = 'localhost:9092',
        'properties.group.id' = 'my-group-id',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'json'
    )
"""
```
The corresponding sink DDL is:
```python
sink_ddl = """
    CREATE TABLE OutTable (
        `ID` STRING,
        `value` BIGINT
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'out-topic',
        'properties.bootstrap.servers' = 'localhost:9092',
        'properties.group.id' = 'my-group-id',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'json'
    )
"""
```
Here's the code snippet for extracting the `value` field from the first element of the array:
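```python
# t_env is a streaming TableEnvironment created elsewhere in the job.
t_env.execute_sql(source_ddl)
t_env.execute_sql(sink_ddl)

table = t_env.from_path('InTable')

# at() is 1-based, so at(1) selects the first element of Result.data.
table \
    .select(
        table.ID,
        table.Result.data.at(1).value) \
    .execute_insert('OutTable') \
    .wait()
```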
When I execute this, I see the following error in the `execute_insert` step:

```
py4j.protocol.Py4JJavaError: An error occurred while calling o57.executeInsert.
: scala.MatchError: ITEM($9.data, 1) (of class org.apache.calcite.rex.RexCall)
```
However, if I don't extract the embedded `value` but rather the entire row of the array, i.e. `table.Result.data.at(1)`, and modify the `sink_ddl` appropriately, I'm able to get the entire row properly.
Any idea what I'm missing? Thanks for any pointers!
Edit: This is probably a bug in Flink, and it is being tracked by https://issues.apache.org/jira/browse/FLINK-22082.
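Since selecting the whole array element works, one untested idea is to pass the entire `Result.data` array into a Python scalar UDF and do the indexing in Python, so the planner never sees an `ITEM(...)` followed by a field access. Whether this actually avoids the issue is an assumption. The function below is only the UDF body, and `first_data_value` is a hypothetical name. It assumes PyFlink hands an `ARRAY<ROW<value BIGINT>>` argument to the UDF as a Python list of `Row` objects (or `None` for SQL NULL) and that a `Row` supports positional indexing.

```python
from typing import Optional, Sequence


def first_data_value(data: Optional[Sequence]) -> Optional[int]:
    """Hypothetical UDF body: return `value` from the first element of Result.data.

    Assumes the array arrives as a list of Row objects (or None), and that
    row[0] is the `value` field, since the DDL declares it as the only field.
    """
    if not data:  # SQL NULL or an empty array -> NULL result
        return None
    first_row = data[0]  # 0-based here, unlike Flink's at(1)
    if first_row is None:
        return None
    return first_row[0]
```

To use it, it would be wrapped with `udf(first_data_value, result_type=DataTypes.BIGINT())` from `pyflink.table.udf` and called as `first_value_udf(table.Result.data)` inside the `select`, with the Python UDF runtime configured for the job as usual.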