I'm trying to process JSON strings from Kinesis. The JSON strings can take a couple of different forms. From Kinesis, I create a DStream:
val kinesisStream = KinesisUtils.createStream(
  ssc, appName, "Kinesis_Stream", "kinesis.ap-southeast-1.amazonaws.com",
  "region", InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2)

// Decode each Kinesis record's byte payload explicitly as UTF-8
val lines = kinesisStream.map(x => new String(x, java.nio.charset.StandardCharsets.UTF_8))

lines.foreachRDD((rdd, time) => {
  val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
  import sqlContext.implicits.StringToColumn
  if (rdd.count() > 0) {
    // Process JSON strings here
    // They would have either one of the formats below
  }
})
Each string in the RDD is one of the following JSON payloads. Some are a collection:
[
{
"data": {
"ApplicationVersion": "1.0.3 (65)",
"ProjectId": 30024,
"TargetId": "4138",
"Timestamp": 0
},
"host": "host1"
},
{
"data": {
"ApplicationVersion": "1.0.3 (65)",
"ProjectId": 30025,
"TargetId": "4139",
"Timestamp": 0
},
"host": "host1"
}
]
and some JSON strings are a single, flat object, like so:
{
"ApplicationVersion": "1.0.3 (65)",
"ProjectId": 30026,
"TargetId": "4140",
"Timestamp": 0
}
If a string is the first type, I want to extract each object under the "data" key, then combine those with the second type of JSON objects into a single RDD/DataFrame. How can I achieve this?
Ultimately, I would like my DataFrame to look like this:
+------------------+---------+--------+---------+
|ApplicationVersion|ProjectId|TargetId|Timestamp|
+------------------+---------+--------+---------+
| 1.0.3 (65)| 30024| 4138| 0|
| 1.0.3 (65)| 30025| 4139| 0|
| 1.0.3 (65)| 30026| 4140| 0|
+------------------+---------+--------+---------+
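For context, here is the kind of approach I've been experimenting with inside foreachRDD. This is only a rough sketch, not working code: it assumes the Spark 1.x SQLContext from my snippet above, that the wrapped payloads always start with '[', and that both shapes share the four columns shown.

```scala
import org.apache.spark.sql.DataFrame

lines.foreachRDD { (rdd, time) =>
  val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
  if (!rdd.isEmpty()) {
    // Split the batch by shape: a leading '[' marks the wrapped collection form (an assumption)
    val wrapped = rdd.filter(_.trim.startsWith("["))
    val bare    = rdd.filter(!_.trim.startsWith("["))

    val cols = Seq("ApplicationVersion", "ProjectId", "TargetId", "Timestamp")

    // Spark's JSON reader turns a top-level array into one row per element,
    // so the wrapped form only needs the nested "data" fields pulled up
    val parts = Seq(
      (wrapped, cols.map("data." + _)),
      (bare, cols)
    ).collect {
      case (part, colNames) if !part.isEmpty() =>
        sqlContext.read.json(part).select(colNames.head, colNames.tail: _*)
    }

    // Union the normalized pieces into the flat DataFrame shown above
    parts.reduceOption((a: DataFrame, b: DataFrame) => a.unionAll(b)).foreach(_.show())
  }
}
```

I'm not sure this is idiomatic, or whether the string-prefix check is a safe way to distinguish the two formats.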
Sorry, I'm new to Scala and Spark. I've been looking at existing examples but unfortunately haven't found a solution.
Many thanks in advance.