
I want to split a JSON-formatted column in a Spark DataFrame:

The allrules_internal table in Hive:

+-----------+--------------------------------------+--------+
| tablename | condition                            | filter |
+-----------+--------------------------------------+--------+
| documents | {"col_list":"document_id,comments"}  | NA     |
| person    | {"per_list":"person_id, name, age"}  | NA     |
+-----------+--------------------------------------+--------+

Code:

val allrulesDF = spark.read.table("default" + "." + "allrules_internal")
allrulesDF.show()

val df1 = allrulesDF
  .select(allrulesDF.col("tablename"), allrulesDF.col("condition"), allrulesDF.col("filter"), allrulesDF.col("dbname"))
  .collect()

Here I want to split the condition column values. From the example above, I want to keep only the "document_id,comments" part. In other words, the condition column holds a key/value pair, but I only want the value part.

If there is more than one row in the allrules_internal table, how do I split each value?

df1.foreach(row => {
  // condition = row.getAs("condition").toString() // how do I retrieve the value here?
  println(condition)
  val tableConditionDF = spark.sql("SELECT " + condition + " FROM " + db_name + "." + table_name)
  tableConditionDF.show()
})
stack0114104

1 Answer


You can use the from_json function:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._

allrulesDF
  .withColumn("condition", from_json($"condition", StructType(Seq(StructField("col_list", DataTypes.StringType, true)))))
  .select($"tablename", $"condition.col_list".as("condition"))

It will print:

+---------+---------------------+
|tablename|condition            |
+---------+---------------------+
|documents|document_id, comments|
+---------+---------------------+

Explanation:

With the withColumn method, you can create a new column by applying a function to one or more existing columns. In this case, we're using the from_json function, which receives the column containing a JSON string and a StructType object describing the schema of that JSON. Finally, you just select the columns that you need.
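Alternatively, if you don't want to declare a schema just to pull out one field, get_json_object extracts a single value from a JSON string by path. A minimal sketch (dfAlt is just an illustrative name):

import org.apache.spark.sql.functions._
import spark.implicits._

// get_json_object extracts one value from a JSON string by JSON path,
// so no explicit schema is needed.
val dfAlt = allrulesDF
  .withColumn("condition", get_json_object($"condition", "$.col_list"))
  .select($"tablename", $"condition")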

Hope it helped!

Álvaro Valencia
  • If I have more than one row in the allrules_internal table, how do I apply this per row? I tried: df1.foreach(row => { condition = allrulesDF.withColumn(row.getAs("condition"), from_json($"condition", StructType(Seq(StructField("col_list", DataTypes.StringType, true))))).select($"condition.col_list".as("condition")); println(condition); val tableConditionDF = spark.sql("SELECT " + condition + " FROM " + db_name + "." + table_name); tableConditionDF.show() }) – stack0114104 Oct 10 '18 at 09:51
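Regarding the last comment: a schema with a fixed field name like col_list only matches some rows, because each row uses a different JSON key (col_list, per_list, ...). One way around this is to parse the column as a map and take the map's single value, so the key name no longer matters. A sketch, assuming Spark 2.3+ for map_values and that the target tables live in the default database (substitute the dbname column if yours differ):

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._

// Parse each JSON string into a map so the key ("col_list",
// "per_list", ...) does not need to be known up front, then take
// the first (and only) value in the map.
val parsedDF = allrulesDF
  .withColumn("condition_map", from_json($"condition", MapType(StringType, StringType)))
  .withColumn("condition", map_values($"condition_map").getItem(0))
  .drop("condition_map")

// Collect on the driver and build one query per row.
parsedDF.collect().foreach { row =>
  val tableName = row.getAs[String]("tablename")
  val condition = row.getAs[String]("condition")
  val tableConditionDF = spark.sql(s"SELECT $condition FROM default.$tableName")
  tableConditionDF.show()
}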