
I am trying to parse a JSON structure that is dynamic in nature and load it into a database, but I am facing difficulty where the JSON has dynamic keys inside it. Below is my sample JSON. I have tried using the explode function, but it didn't help: explode works on array and map columns, while Spark infers demoValues as a struct with one field per dynamic key (a sketch of a map-based alternative follows the expected output below). A mostly similar thing is described here: How to parse a dynamic JSON key in a Nested JSON result?

    {
        "_id": {
            "planId": "5f34dab0c661d8337097afb9",
            "version": { "$numberLong": "1" },
            "period": {
                "name": "3Q20",
                "startDate": 20200629,
                "endDate": 20200927
            },
            "line": "b443e9c0-fafc-4791-87c9-8e32339c7f3c",
            "channelId": "G7k5_-HWRIuF0-afe7q-rQ"
        },
        "unitRates": {
            "units": { "$numberLong": "0" },
            "rate": 0.0,
            "rcRate": 0.0
        },
        "demoValues": {
            "66": { "cpm": 0.0, "cpp": 0, "vpvh": 0.0, "imps": 0.0, "rcImps": 0.0, "ue": 0.0, "grps": 0.0, "demoId": "66" },
            "63": { "cpm": 0.0, "cpp": 0, "vpvh": 0.0, "imps": 0.0, "rcImps": 0.0, "ue": 0.0, "grps": 0.0, "demoId": "63" },
            "21": { "cpm": 0.0, "cpp": 0, "vpvh": 0.0, "imps": 0.0, "rcImps": 0.0, "ue": 0.0, "grps": 0.0, "demoId": "21" }
        },
        "hh-imps": 0.0
    }

Below is my Scala code:

import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import com.google.gson.JsonObject
import org.apache.spark.sql.types.{ArrayType, MapType, StringType, StructField, StructType}
import org.codehaus.jettison.json.JSONObject

object ParseDynamic_v2 {
  def main(args: Array[String]): Unit = {
    System.setProperty("hadoop.home.dir", "C:\\hadoop")
    val spark = SparkSession
      .builder
      .appName("ConfluentConsumer")
      .master("local[4]")
      .getOrCreate()

    import spark.implicits._
    val jsonStringDs = spark.createDataset[String](Seq(
      """{"_id" : {"planId" : "5f34dab0c661d8337097afb9","version" : {"$numberLong" : "1"},"period" : {"name" : "3Q20","startDate" : 20200629,"endDate" : 20200927},"line" : "b443e9c0-fafc-4791-87c9-8e32339c7f3c","channelId" : "G7k5_-HWRIuF0-afe7q-rQ"},"unitRates" : {"units" : {"$numberLong" : "0"},"rate" : 0.0,"rcRate" : 0.0},"demoValues" : {"66" : {"cpm" : 0.0,"cpp" : 0,"vpvh" : 0.0,"imps" : 0.0,"rcImps" : 0.0,"ue" : 0.0,"grps" : 0.0,"demoId" : "66"},"63" : {"cpm" : 0.0,"cpp" : 0,"vpvh" : 0.0,"imps" : 0.0,"rcImps" : 0.0,"ue" : 0.0,"grps" : 0.0,"demoId" : "63"},"21" : {"cpm" : 0.0,"cpp" : 0,"vpvh" : 0.0,"imps" : 0.0,"rcImps" : 0.0,"ue" : 0.0,"grps" : 0.0,"demoId" : "21"}},"hh-imps" : 0.0}"""
    ))

    jsonStringDs.show
    val df = spark.read.json(jsonStringDs)
    df.show(false)


    val app = df.select("demoValues.*")
    app.createOrReplaceTempView("app")
    app.printSchema
    app.show(false)
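    // NOTE: "demoValues.*" expands to one column per dynamic key ("21", "63", "66"),
    // so this schema changes whenever a new demo id appears in the data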


    val verticaProperties: Map[String, String] = Map(
      "db" -> "dbname", // database name
      "user" -> "user", // database username
      "password" -> "****", // password
      "table" -> "tablename", // Vertica table name
      "dbschema" -> "public", // Vertica schema in which the table will be residing
      "host" -> "localhost", // host on which Vertica is currently running
      "hdfs_url" -> "hdfs://localhost:8020/user/hadoop/planheader/", // HDFS directory in which the intermediate ORC file persists before it is sent to Vertica
      "web_hdfs_url" -> "webhdfs://localhost:50070/user/hadoop/planheader/"
    )

    val verticaDataSource = "com.vertica.spark.datasource.DefaultSource"
    // batch write mode
    df.write.format(verticaDataSource).options(verticaProperties).mode("overwrite").save()

    // streaming write mode (note: writeStream requires a streaming source
    // created with spark.readStream; df above comes from a batch read)
    val saveToVertica: DataFrame => Unit =
      dataFrame =>
        dataFrame.write.format(verticaDataSource).options(verticaProperties).mode("append").save()

    val checkpointLocation = "/user/hadoop/planheader/checkpoint"
    val streamingQuery = df.writeStream
      .outputMode(OutputMode.Append)
      .option("checkpointLocation", checkpointLocation)
      //.trigger(ProcessingTime("25 seconds"))
      .foreachBatch((ds, _) => saveToVertica(ds)).start()

    streamingQuery.awaitTermination()


  }

}


    

Expected output: a flattened table with one row per dynamic demo key (planId, period name, cpm, vpvh, imps, demoId).
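For reference, a minimal sketch of the map-based alternative mentioned above (assumptions are mine: a recent Spark version with to_json/from_json, the demos alias, and the per-demo field types). Round-tripping the demoValues struct through to_json/from_json turns its dynamic keys into map entries, which explode can then flatten into one row per key:

    import org.apache.spark.sql.functions.{col, explode, from_json, to_json}
    import org.apache.spark.sql.types._

    // schema of a single demo entry (the same fields repeat under every dynamic key)
    val demoSchema = StructType(Seq(
      StructField("cpm", DoubleType), StructField("cpp", LongType),
      StructField("vpvh", DoubleType), StructField("imps", DoubleType),
      StructField("rcImps", DoubleType), StructField("ue", DoubleType),
      StructField("grps", DoubleType), StructField("demoId", StringType)))

    val flattened = df
      // to_json serializes the struct; from_json re-reads it as a map keyed by demo id
      .select(col("_id.planId"), col("_id.period.name").as("periodName"),
        from_json(to_json(col("demoValues")), MapType(StringType, demoSchema)).as("demos"))
      // exploding a map yields one row per dynamic key, as `key` and `value` columns
      .select(col("planId"), col("periodName"), explode(col("demos")))
      .select(col("planId"), col("periodName"), col("key").as("demoId"),
        col("value.cpm"), col("value.vpvh"), col("value.imps"))

    flattened.show(false)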


2 Answers


Here is what I did using Vertica:

I created a flex table, loaded it, and used Vertica's flex table function COMPUTE_FLEXTABLE_KEYS_AND_BUILD_VIEW() to get a view.

It turned out to be a single-row table:

-- CREATE the Flex Table
CREATE FLEX TABLE demovals();

-- copy it using the built-in JSON parser (it creates a map container
-- with all key-value pairs)
COPY demovals FROM '/home/gessnerm/1/Vertica/supp/l.json' PARSER fjsonparser();
-- out vsql:/home/gessnerm/._vfv.sql:1: ROLLBACK 4213:  Object "demovals" already exists
-- out  Rows Loaded 
-- out -------------
-- out            1
-- out (1 row)
-- out 
-- out Time: First fetch (1 row): 112.540 ms. All rows formatted: 112.623 ms
-- the function on the next line guesses the data types in the values
-- matching the keys, stores the guessed data types in a second table,
-- and builds a view from all found keys
SELECT COMPUTE_FLEXTABLE_KEYS_AND_BUILD_VIEW('demovals');
-- out                                  COMPUTE_FLEXTABLE_KEYS_AND_BUILD_VIEW                                  
-- out --------------------------------------------------------------------------------------------------------
-- out  Please see dbadmin.demovals_keys for updated keys
-- out The view dbadmin.demovals_view is ready for querying
-- out (1 row)
-- out 
-- out Time: First fetch (1 row): 467.551 ms. All rows formatted: 467.583 ms
-- now, select from the single-row view on the flex table, 
-- one row per column in the report (extended view: "\x" )
\x
SELECT * FROM dbadmin.demovals_view;
-- out -[ RECORD 1 ]---------------+-------------------------------------
-- out _id.channelid               | G7k5_-HWRIuF0-afe7q-rQ
-- out _id.line                    | b443e9c0-fafc-4791-87c9-8e32339c7f3c
-- out _id.period.enddate          | 20200927
-- out _id.period.name             | 3Q20
-- out _id.period.startdate        | 20200629
-- out _id.planid                  | 5f34dab0c661d8337097afb9
-- out _id.version.$numberlong     | 1
-- out demovalues.21.cpm           | 0.00
-- out demovalues.21.cpp           | 0
-- out demovalues.21.demoid        | 21
-- out demovalues.21.grps          | 0.00
-- out demovalues.21.imps          | 0.00
-- out demovalues.21.rcimps        | 0.00
-- out demovalues.21.ue            | 0.00
-- out demovalues.21.vpvh          | 0.00
-- out demovalues.63.cpm           | 0.00
-- out demovalues.63.cpp           | 0
-- out demovalues.63.demoid        | 63
-- out demovalues.63.grps          | 0.00
-- out demovalues.63.imps          | 0.00
-- out demovalues.63.rcimps        | 0.00
-- out demovalues.63.ue            | 0.00
-- out demovalues.63.vpvh          | 0.00
-- out demovalues.66.cpm           | 0.00
-- out demovalues.66.cpp           | 0
-- out demovalues.66.demoid        | 66
-- out demovalues.66.grps          | 0.00
-- out demovalues.66.imps          | 0.00
-- out demovalues.66.rcimps        | 0.00
-- out demovalues.66.ue            | 0.00
-- out demovalues.66.vpvh          | 0.00
-- out hh-imps                     | 0.00
-- out unitrates.rate              | 0.00
-- out unitrates.rcrate            | 0.00
-- out unitrates.units.$numberlong | 0

For the children (the demoValues sub-objects), for example:

CREATE FLEX TABLE children();
TRUNCATE TABLE children;
COPY children FROM '/home/gessnerm/1/Vertica/supp/l.json' PARSER fjsonparser(start_point='demoValues');
SELECT COMPUTE_FLEXTABLE_KEYS_AND_BUILD_VIEW('children');
\x
SELECT * FROM dbadmin.children_view;
-- out Time: First fetch (0 rows): 7.303 ms. All rows formatted: 7.308 ms
-- out  Rows Loaded 
-- out -------------
-- out            1
-- out (1 row)
-- out 
-- out Time: First fetch (1 row): 13.848 ms. All rows formatted: 13.876 ms
-- out                                  COMPUTE_FLEXTABLE_KEYS_AND_BUILD_VIEW                                  
-- out --------------------------------------------------------------------------------------------------------
-- out  Please see dbadmin.children_keys for updated keys
-- out The view dbadmin.children_view is ready for querying
-- out (1 row)
-- out 
-- out Time: First fetch (1 row): 140.381 ms. All rows formatted: 140.404 ms
-- out -[ RECORD 1 ]---
-- out 21.cpm    | 0.00
-- out 21.cpp    | 0
-- out 21.demoid | 21
-- out 21.grps   | 0.00
-- out 21.imps   | 0.00
-- out 21.rcimps | 0.00
-- out 21.ue     | 0.00
-- out 21.vpvh   | 0.00
-- out 63.cpm    | 0.00
-- out 63.cpp    | 0
-- out 63.demoid | 63
-- out 63.grps   | 0.00
-- out 63.imps   | 0.00
-- out 63.rcimps | 0.00
-- out 63.ue     | 0.00
-- out 63.vpvh   | 0.00
-- out 66.cpm    | 0.00
-- out 66.cpp    | 0
-- out 66.demoid | 66
-- out 66.grps   | 0.00
-- out 66.imps   | 0.00
-- out 66.rcimps | 0.00
-- out 66.ue     | 0.00
-- out 66.vpvh   | 0.00

  • I was thinking whether we could use Spark's power to flatten the JSON and load the data into the DB, so that I can write simple queries at the reporting layer. If I load the data into a flex table instead, I have to write complex logic at the DB layer to retrieve the desired data, using MAPJSONEXTRACTOR, MAPVALUES and other functions which are very resource intensive. – mt_leo Aug 13 '20 at 15:07

I am not sure how efficient my code is, but it does the job.

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.functions.{col, lit}
    import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

    // reading data from the json file
    val df1 = spark.read.json("src/main/resources/data.json")

    // defining the target schema here; the types must match the columns
    // selected below, otherwise the union fails
    val schema = StructType(
      StructField("planid", StringType, true) ::
      StructField("periodname", StringType, true) ::
      StructField("cpm", DoubleType, true) ::
      StructField("vpvh", DoubleType, true) ::
      StructField("imps", DoubleType, true) ::
      StructField("demoids", StringType, true) :: Nil)

    var someDF = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)

    // this will have all the dynamic keys as columns, plus planId and name
    val app = df1.select("demoValues.*", "_id.planId", "_id.period.name")
    val arr = app.columns

    // skip the trailing planId and name columns: arr(i) is a dynamic demo key
    for (i <- 0 until arr.length - 2) {
      println("columnname: " + arr(i))
      // traversing through the keys to get the values, e.g. demoValues.63.cpm
      val cpm = "demoValues." + arr(i) + ".cpm"
      val vpvh = "demoValues." + arr(i) + ".vpvh"
      val imps = "demoValues." + arr(i) + ".imps"
      val df2 = df1.select(col("_id.planId"), col("_id.period.name"), col(cpm),
        col(vpvh), col(imps), lit(arr(i)).as("demoids"))
      df2.show(false)
      someDF = someDF.union(df2)
    }
    someDF.show()
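A slightly more idiomatic variant of the same loop (the perKey and flattenedDF names are mine) drops the mutable var and the empty seed DataFrame by mapping over the dynamic keys and reducing with union:

    // hypothetical refactor: build one DataFrame per dynamic key, then union them all
    val perKey = arr.dropRight(2).map { k =>
      df1.select(col("_id.planId"), col("_id.period.name"),
        col(s"demoValues.$k.cpm"), col(s"demoValues.$k.vpvh"),
        col(s"demoValues.$k.imps"), lit(k).as("demoids"))
    }
    val flattenedDF = perKey.reduce(_ union _)
    flattenedDF.show()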