I'm trying to process JSON strings from Kinesis. The strings can take a couple of different forms. From Kinesis, I create a DStream:

val kinesisStream = KinesisUtils.createStream(
  ssc, appName, "Kinesis_Stream", "kinesis.ap-southeast-1.amazonaws.com",
  "region", InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2)

// Each Kinesis record arrives as Array[Byte]; decode it to a String
val lines = kinesisStream.map(x => new String(x))

lines.foreachRDD((rdd, time) => {

  val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
  import sqlContext.implicits.StringToColumn

  if (!rdd.isEmpty()) {
    // Process jsons here
    // Json strings here would have either one of the formats below
  }
})

Each string in the RDD has one of two JSON shapes. Some are a collection:

[
  {
    "data": {
      "ApplicationVersion": "1.0.3 (65)",
      "ProjectId": 30024,
      "TargetId": "4138",
      "Timestamp": 0
    },
    "host": "host1"
  },
  {
    "data": {
      "ApplicationVersion": "1.0.3 (65)",
      "ProjectId": 30025,
      "TargetId": "4139",
      "Timestamp": 0
    },
    "host": "host1"
  }
]

Other JSON strings are a single object, like so:

{
      "ApplicationVersion": "1.0.3 (65)",
      "ProjectId": 30026,
      "TargetId": "4140",
      "Timestamp": 0
}

If a string is of the first type, I want to extract the object under the "data" key, combine it with the records of the second type, and form a single RDD/DataFrame. How can I achieve this?

Ultimately I would like my data frame to be something like this:

+------------------+---------+--------+---------+
|ApplicationVersion|ProjectId|TargetId|Timestamp|
+------------------+---------+--------+---------+
|        1.0.3 (65)|    30024|    4138|        0|
|        1.0.3 (65)|    30025|    4139|        0|
|        1.0.3 (65)|    30026|    4140|        0|
+------------------+---------+--------+---------+

Sorry, new to Scala and Spark. I've been looking at existing examples but haven't found a solution unfortunately.

Many thanks in advance.

j3tr1

2 Answers

You can use union after selecting the data.* columns from the first DataFrame:

val spark = SparkSession.builder().master("local[*]").getOrCreate()    
val sc = spark.sparkContext

// Assuming you store your jsons in two separate strings `json1` and `json2`
val df1 = spark.read.json(sc.parallelize(Seq(json1)))
val df2 = spark.read.json(sc.parallelize(Seq(json2)))

import spark.implicits._
df1.select($"data.*") // Select only the data columns from the first DataFrame
  .union(df2)         // union matches columns by position, so both DataFrames need the same column order
  .show()

EDIT [Additional Solution Links]

After your edit, I understand that you need some sort of fallback mechanism when parsing the JSON. There are several ways to do this with any JSON parsing library; there's a nice solution here using Play, which I think already explains how to solve the problem elegantly.

Once you have an RDD[Data], where Data is your "variant" type, you can simply convert it into a DataFrame using rdd.toDF().
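
As a rough illustration of such a fallback, here is a minimal sketch that uses json4s rather than Play (a swapped-in library, chosen only because it is commonly available alongside Spark). The names Record, JsonNormalizer, unwrap and extractRecords are hypothetical, and the field types are guessed from the sample data in the question. It accepts either a top-level array or a single object, unwraps a "data" object when one is present, and skips anything it cannot parse:

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods.parseOpt

// Hypothetical record type; field names and types are assumed from the sample JSON.
case class Record(ApplicationVersion: String, ProjectId: Long, TargetId: String, Timestamp: Long)

object JsonNormalizer {
  implicit val formats: Formats = DefaultFormats

  // If the value wraps its payload in a "data" object, return that object;
  // otherwise assume the value itself is the payload.
  private def unwrap(value: JValue): JValue = value \ "data" match {
    case inner: JObject => inner
    case _              => value
  }

  // Handles both shapes: a JSON array of objects or a single JSON object.
  // Malformed strings and elements that do not fit Record are dropped.
  def extractRecords(json: String): List[Record] = {
    val elements: List[JValue] = parseOpt(json) match {
      case Some(JArray(items)) => items
      case Some(obj: JObject)  => List(obj)
      case _                   => Nil
    }
    elements.flatMap(element => unwrap(element).extractOpt[Record])
  }
}
```

Inside foreachRDD this could be applied as rdd.flatMap(JsonNormalizer.extractRecords), followed by toDF() once sqlContext.implicits._ is imported. Silently dropping bad input is a design choice made for brevity here; you may prefer to log or count rejected strings.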

Hope that helps.

Andrei T.
  • Thanks for the quick response, Andrei. I appreciate it, and it is quite useful! Sorry, I forgot to mention that I'm using Spark Streaming DStreams; I've updated my question above. – j3tr1 Jul 14 '17 at 11:59
  • I see. Is there a simple way to know which kind of object arrives when? – Andrei T. Jul 14 '17 at 12:14
  • unfortunately no – j3tr1 Jul 14 '17 at 12:41
  • I've edited the answer. That should help you achieve what you're looking for, I just wanted to avoid putting all the details here since these would really be two separate questions. I hope it helps you :) – Andrei T. Jul 14 '17 at 13:05

This example uses json4s:

import org.json4s._
import org.json4s.jackson.JsonMethods._

implicit val formats: Formats = DefaultFormats

case class jsonschema(ApplicationVersion: String, ProjectId: String, TargetId: String, Timestamp: Int)

val string1 = """
[ {
  "data" : {
    "ApplicationVersion" : "1.0.3 (65)",
    "ProjectId" : 30024,
    "TargetId" : "4138",
    "Timestamp" : 0
  },
  "host" : "host1"
}, {
  "data" : {
    "ApplicationVersion" : "1.0.3 (65)",
    "ProjectId" : 30025,
    "TargetId" : "4139",
    "Timestamp" : 0
  },
  "host" : "host1"
} ]

"""

val string2 = """
[ {
  "ApplicationVersion" : "1.0.3 (65)",
  "ProjectId" : 30025,
  "TargetId" : "4140",
  "Timestamp" : 0
}, {
  "ApplicationVersion" : "1.0.3 (65)",
  "ProjectId" : 30025,
  "TargetId" : "4141",
  "Timestamp" : 0
} ]
"""

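// `\ "data"` on the parsed array collects the "data" object of every element
val json1 = (parse(string1) \ "data").extract[List[jsonschema]]

// The second form has no wrapper, so it can be extracted directly
val json2 = parse(string2).extract[List[jsonschema]]

// Both are plain Scala Lists at this point, not RDDs; concatenate them
val records = json1 ++ json2

// Assumes a `sqlContext` is already in scope (as in spark-shell)
val df = sqlContext.createDataFrame(records)

df.show()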


+------------------+---------+--------+---------+
|ApplicationVersion|ProjectId|TargetId|Timestamp|
+------------------+---------+--------+---------+
|        1.0.3 (65)|    30024|    4138|        0|
|        1.0.3 (65)|    30025|    4139|        0|
|        1.0.3 (65)|    30025|    4140|        0|
|        1.0.3 (65)|    30025|    4141|        0|
+------------------+---------+--------+---------+
philantrovert
  • Thanks for the quick response! Sorry, I forgot to mention that I'm using Spark Streaming DStreams; I've updated my question. Your response is still helpful though! – j3tr1 Jul 14 '17 at 11:58
  • If you are able to extract Strings from your DStream, the code should more or less work. – philantrovert Jul 14 '17 at 12:02
  • thanks! this pointed me to the right direction by using json4s. this allowed me to process the json string first before converting to DF – j3tr1 Jul 16 '17 at 03:19