
We have the following problem with Flink SQL: we have configured the Kafka Twitter connector to write tweets to Kafka, and we want to read those tweets from Kafka into a table using Flink SQL.

How can we define nested JSON properties (including arrays) using the Flink SQL API?

We have tried the following, but it does not work (the returned values are empty):

CREATE TABLE kafka_tweets(
  payload ROW(`HashtagEntities` ARRAY[VARCHAR])
) WITH (
  'connector' = 'kafka',
  'topic' = 'twitter_status',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
)

In the Twitter response, HashtagEntities is an array of objects.

mricat

1 Answer

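Declare a JSON array of objects as ARRAY<ROW<...>>, with one ROW field per object property. In the example below, items maps a JSON array of {itemid, shopid} objects, while json_data shows the alternative of keeping raw JSON as a string and extracting a single field with JSON_VALUE: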
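CREATE TABLE `table` (
  `userid` BIGINT,
  `timestamp` BIGINT,                                       -- epoch seconds; used by the computed columns below
  `json_data` VARCHAR(2147483647),                          -- raw JSON kept as a string
  `request_id` AS JSON_VALUE(`json_data`, '$.request_id'),  -- extract one field from that string
  `items` ARRAY<ROW<`itemid` BIGINT, `shopid` BIGINT>>,     -- JSON array of objects
  `event_time` AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`, 'yyyy-MM-dd HH:mm:ss')),
  `version` AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`, 'yyyy-MM-dd'), 'yyyy-MM-dd'),  -- parse with the matching format
  WATERMARK FOR `event_time` AS `event_time` - INTERVAL '1' MINUTE
)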
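Applied to the table from the question, the same pattern might look like the sketch below. It assumes the connector writes the Kafka Connect JSON envelope (the top-level payload object the question's DDL already expects) and that each hashtag object has Text, Start and End fields; those field names are an assumption, so check them against a real message on the twitter_status topic. Flink SQL declares an array type as ARRAY<t>, so a JSON array of objects maps to ARRAY<ROW<...>>. The query at the end uses CROSS JOIN UNNEST to turn each hashtag into its own row.

```sql
-- Sketch only: assumes the Kafka Connect JSON envelope ({"schema": ..., "payload": {...}})
-- and hypothetical hashtag fields Text/Start/End; verify the names against a real message.
CREATE TABLE kafka_tweets (
  `payload` ROW<
    `HashtagEntities` ARRAY<ROW<`Text` STRING, `Start` INT, `End` INT>>  -- array of objects
  >
) WITH (
  'connector' = 'kafka',
  'topic' = 'twitter_status',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- One output row per hashtag; UNNEST expands each ROW element into its fields.
SELECT h.`Text` AS hashtag, h.`Start`, h.`End`
FROM kafka_tweets AS t
CROSS JOIN UNNEST(t.`payload`.`HashtagEntities`) AS h (`Text`, `Start`, `End`);
```

By default the JSON format reads any declared field it cannot find as NULL ('json.fail-on-missing-field' = 'false'), so a field name that does not match the message exactly, including case, shows up as empty values rather than as an error.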
Peter Csala
As it's currently written, your answer is unclear. Please edit to add additional details that will help others understand how this addresses the question asked. You can find more information on how to write good answers in the help center. – Community Feb 15 '22 at 06:42