1

I'm working in transform a JSON into a Data Frame. In the first step I create an Array of Data Frame and after that I make an Union. But I've a problem to do a Union in a JSON with Different Schemas.

I Can do it if the JSON have the same Schema like you can see in this other question: Parse JSON root in a column using Spark-Scala

I'm working with the following data:

val exampleJsonDifferentSchema = spark.createDataset(

      """
      {"ITEM1512":
            {"name":"Yin",
             "address":{"city":"Columbus",
                        "state":"Ohio"},
             "age":28           }, 
        "ITEM1518":
            {"name":"Yang",
             "address":{"city":"Working",
                        "state":"Marc"}
                        },
        "ITEM1458":
            {"name":"Yossup",
             "address":{"city":"Macoss",
                        "state":"Microsoft"},
            "age":28
                        }
      }""" :: Nil)

As you see the difference is that one Data Frame doesn't have Age.

val itemsExampleDiff = spark.read.json(exampleJsonDifferentSchema)
itemsExampleDiff.show(false)
itemsExampleDiff.printSchema

+---------------------------------+---------------------------+-----------------------+
|ITEM1458                         |ITEM1512                   |ITEM1518               |
+---------------------------------+---------------------------+-----------------------+
|[[Macoss, Microsoft], 28, Yossup]|[[Columbus, Ohio], 28, Yin]|[[Working, Marc], Yang]|
+---------------------------------+---------------------------+-----------------------+

root
 |-- ITEM1458: struct (nullable = true)
 |    |-- address: struct (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- state: string (nullable = true)
 |    |-- age: long (nullable = true)
 |    |-- name: string (nullable = true)
 |-- ITEM1512: struct (nullable = true)
 |    |-- address: struct (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- state: string (nullable = true)
 |    |-- age: long (nullable = true)
 |    |-- name: string (nullable = true)
 |-- ITEM1518: struct (nullable = true)
 |    |-- address: struct (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- state: string (nullable = true)
 |    |-- name: string (nullable = true)

My solution now is as the follow code where i make an array of DataFrame:

val columns:Array[String]       = itemsExample.columns
var arrayOfExampleDFs:Array[DataFrame] = Array()

for(col_name <- columns){

  val temp = itemsExample.select(lit(col_name).as("Item"), col(col_name).as("Value"))

  arrayOfExampleDFs = arrayOfExampleDFs :+ temp
}

val jsonDF = arrayOfExampleDFs.reduce(_ union _)

But I've a JSON with Different Schemas when I reduce in a union I can't do it because the Data Frame need to have the same Schema. In fact, I've the following error:

org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the compatible column types.

I'm trying to do something similar I've found in this question: How to perform union on two DataFrames with different amounts of columns in spark?

Specifically that part:

val cols1 = df1.columns.toSet
val cols2 = df2.columns.toSet
val total = cols1 ++ cols2 // union

def expr(myCols: Set[String], allCols: Set[String]) = {
  allCols.toList.map(x => x match {
    case x if myCols.contains(x) => col(x)
    case _ => lit(null).as(x)
  })
}

But I cant make the set for the columns because I need to catch dynamically the columns both totals and singles. I only can do something like that:

for(i <- 0 until arrayOfExampleDFs.length-1) {

    val cols1 = arrayOfExampleDFs(i).select("Value").columns.toSet
    val cols2 = arrayOfExampleDFs(i+1).select("Value").columns.toSet
    val total = cols1 ++ cols2

    arrayOfExampleDFs(i).select("Value").printSchema()

    print(total)
}

So, how could be a function that do this union dynamically?

Update: expected output

In this Case This Data Frame and Schema:

+--------+---------------------------------+
|Item    |Value                            |
+--------+---------------------------------+
|ITEM1458|[[Macoss, Microsoft], 28, Yossup]|
|ITEM1512|[[Columbus, Ohio], 28, Yin]      |
|ITEM1518|[[Working, Marc], null, Yang]    |
+--------+---------------------------------+

root
 |-- Item: string (nullable = false)
 |-- Value: struct (nullable = true)
 |    |-- address: struct (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- state: string (nullable = true)
 |    |-- age: long (nullable = true)
 |    |-- name: string (nullable = true)
Jino Michel Aque
  • 513
  • 1
  • 4
  • 16
  • 3
    If you know all possible columns you want to consider, you can simply create a list with those and use it. Then you won't need to compute a new set in each iteration. – Shaido May 12 '20 at 01:28
  • You may also read the JSON with a predefined schema with all expected columns and some columns can be null too. This works if you know the union of possible schema beforehand. – Sudev Ambadi May 12 '20 at 05:03
  • But Before read as a JSON I need to do a transformation because the Items (ITEM1458, ITEM1512, ITEM1518, etcetera) appears as a columns and I need to make this columns values. What is the problem I can solve here (for a json with a same schema): https://stackoverflow.com/questions/61669258/parse-json-root-in-a-column-using-spark-scala/61670966#61670966 – Jino Michel Aque May 12 '20 at 05:09
  • did you solve this one @jqc? – abiratsis May 23 '20 at 17:59
  • No, I can't solve it. @AlexandrosBiratsis – Jino Michel Aque May 23 '20 at 18:24
  • @jqc let me know if the below approach works – abiratsis May 25 '20 at 08:42
  • @jqc hopefully it is clear why the below solution is more suitable for your case than the example you described using sets. You don't need any sets here since you have only one column to control – abiratsis May 26 '20 at 08:18

1 Answers1

1

Here is one possible solution which creates a common schema for all the dataframes by adding the age column when it is not found:

import org.apache.spark.sql.functions.{col, lit, struct}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

....

for(col_name <- columns){
  val currentDf = itemsExampleDiff.select(col(col_name))

  // try to identify if age field is present
  val hasAge = currentDf.schema.fields(0)
                        .dataType
                        .asInstanceOf[StructType]
                        .fields
                        .contains(StructField("age", LongType, true))

  val valueCol = hasAge match {
    // if not construct a new value column
    case false => struct(
                    col(s"${col_name}.address"), 
                    lit(null).cast("bigint").as("age"),
                    col(s"${col_name}.name")
                  )

    case true => col(col_name)
  }

  arrayOfExampleDFs = arrayOfExampleDFs :+ currentDf.select(lit(col_name).as("Item"), valueCol.as("Value"))
}

val jsonDF = arrayOfExampleDFs.reduce(_ union _)

// +--------+---------------------------------+
// |Item    |Value                            |
// +--------+---------------------------------+
// |ITEM1458|[[Macoss, Microsoft], 28, Yossup]|
// |ITEM1512|[[Columbus, Ohio], 28, Yin]      |
// |ITEM1518|[[Working, Marc],, Yang]         |
// +--------+---------------------------------+

Analysis: probably the most demanding part is finding out whether the age is present or not. For the look up we use df.schema.fields property which allow us to dig into the internal schema of each column.

When age is not found we regenerate the column by using a struct:

struct(
   col(s"${col_name}.address"), 
   lit(null).cast("bigint").as("age"),
   col(s"${col_name}.name")
)
abiratsis
  • 7,051
  • 3
  • 28
  • 46