
Within the JSON objects I am attempting to process, I am given a nested StructType where each key represents a specific country, which in turn contains a currency and a price:

 |-- id: string (nullable = true)
 |-- pricingByCountry: struct (nullable = true)
 |    |-- regionPrices: struct (nullable = true)
 |    |    |-- AT: struct (nullable = true)
 |    |    |    |-- currency: string (nullable = true)
 |    |    |    |-- price: double (nullable = true)
 |    |    |-- BT: struct (nullable = true)
 |    |    |    |-- currency: string (nullable = true)
 |    |    |    |-- price: double (nullable = true)
 |    |    |-- CL: struct (nullable = true)
 |    |    |    |-- currency: string (nullable = true)
 |    |    |    |-- price: double (nullable = true)
...etc.

and I'd like to explode it so that rather than having a column per country, I can have a row for each country:

+---+--------+---------+------+
| id| country| currency| price|
+---+--------+---------+------+
|  0|      AT|      EUR|   100|
|  0|      BT|      NGU|   400|
|  0|      CL|      PES|   200|
+---+--------+---------+------+

These solutions make sense intuitively: Spark DataFrame exploding a map with the key as a member and Spark scala - Nested StructType conversion to Map. Unfortunately, they don't work here because they map a whole row, and I'm only passing in a column. I don't want to manually map the whole row, just the specific column that contains the nested structs; there are several other attributes at the same level as "id" that I'd like to keep in the structure.

Noelle Caldwell
  • you can try to first flatten the struct (https://stackoverflow.com/questions/38753898/how-to-flatten-a-struct-in-a-spark-dataframe) and then do an unpivot/stack (https://stackoverflow.com/questions/42465568/unpivot-in-spark-sql-pyspark) – Paul Aug 12 '19 at 19:56
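A minimal sketch of that comment's approach, assuming a DataFrame df with the schema above and that the country codes (AT, BT, CL) are known up front; stack emits one row per country:

// Unpivot the nested struct fields with stack: 3 rows, each with
// (country, currency, price)
val unpivoted = df.selectExpr(
  "id",
  """stack(3,
       'AT', pricingByCountry.regionPrices.AT.currency, pricingByCountry.regionPrices.AT.price,
       'BT', pricingByCountry.regionPrices.BT.currency, pricingByCountry.regionPrices.BT.price,
       'CL', pricingByCountry.regionPrices.CL.currency, pricingByCountry.regionPrices.CL.price
     ) as (country, currency, price)"""
)

The drawback is that every country code has to be listed by hand (or generated from the schema), which the answer below sidesteps by parsing the struct as JSON.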

1 Answer


I think it can be done as below:

// JSON test data
import spark.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val ds = Seq("""{"id":"abcd","pricingByCountry":{"regionPrices":{"AT":{"currency":"EUR","price":100.00},"BT":{"currency":"NGE","price":200.00},"CL":{"currency":"PES","price":300.00}}}}""").toDS

val df = spark.read.json(ds)

// Schema for the UDF output: an array of (country, currency, price) structs
val outputSchema = ArrayType(StructType(Seq(
  StructField("country", StringType, false),
  StructField("currency", StringType, false),
  StructField("price", DoubleType, false)
)))

// UDF that takes the `regionPrices` struct rendered as a JSON string and
// converts it to an array of (country, currency, price) tuples
val toMap = udf((jsonString: String) => {
  import com.fasterxml.jackson.databind._
  import com.fasterxml.jackson.module.scala.DefaultScalaModule

  val jsonMapper = new ObjectMapper()
  jsonMapper.registerModule(DefaultScalaModule)

  // The inner maps mix String and Double values, so read them as
  // Map[String, Any] and convert each field explicitly
  val jsonMap = jsonMapper.readValue(jsonString, classOf[Map[String, Map[String, Any]]])
  jsonMap.map { case (country, attrs) =>
    (country, attrs("currency").toString, attrs("price").toString.toDouble)
  }.toSeq
}, outputSchema)

// Serialize the nested struct back to JSON with to_json, run the UDF over it,
// and explode the resulting array into one row per country
val result = df.
              select(col("id"), explode(toMap(to_json(col("pricingByCountry.regionPrices")))).as("temp")).
              select(col("id"), col("temp.country").as("country"), col("temp.currency").as("currency"), col("temp.price").as("price"))

Output will be:

scala> result.show
+----+-------+--------+-----+
|  id|country|currency|price|
+----+-------+--------+-----+
|abcd|     AT|     EUR|100.0|
|abcd|     BT|     NGE|200.0|
|abcd|     CL|     PES|300.0|
+----+-------+--------+-----+
kode
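For comparison, the same result can be produced without a UDF or Jackson: the country codes are just field names of the regionPrices struct, so they can be read from the DataFrame schema and turned into an array of structs that is then exploded. This is only a sketch against the schema shown above; countryStructs and result2 are illustrative names:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType

// Pull the country codes (AT, BT, CL, ...) out of the schema itself
val regionSchema = df.schema("pricingByCountry").dataType.asInstanceOf[StructType]("regionPrices")
  .dataType.asInstanceOf[StructType]
val countries = regionSchema.fieldNames

// Build one struct per country, carrying its code plus the nested fields
val countryStructs = countries.map { c =>
  struct(
    lit(c).as("country"),
    col(s"pricingByCountry.regionPrices.$c.currency").as("currency"),
    col(s"pricingByCountry.regionPrices.$c.price").as("price")
  )
}

// Explode the array into one row per country
val result2 = df.
  select(col("id"), explode(array(countryStructs: _*)).as("temp")).
  select(col("id"), col("temp.country"), col("temp.currency"), col("temp.price"))

This stays entirely in the DataFrame API, and it also addresses the asker's requirement to keep the other top-level attributes: they can simply be added alongside "id" in the first select.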