I have JSON records that I am reading from a Kafka topic using Spark streaming. A sample record looks like this:
{"COUNTRY_REGION": "United States", "GROCERY_AND_PHARMACY_CHANGE_PERC": "-7", "PARKS_CHANGE_PERC": "\\\\N", "LAST_UPDATE_DATE": "05:31.7"}
As I understand it, I first need to define a schema (done below) and then parse the JSON in the Kafka value field with the from_json function.
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import (BooleanType, DateType, IntegerType,
                               StringType, StructField, StructType)
schema = StructType([
    StructField("COUNTRY_REGION", StringType(), True),
    StructField("PROVINCE_STATE", StringType(), True),
    StructField("ISO_3166_1", StringType(), True),
    StructField("ISO_3166_2", StringType(), True),
    StructField("DATE", DateType(), True),
    StructField("GROCERY_AND_PHARMACY_CHANGE_PERC", IntegerType(), True),
    StructField("PARKS_CHANGE_PERC", IntegerType(), True),
    StructField("RESIDENTIAL_CHANGE_PERC", IntegerType(), True),
    StructField("RETAIL_AND_RECREATION_CHANGE_PERC", IntegerType(), True),
    StructField("TRANSIT_STATIONS_CHANGE_PERC", IntegerType(), True),
    StructField("WORKPLACES_CHANGE_PERC", IntegerType(), True),
    StructField("LAST_UPDATE_DATE", DateType(), True),
    StructField("LAST_REPORTED_FLAG", BooleanType(), True),
    StructField("SUB_REGION_2", StringType(), True),
])
# The Kafka source delivers `value` as binary; cast it to a string before from_json
json_edit = df.select(from_json(col("value").cast("string"), schema).alias("json"))
However, I noticed that GROCERY_AND_PHARMACY_CHANGE_PERC, PARKS_CHANGE_PERC and LAST_UPDATE_DATE all become null:
display(json_edit)
{"COUNTRY_REGION": "United States", "GROCERY_AND_PHARMACY_CHANGE_PERC": null, "PARKS_CHANGE_PERC": null, "LAST_UPDATE_DATE": null}
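I realized this is because of the original JSON: the numeric values arrive as quoted strings, e.g. "GROCERY_AND_PHARMACY_CHANGE_PERC": "-7" when it should be "GROCERY_AND_PHARMACY_CHANGE_PERC": -7. (LAST_UPDATE_DATE, "05:31.7", also does not look like a date that DateType could parse.)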
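Python's standard json module shows the same distinction outside Spark: a quoted value decodes to a str and a bare one to an int, so a field declared as an integer receives text instead of a number. A small plain-Python check (not Spark code; to_int_or_none is a hypothetical helper that turns anything non-numeric, such as a \N marker, into None):

```python
import json

RAW = '{"COUNTRY_REGION": "United States", "GROCERY_AND_PHARMACY_CHANGE_PERC": "-7"}'
FIXED = '{"COUNTRY_REGION": "United States", "GROCERY_AND_PHARMACY_CHANGE_PERC": -7}'


def field_type(raw_json, field):
    """Return the Python type that json.loads produces for one field."""
    return type(json.loads(raw_json)[field])


def to_int_or_none(value):
    """Hypothetical helper: convert a numeric string to int.

    Anything that is not an integer string (e.g. the NULL marker "\\N",
    or None itself) becomes None instead of raising.
    """
    try:
        return int(value)
    except (TypeError, ValueError):
        return None


if __name__ == "__main__":
    print(field_type(RAW, "GROCERY_AND_PHARMACY_CHANGE_PERC"))    # <class 'str'>
    print(field_type(FIXED, "GROCERY_AND_PHARMACY_CHANGE_PERC"))  # <class 'int'>
    print(to_int_or_none("-7"), to_int_or_none("\\N"))            # -7 None
```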
Is there any way to convert the string to a double/int before I parse it into the schema?
Thank you!