
The statement below does what I need, but it only runs in spark-shell, not in a Scala program.

spark.read.json(dataframe.select("col_name").as[String]).schema

I converted the DataFrame to an RDD and passed that instead (following the link below), and it worked fine, but only when the selection contains just the JSON column; the moment I include other columns (plain column values), it fails to produce output.

How to parse json column in dataframe in scala

So I have a solution that works in spark-shell but not in a Scala program.
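For reference, the RDD-based workaround from the linked question looks roughly like this (a sketch; dataframe and col_name are the placeholder names from my statement above):

// Works when only the JSON column is selected: turn the single-column rows into an RDD[String]
val jsonSchema = spark.read.json(dataframe.select("col_name").rdd.map(_.getString(0))).schema

// But once other (non-JSON) columns are selected too, the rows are no longer plain JSON strings,
// so this approach breaks down.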

input table

output required

AnumNuma
1 Answer


You can use the from_json function to convert your JSON column into a StructType column, and then expand that struct into separate columns, which is what you need here. Keep in mind that the JSON should have a uniform format across rows, otherwise the result may not be as desired (see the note after the output below). You can refer to the following code:

val df = spark.createDataFrame(Seq(
  ("A", "B", "{\"Name\":\"xyz\",\"Address\":\"NYC\",\"title\":\"engg\"}"),
  ("C", "D", "{\"Name\":\"mnp\",\"Address\":\"MIC\",\"title\":\"data\"}"),
  ("E", "F", "{\"Name\":\"pqr\",\"Address\":\"MNN\",\"title\":\"bi\"}")
)).toDF("col_1", "col_2", "col_json")

The input dataframe is as follows:

scala> df.show(false)
+-----+-----+---------------------------------------------+
|col_1|col_2|col_json                                     |
+-----+-----+---------------------------------------------+
|A    |B    |{"Name":"xyz","Address":"NYC","title":"engg"}|
|C    |D    |{"Name":"mnp","Address":"MIC","title":"data"}|
|E    |F    |{"Name":"pqr","Address":"MNN","title":"bi"}  |
+-----+-----+---------------------------------------------+

Now we will infer the schema col_schema of the JSON column so that it can be applied to the col_json column (note that .as[String] requires import spark.implicits._):

val col_schema = spark.read.json(df.select(col("col_json")).as[String]).schema
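For reference, on this sample data the inferred schema should come out as three nullable string fields, i.e. col_schema.printTreeString() prints roughly:

root
 |-- Address: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- title: string (nullable = true)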

val outputDF = df.withColumn("new_col", from_json(col("col_json"), col_schema)).select("col_1", "col_2", "new_col.*")

The result will be as follows:

scala> outputDF.show(false)
+-----+-----+-------+----+-----+
|col_1|col_2|Address|Name|title|
+-----+-----+-------+----+-----+
|A    |B    |NYC    |xyz |engg |
|C    |D    |MIC    |mnp |data |
|E    |F    |MNN    |pqr |bi   |
+-----+-----+-------+----+-----+
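Regarding the uniform-format caveat above: with the default (permissive) behaviour of from_json, a row whose JSON is missing a field just gets null in that column, and a row whose JSON does not parse at all yields null for the whole struct. A quick sketch, reusing the column names and col_schema from above:

val messyDF = Seq(
  ("G", "H", "{\"Name\":\"abc\",\"title\":\"qa\"}"),  // Address field missing
  ("I", "J", "not valid json")                         // unparseable
).toDF("col_1", "col_2", "col_json")

messyDF.withColumn("new_col", from_json(col("col_json"), col_schema))
  .select("col_1", "col_2", "new_col.*")
  .show(false)
// Expect Address = null in the first row and all three fields null in the second.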

The complete Scala program that worked for me is:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}

object Sample {

  def main(args: Array[String]): Unit = {

    // Local SparkSession; in a cluster job you would typically drop .master("local[*]")
    val spark = SparkSession.builder().master("local[*]").getOrCreate()

    // Sample data: two plain columns plus one column holding a JSON string
    val df = spark.createDataFrame(Seq(
      ("A", "B", "{\"Name\":\"xyz\",\"Address\":\"NYC\",\"title\":\"engg\"}"),
      ("C", "D", "{\"Name\":\"mnp\",\"Address\":\"MIC\",\"title\":\"data\"}"),
      ("E", "F", "{\"Name\":\"pqr\",\"Address\":\"MNN\",\"title\":\"bi\"}")
    )).toDF("col_1", "col_2", "col_json")

    // Needed for .as[String] (the Dataset[String] overload of spark.read.json, Spark 2.2.0+)
    import spark.implicits._

    // Infer the schema of the JSON column from the data itself
    val col_schema = spark.read.json(df.select("col_json").as[String]).schema

    // Parse the JSON column with the inferred schema and expand the struct into columns
    val outputDF = df.withColumn("new_col", from_json(col("col_json"), col_schema))
      .select("col_1", "col_2", "new_col.*")

    outputDF.show(false)

    spark.stop()
  }
}
Siddharth Goel
  • val col_schema = spark.read.json(df.select(col("col_json")).as[String]).schema ---- this is working fine in spark-shell (which I have already tested), but when I write this line in a Scala program it says it cannot accept a DataFrame as an argument for spark.read.json. So I need to convert the json column to an RDD. So the question is: is there a way or option to write it in Spark/Scala? – AnumNuma Mar 12 '20 at 13:34
  • Actually, I'm passing a Dataset[String] to the spark.read.json method. This is only available from Spark version `2.2.0` onwards. Also, you need to add `import spark.implicits._` after you create your SparkSession in the Scala code. I've added the Scala code to the answer above; just check the Spark version and everything will work. – Siddharth Goel Mar 13 '20 at 09:08
  • Sir, you rock!!! Thanks, I got to know the error I made: my POM Spark version was 2.1, so I was getting the error. I pointed my POM Spark version to 2.3.0, which is our current version, and it worked fine. Thanks again. – AnumNuma Mar 15 '20 at 18:45
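For completeness, the sbt equivalent of that POM fix would be roughly the following (version taken from the comment above; adjust it to your cluster's Spark version):

// build.sbt (sketch): the Dataset[String] overload of spark.read.json needs Spark 2.2.0+
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.3.0"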