15

Because Parquet cannot persist empty arrays, I replaced empty arrays with null before writing the table. Now, when I read the table back, I want to do the opposite:
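
For reference, the write-side replacement looked roughly like this (a sketch; the column name and the output path are placeholders):

import org.apache.spark.sql.functions.{lit, size, when}

// Pre-write step: turn empty arrays into null so Parquet accepts the column.
df.withColumn("arr", when(size($"arr") === 0, lit(null)).otherwise($"arr"))
  .write.parquet("/some/path")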

I have a DataFrame with the following schema:

 |-- id: long (nullable = false)
 |-- arr: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- x: double (nullable = true)
 |    |    |-- y: double (nullable = true)

and the following content:

+---+-----------+
| id|        arr|
+---+-----------+
|  1|[[1.0,2.0]]|
|  2|       null|
+---+-----------+

I'd like to replace the null-array (id=2) with an empty array, i.e.

+---+-----------+
| id|        arr|
+---+-----------+
|  1|[[1.0,2.0]]|
|  2|         []|
+---+-----------+

I've tried:

val arrSchema = df.schema(1).dataType

df
  .withColumn("arr", when($"arr".isNull, array().cast(arrSchema)).otherwise($"arr"))
  .show()

which gives:

java.lang.ClassCastException: org.apache.spark.sql.types.NullType$ cannot be cast to org.apache.spark.sql.types.StructType

Edit: I don't want to hard-code the schema of my array column (at least not the schema of the struct), because it can vary from case to case. I can only use the schema information from df at runtime.
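
To be explicit, the only schema information I have available is something like this (sketch):

import org.apache.spark.sql.types.ArrayType

// Look up the array column's type by name at runtime; the element (struct)
// type is whatever the table happens to contain.
val arrType  = df.schema("arr").dataType.asInstanceOf[ArrayType]
val elemType = arrType.elementType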

By the way, I'm using Spark 2.1, so I cannot use typedLit.

Raphael Roth

4 Answers

14
  • Spark 2.2+ with known external type

    In general, you can use typedLit to provide empty arrays.

    import org.apache.spark.sql.functions.typedLit
    
    typedLit(Seq.empty[(Double, Double)])
    

    To use specific names for nested objects you can use case classes:

    case class Item(x: Double, y: Double)
    
    typedLit(Seq.empty[Item])
    

    or rename by cast:

    typedLit(Seq.empty[(Double, Double)])
      .cast("array<struct<x: Double, y: Double>>")
    
  • Spark 2.1+ with schema only

    With schema only you can try:

    val schema = StructType(Seq(
      StructField("arr", StructType(Seq(
        StructField("x", DoubleType),
        StructField("y", DoubleType)
      )))
    ))
    
    def arrayOfSchema(schema: StructType) =
      from_json(lit("""{"arr": []}"""), schema)("arr")
    
    arrayOfSchema(schema).alias("arr")
    

    where schema can be extracted from the existing DataFrame and wrapped with an additional StructType (a combined usage sketch follows the snippet):

    StructType(Seq(
      StructField("arr", df.schema("arr").dataType)
    ))
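
    Putting the two snippets together, a sketch of applying this to the question's df (the coalesce wiring and the imports are my additions):

    import org.apache.spark.sql.functions.{coalesce, from_json, lit}
    import org.apache.spark.sql.types.{StructField, StructType}

    // Wrap the runtime array type in a single-field struct so from_json can use it.
    val wrapped = StructType(Seq(StructField("arr", df.schema("arr").dataType)))

    // Replace nulls with the empty array parsed from the JSON literal.
    df.withColumn("arr", coalesce($"arr", arrayOfSchema(wrapped))).show()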
    
Alper t. Turker
2

One way is to use a UDF:

val arrSchema = df.schema(1).dataType // ArrayType(StructType(StructField(x,DoubleType,true), StructField(y,DoubleType,true)),true)

val emptyArr = udf(() => Seq.empty[Any], arrSchema)

df
  .withColumn("arr", when($"arr".isNull, emptyArr()).otherwise($"arr"))
  .show()

+---+-----------+
| id|        arr|
+---+-----------+
|  1|[[1.0,2.0]]|
|  2|         []|
+---+-----------+
Raphael Roth
1

Another approach would be to use coalesce:

import org.apache.spark.sql.functions.{coalesce, typedLit}

val df = Seq(
  (Some(1), Some(Array((1.0, 2.0)))),
  (Some(2), None)
).toDF("id", "arr")

df.withColumn("arr", coalesce($"arr", typedLit(Array.empty[(Double, Double)])))
  .show
// +---+-----------+
// | id|        arr|
// +---+-----------+
// |  1|[[1.0,2.0]]|
// |  2|         []|
// +---+-----------+
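
Note: typedLit is only available from Spark 2.2 on, so (as the question points out) this approach won't work on Spark 2.1.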
Leo C
0

A UDF with a case class could also be interesting:

import org.apache.spark.sql.functions.{coalesce, udf}

case class Item(x: Double, y: Double)

val udf_emptyArr = udf(() => Seq[Item]())

df
  .withColumn("arr", coalesce($"arr", udf_emptyArr()))
  .show()
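
As in the UDF answer above, the struct field names come from the case class (x and y), so the result should match the expected output in the question.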
Zach