
With a Kinesis Data Analytics SQL application, we can configure an input schema that maps incoming data to a flat structure. I'm looking for the equivalent configuration in a Kinesis Data Analytics Apache Flink application (Python).

To put it another way: how can we use PyFlink to handle nested JSON data (JSON containing arrays)? Please see the sample data and required mapping below.

Here is the sample data as it arrives at the analytics application:

    "data_json": {
        "deviceID": "xxxxx",
        "sensor": [{
            "information": {
                "trigger": false
            },
            "offsetSec": 4.81,
            "data": {
                "temperature": 37.97,
                "illuminance": 0,
                "activity": 0,
                "humidity": 31.46
            }
        }],
        "MACAddress": "xxxx",
        "parentID": "xxxx",
        "type": 1
    }
}
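To make the required mapping concrete, here is a plain-Python sketch (not PyFlink; the `flatten` helper and its output keys are my own illustration) showing that each element of the `sensor` array should become one flat record:

```python
import json

# The sample payload from above; "sensor" is an array, so each element
# should yield one flat output row.
payload = json.loads("""
{
    "data_json": {
        "deviceID": "xxxxx",
        "sensor": [{
            "information": {"trigger": false},
            "offsetSec": 4.81,
            "data": {"temperature": 37.97, "illuminance": 0,
                     "activity": 0, "humidity": 31.46}
        }],
        "MACAddress": "xxxx",
        "parentID": "xxxx",
        "type": 1
    }
}
""")

def flatten(record):
    """Emit one flat dict per sensor element, mirroring the
    data_json.sensor[0:] mappings of the SQL-application schema."""
    d = record["data_json"]
    for s in d["sensor"]:
        yield {
            "mac_address": d["MACAddress"],
            "device_key": d["deviceID"],
            "type": d["type"],
            "parent_id": d["parentID"],
            "illuminance": s["data"]["illuminance"],
            "activity": s["data"]["activity"],
            "humidity": s["data"]["humidity"],
            "temperature": s["data"]["temperature"],
            "offset_sec": s["offsetSec"],
            # booleans map to INT (false -> 0, true -> 1)
            "trigger_info": int(s["information"]["trigger"]),
        }

rows = list(flatten(payload))
```

This is only the target semantics; the question is how to express the same thing declaratively in PyFlink.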

Expected schema format (the one used with the SQL application; I need something similar in PyFlink):

RecordColumns:
  - Name: "mac_address"
    SqlType: "VARCHAR(20)"
    Mapping: "data_json.MACAddress"
  - Name: "device_key"
    SqlType: "VARCHAR(20)"
    Mapping: "data_json.deviceID"
  - Name: "type"
    SqlType: "INT"
    Mapping: "data_json.type"
  - Name: "parent_id"
    SqlType: "VARCHAR(20)"
    Mapping: "data_json.parentID"
  - Name: "illuminance"
    SqlType: "INT"
    Mapping: "data_json.sensor[0:].data.illuminance"
  - Name: "activity"
    SqlType: "INT"
    Mapping: "data_json.sensor[0:].data.activity"
  - Name: "humidity"
    SqlType: "INT"
    Mapping: "data_json.sensor[0:].data.humidity"
  - Name: "temperature"
    SqlType: "INT"
    Mapping: "data_json.sensor[0:].data.temperature"
  - Name: "offset_sec"
    SqlType: "DOUBLE"
    Mapping: "data_json.sensor[0:].offsetSec"
  - Name: "trigger_info"
    SqlType: "INT"
    Mapping: "data_json.sensor[0:].information.trigger"
RecordFormat:
  RecordFormatType: "JSON"
