2

I'm trying to drop a column under an array under an array in DF schema: The answer here from @spektom looks like a good starting point: Dropping a nested column from Spark DataFrame But my attempt is turning non-arrays into arrays for arrays one level down and blowing up on arrays in arrays. Any help would be much appreciated - been hacking on this for a long while.

object DataFrameUtils {


  def dropSubColumn(col: Column, colType: DataType, fullColName: String, dropColName: String): Option[Column] = {

    val colToKeep =
      if (fullColName.equals(dropColName)) {
        None
      } else {
        colType match {
          case arrayType: ArrayType =>
            arrayType.elementType match {
              case elemType: StructType =>
                if (dropColName.startsWith(s"$fullColName.")) {
                  val unDropdCols: Array[Column] = elemType.fields.flatMap(f =>
                    dropSubColumn(col.getField(f.name), f.dataType, s"$fullColName.${f.name}", dropColName) match {
                      case Some(x) => {
                        Some(x.alias(f.name))
                      }
                      case None => {
                        None
                      }
                    })

                  Some(array(struct(unDropdCols: _*)))
                } else {
                  Some(col)
                }
              case other => {
                Some(col)
              }
            }
          case colType: StructType =>
            if (dropColName.startsWith(s"$fullColName.")) {
              val unDropdCols: Array[Column] = colType.fields.flatMap(f =>
                dropSubColumn(col.getField(f.name), f.dataType, s"$fullColName.${f.name}", dropColName) match {
                  case Some(x) => {
                    Some(x.alias(f.name))
                  }
                  case None => None
                })
              Some(struct(unDropdCols: _*))
            } else {
              Some(col)
            }
          case other =>
            Some(col)
        }
      }
    colToKeep
  }


  def dropColumn(df: DataFrame, colToDropName: String): DataFrame = {
    df.schema.fields
     .flatMap(f => {
       if (colToDropName.startsWith(s"${f.name}.")) {
         dropSubColumn(col(f.name), f.dataType, f.name, colToDropName) match {
           case Some(column) => Some((f.name, column))
           case None => None
         }
       } else {
         None
       }
     }).foldLeft(df.drop(colToDropName)) {
      case (newDf, (colName, column)) => {
        newDf.withColumn(colName, column)
      }
    }
  }
}

Here's sample input:

val jsRdd = sc.parallelize(
  """{"type":"president",
   |"vals":{
   |"parents":[
   |{"name":"John Adams","salary":25000, "id":2, "children":[{"name":"John Q Adams", "id":6, "salary":25000}]},
   |{"name":"George Bush", "id":41,"salary": 200000, "children":[{"name":"George W Bush", "id":43, "salary":40000},{"name":"Jeb Bush", "id":-1}]}]
   |,"metadata":{"country":"US"}}
   |}""".stripMargin:: Nil)
val jsDf = sqlContext.read.json(jsRdd)

But running this

val newJsdf = dropColumn(jsDf, "vals.parents.name")
newJsdf.printSchema

Yields this (note that vals.parents.id is now an array):

root
 |-- type: string (nullable = true)
 |-- vals: struct (nullable = false)
 |    |-- metadata: struct (nullable = true)
 |    |    |-- country: string (nullable = true)
 |    |    |-- medium: string (nullable = true)
 |    |-- parents: array (nullable = false)
 |    |    |-- element: struct (containsNull = false)
 |    |    |    |-- children: array (nullable = true)
 |    |    |    |    |-- element: array (containsNull = true)
 |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |-- id: long (nullable = true)
 |    |    |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |    |    |    |-- salary: long (nullable = true)
 |    |    |    |-- id: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)
 |    |    |    |-- salary: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)

And val newJsdf = dropColumn(jsDf, "vals.parents.child.name") blows up.

FYI - my situation: We have a few months of data stored in a Parquet table, but one of the nested fields recently went from an Int to a Double. So I want to re-write the older partitions so that all the data can be read by a single parquet schema (Parquet will choke on Int if the table's metadata says the column is Double). I'm fine with either dropping this column from the old partitions or casting it to DOuble - dropping seemed easier.

Community
  • 1
  • 1
alexP_Keaton
  • 349
  • 5
  • 18
  • 1
    Looks like a bug. Quick workaround could have been to replace ```Some(x.alias(f.name))``` with ```Some(x.getItem(0).alias(f.name))``` but that runs into a different bug as soon as you have double nested arrays – Niek Bartholomeus Mar 03 '17 at 17:22
  • Although it has been asked long time ago, is there any solution that can be updated? I came up with the same case as specified above.. – ylcnky Oct 13 '21 at 12:53

0 Answers0