I am receiving a streaming dataset from Azure Event Hubs. The data arrives in the following format:
[
  [
    {
      "data": "sampledata1",
      "addressdata": {
        "isTest": false,
        "address": "washington",
        "zipcode": 98119,
        "city": "seattle",
        "randomstring": "abcdabcd:ghkjnkasd:asdasdasd"
      },
      "profession": "engineer",
      "party": "democrat"
    },
    {
      "data": "sampledata2",
      "addressdata": {
        "isTest": false,
        "address": "virginia",
        "zipcode": 20120,
        "city": "Centreville",
        "randomstring": "zabcdabcd:tghkjnkasd:gasdasdasd"
      },
      "profession": "teacher",
      "party": "republican"
    }
  ]
]
Following this article, I can get the JSON as a raw string: https://docs.databricks.com/spark/latest/structured-streaming/streaming-event-hubs.html
But I cannot extract individual items from the string using get_json_object. I believe the problem is that the string is not a single JSON object; it is an array of arrays of JSON objects, so get_json_object is not able to parse it.
import org.apache.spark.sql.functions._

// Event Hubs delivers the payload as binary, so cast it to a string once.
val body = $"body".cast("string")
// Minute of the hour, used to bucket events into 15-minute quarters (1 to 4).
val minuteOfHour = date_format($"enqueuedTime", "mm").cast("int")

val outputDf = streamingInputDf.select(
  get_json_object(body, "$.data").alias("data"),
  get_json_object(body, "$.addressdata").alias("addressdata"),
  get_json_object(body, "$.profession").alias("profession"),
  get_json_object(body, "$.party").alias("party"),
  date_format($"enqueuedTime", "dd.MM.yyyy").alias("day"),
  date_format($"enqueuedTime", "HH").cast("int").alias("hour"),
  when(minuteOfHour <= 15, 1)
    .when(minuteOfHour > 15 && minuteOfHour <= 30, 2)
    .when(minuteOfHour > 30 && minuteOfHour <= 45, 3)
    .otherwise(4).alias("minute")
)
Does anyone have a suggestion for how to cleanly parse the data and extract the individual fields from the string? Is there a method similar to get_json_object that can extract data from an array of JSON objects?
PS: The array of JSON actually arrives on a single line, not spread across multiple lines as shown above.
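For reference, one direction that might work is to describe the payload with an explicit schema and parse it with from_json, then flatten the two array levels with explode. The following is only a minimal sketch under assumptions: the object name ParseNestedJsonArray, the parse helper, the local SparkSession, and the static one-row DataFrame standing in for streamingInputDf are all hypothetical, the schema is inferred from the sample above, and it assumes a Spark version whose from_json accepts a nested ArrayType as the root schema (recent 2.4+/3.x releases should).

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, explode, from_json}
import org.apache.spark.sql.types._

object ParseNestedJsonArray {
  // Schema of the nested "addressdata" object (inferred from the sample payload).
  val addressSchema: StructType = new StructType()
    .add("isTest", BooleanType)
    .add("address", StringType)
    .add("zipcode", IntegerType)
    .add("city", StringType)
    .add("randomstring", StringType)

  // Schema of one record inside the inner array.
  val recordSchema: StructType = new StructType()
    .add("data", StringType)
    .add("addressdata", addressSchema)
    .add("profession", StringType)
    .add("party", StringType)

  // The body is an array of arrays of records.
  val bodySchema: ArrayType = ArrayType(ArrayType(recordSchema))

  // Single-line sample, matching how the payload is said to arrive.
  val sample: String =
    """[[{"data":"sampledata1","addressdata":{"isTest":false,"address":"washington","zipcode":98119,"city":"seattle","randomstring":"abcdabcd:ghkjnkasd:asdasdasd"},"profession":"engineer","party":"democrat"},{"data":"sampledata2","addressdata":{"isTest":false,"address":"virginia","zipcode":20120,"city":"Centreville","randomstring":"zabcdabcd:tghkjnkasd:gasdasdasd"},"profession":"teacher","party":"republican"}]]"""

  // Parse the body, then explode the outer and inner arrays so that
  // each JSON object becomes its own row with typed columns.
  def parse(input: DataFrame): DataFrame =
    input
      .select(from_json(col("body").cast("string"), bodySchema).alias("outer"))
      .select(explode(col("outer")).alias("inner"))
      .select(explode(col("inner")).alias("rec"))
      .select(
        col("rec.data").alias("data"),
        col("rec.addressdata.city").alias("city"),
        col("rec.addressdata.zipcode").alias("zipcode"),
        col("rec.profession").alias("profession"),
        col("rec.party").alias("party"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("nested-json-sketch")
      .getOrCreate()
    import spark.implicits._

    // Static stand-in for the streaming DataFrame's "body" column.
    val input = Seq(sample).toDF("body")
    parse(input).show(truncate = false)
    spark.stop()
  }
}
```

In the streaming job, the same parse step would presumably be applied to streamingInputDf, with enqueuedTime carried through the selects if the day, hour, and minute columns are still needed. An explicit schema is used here because schema inference is not available on a stream, and typed columns avoid repeated get_json_object calls on the same string.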