
I was in the process of flattening a Spark Schema using the method suggested here, when I came across an edge case -

val writerSchema = StructType(Seq(
      StructField("f1", ArrayType(ArrayType(
        StructType(Seq(
          StructField("f2", ArrayType(LongType))
        ))
      )))
    ))

writerSchema.printTreeString()

root
 |-- f1: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- f2: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)

When I flatten this schema, I get only

f1

and not

f1
f1.f2

as I expected.

Questions -

  1. Is writerSchema a valid Spark schema?
  2. How do I handle ArrayType objects when flattening the schema?

1 Answer


If you want to handle data like this:

val json = """{"f1": [{"f2": [1, 2, 3] }, {"f2": [4,5,6]}, {"f2": [7,8,9]}, {"f2": [10,11,12]}]}"""

then the matching schema is

import org.apache.spark.sql.types._

val writerSchema = StructType(Seq(
  StructField("f1", ArrayType(
    StructType(Seq(
      StructField("f2", ArrayType(LongType))
    ))
  ))
))

root
 |-- f1: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- f2: array (nullable = true)
 |    |    |    |-- element: long (containsNull = true)

Your original writerSchema is syntactically valid (Spark does allow nested arrays), but it doesn't describe data shaped like this: here f1 is a single array of structs, so there is no need to nest one ArrayType inside another.
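For reference, here is one way to build such a DataFrame from the JSON string above. This is a sketch: it assumes an active SparkSession named spark, plus the json and writerSchema values defined earlier.

```scala
import org.apache.spark.sql.types._

// Assumes `spark` is an active SparkSession, and that `json` and
// `writerSchema` are the string and schema defined above.
import spark.implicits._

// Parse the sample JSON string using the corrected schema.
val inputDF = spark.read
  .schema(writerSchema)
  .json(Seq(json).toDS)
```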

So let's suppose you have a DataFrame inputDF:

inputDF.printSchema
root
 |-- f1: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- f2: array (nullable = true)
 |    |    |    |-- element: long (containsNull = true)

inputDF.show(false)
+-------------------------------------------------------------------------------------------------------+
|f1                                                                                                     |
+-------------------------------------------------------------------------------------------------------+
|[[WrappedArray(1, 2, 3)], [WrappedArray(4, 5, 6)], [WrappedArray(7, 8, 9)], [WrappedArray(10, 11, 12)]]|
+-------------------------------------------------------------------------------------------------------+

To flatten this dataframe we can explode the array columns (f1 and f2):

First, explode column 'f1':

import org.apache.spark.sql.functions.{col, explode}

val semiFlattenDF = inputDF.select(explode(col("f1"))).select(col("col.*"))

semiFlattenDF.printSchema
root
 |-- f2: array (nullable = true)
 |    |-- element: long (containsNull = true)

semiFlattenDF.show
+------------+
|          f2|
+------------+
|   [1, 2, 3]|
|   [4, 5, 6]|
|   [7, 8, 9]|
|[10, 11, 12]|
+------------+

Now explode column 'f2', naming the resulting column 'value':

val fullyFlattenDF = semiFlattenDF.select(explode(col("f2")).as("value"))

So now the DataFrame is flattened:

fullyFlattenDF.printSchema
root
 |-- value: long (nullable = true)

fullyFlattenDF.show
+-----+
|value|
+-----+
|    1|
|    2|
|    3|
|    4|
|    5|
|    6|
|    7|
|    8|
|    9|
|   10|
|   11|
|   12|
+-----+
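The two steps can also be chained into a single expression. Note that explode names its output column "col" by default, which is why the intermediate struct field is addressed as col.f2 here:

```scala
import org.apache.spark.sql.functions.{col, explode}

// Explode the outer array of structs, then the inner array of longs.
// `explode`'s default output column name is "col".
val fullyFlattenDF = inputDF
  .select(explode(col("f1")))
  .select(explode(col("col.f2")).as("value"))
```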