
I have a csv file like this:

weight,animal_type,animal_interpretation
20,dog,"{is_large_animal=true, is_mammal=true}"
3.5,cat,"{is_large_animal=false, is_mammal=true}"
6.00E-04,ant,"{is_large_animal=false, is_mammal=false}"

I created a case class schema as follows:

package types

case class AnimalsType (
                         weight: Option[Double],
                         animal_type: Option[String],
                         animal_interpretation: Option[AnimalInterpretation]
                       )

case class AnimalInterpretation (
                                  is_large_animal: Option[Boolean],
                                  is_mammal: Option[Boolean]
                                )

I tried to load the csv into a dataframe with:

var df = spark.read.format("csv").option("header", "true").load("src/main/resources/animals.csv").as[AnimalsType]

But got the following exception:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Can't extract value from animal_interpretation#12: need struct type but got string;

Am I doing something wrong? What would be the proper way of doing this?

Emiliano Martinez
Han Xu
  • Does this answer your question? [How to query JSON data column using Spark DataFrames?](https://stackoverflow.com/questions/34069282/how-to-query-json-data-column-using-spark-dataframes) – user10938362 Feb 11 '20 at 00:03
  • No.. Why downvote this question? Is it not clear? – Han Xu Feb 11 '20 at 00:07

1 Answer


You cannot apply a struct schema to a CSV column directly: Spark reads animal_interpretation as a plain string, and its {key=value} content is not JSON. You first need to transform that string column into JSON format, as I have done in the code below using a UDF. If you can get the input data in a format like df1, the UDF is unnecessary; you can continue from df1 to get the final dataframe df2.

There is no need for a case class, since the CSV header already provides the column names. For the JSON data you need to declare a schema (AnimalInterpretationSch), as below.

scala> import org.apache.spark.sql.types._

scala> import org.apache.spark.sql.expressions.UserDefinedFunction

//Input CSV DataFrame

scala> df.show(false)
+--------+-----------+---------------------------------------+
|weight  |animal_type|animal_interpretation                  |
+--------+-----------+---------------------------------------+
|20      |dog        |{is_large_animal=true, is_mammal=true} |
|3.5     |cat        |{is_large_animal=false, is_mammal=true}|
|6.00E-04|ant        |{is_large_animal=false,is_mammal=false}|
+--------+-----------+---------------------------------------+

//UDF to convert "animal_interpretation" column to Json Format

scala> def StringToJson:UserDefinedFunction = udf((data:String,JsonColumn:String) => {
     | var out = data
     | val JsonColList = JsonColumn.trim.split(",").toList
     | JsonColList.foreach{ rr => 
     | out = out.replaceAll(rr, "'"+rr+"'")
     | }
     | out = out.replaceAll("=", ":")
     | out
     | })

//All keys in the Json column

scala> val JsonCol = "is_large_animal,is_mammal"

//New dataframe with Json format

scala> val df1 = df.withColumn("animal_interpretation", StringToJson(col("animal_interpretation"), lit(JsonCol)))

scala> df1.show(false)
+--------+-----------+-------------------------------------------+
|weight  |animal_type|animal_interpretation                      |
+--------+-----------+-------------------------------------------+
|20      |dog        |{'is_large_animal':true, 'is_mammal':true} |
|3.5     |cat        |{'is_large_animal':false, 'is_mammal':true}|
|6.00E-04|ant        |{'is_large_animal':false,'is_mammal':false}|
+--------+-----------+-------------------------------------------+
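
The string rewrite itself can be checked outside Spark. Below is a minimal sketch in plain Scala, assuming every key is a bare word followed by `=`; the object `KeyValueToJson` and the method `toJson` are hypothetical names, not part of the code above. It uses one regex instead of a key list, so the keys do not have to be known in advance, and it emits double-quoted keys (strict JSON) rather than the single-quoted form shown in df1.

```scala
object KeyValueToJson {
  // Matches a bare key (letters, digits, underscore) immediately followed by '='.
  private val KeyEquals = """(\w+)=""".r

  // Rewrites {key=value, ...} into {"key":value, ...}; values are left untouched.
  def toJson(raw: String): String =
    KeyEquals.replaceAllIn(raw, m => "\"" + m.group(1) + "\":")

  def main(args: Array[String]): Unit = {
    // prints {"is_large_animal":true, "is_mammal":true}
    println(toJson("{is_large_animal=true, is_mammal=true}"))
  }
}
```

The same function could be wrapped with `udf(KeyValueToJson.toJson _)` if a UDF is still preferred.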

//Schema declaration of Json format

scala> val AnimalInterpretationSch = new StructType().add("is_large_animal", BooleanType).add("is_mammal", BooleanType)

//Accessing Json columns 

scala> val df2 = df1.select(col("weight"), col("animal_type"),from_json(col("animal_interpretation"), AnimalInterpretationSch).as("jsondata")).select("weight", "animal_type", "jsondata.*")

scala> df2.printSchema
root
 |-- weight: string (nullable = true)
 |-- animal_type: string (nullable = true)
 |-- is_large_animal: boolean (nullable = true)
 |-- is_mammal: boolean (nullable = true)


scala> df2.show()
+--------+-----------+---------------+---------+
|  weight|animal_type|is_large_animal|is_mammal|
+--------+-----------+---------------+---------+
|      20|        dog|           true|     true|
|     3.5|        cat|          false|     true|
|6.00E-04|        ant|          false|    false|
+--------+-----------+---------------+---------+
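
If the goal is still a typed Dataset[AnimalsType] as in the question, one possible route is sketched below. It is an assumption-laden variant, not the method above: it swaps the UDF for the built-in `regexp_replace`, keeps the parsed struct as a single column instead of flattening it, and casts weight to double (in df2 it is still a string). The object `TypedAnimals` and the helper `toTyped` are hypothetical names; the case classes and file path are taken from the question.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.functions.{col, from_json, regexp_replace}
import org.apache.spark.sql.types.{BooleanType, DoubleType, StructType}

// Case classes from the question, declared at top level so Spark can derive encoders.
case class AnimalInterpretation(is_large_animal: Option[Boolean], is_mammal: Option[Boolean])
case class AnimalsType(
  weight: Option[Double],
  animal_type: Option[String],
  animal_interpretation: Option[AnimalInterpretation]
)

object TypedAnimals {
  // Same fields as AnimalInterpretationSch in the answer.
  private val interpretationSchema = new StructType()
    .add("is_large_animal", BooleanType)
    .add("is_mammal", BooleanType)

  // Hypothetical helper: turns the all-string CSV DataFrame into a typed Dataset.
  def toTyped(spark: SparkSession, raw: DataFrame): Dataset[AnimalsType] = {
    import spark.implicits._
    // Rewrite key= into "key": so the column becomes JSON that from_json can parse.
    val asJson = regexp_replace(col("animal_interpretation"), "(\\w+)=", "\"$1\":")
    raw
      .withColumn("weight", col("weight").cast(DoubleType))
      .withColumn("animal_interpretation", from_json(asJson, interpretationSchema))
      .as[AnimalsType]
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("typed-animals").master("local[*]").getOrCreate()
    val raw = spark.read.option("header", "true").csv("src/main/resources/animals.csv")
    val animals = toTyped(spark, raw)
    animals.printSchema()
    animals.show(false)
    spark.stop()
  }
}
```

Rows whose animal_interpretation cannot be parsed would end up with a null struct, which maps to None in the case class.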
Nikhil Suthar