65

All,

Is there an elegant and accepted way to flatten a Spark SQL table (Parquet) with columns that are of nested StructType

For example

If my schema is:

foo
 |_bar
 |_baz
x
y
z

How do I select it into a flattened tabular form without resorting to manually running

df.select("foo.bar","foo.baz","x","y","z")

In other words, how do I obtain the result of the above code programmatically given just a StructType and a DataFrame

echen
  • 2,002
  • 1
  • 24
  • 38
  • Have you tried using the `explode` DataFrame method? – Daniel de Paula May 26 '16 at 22:37
  • 2
    Don't think `explode` is going to do it. `explode` creates new rows -- he wants to add columns. I think you need to work with `Column` objects. – David Griffin May 27 '16 at 02:42
  • Sorry, my mistake. – Daniel de Paula May 27 '16 at 13:55
  • I mean, I'm sure I could do it with `explode` -- `explode` actually does let you create new columns. I just don't think it would be very elegant -- you would probably have to do the schema reflection for every record, instead of front-loading the schema reflection to only do it once to create the `select(...)` – David Griffin May 27 '16 at 15:57
  • 1
    Solution directly from databricks: https://github.com/delta-io/delta/blob/f72bb4147c3555b9a0f571b35ac4d9a41590f90f/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala#L123 – Tomas Bartalos Aug 14 '19 at 08:25

14 Answers14

108

The short answer is, there's no "accepted" way to do this, but you can do it very elegantly with a recursive function that generates your select(...) statement by walking through the DataFrame.schema.

The recursive function should return an Array[Column]. Every time the function hits a StructType, it would call itself and append the returned Array[Column] to its own Array[Column].

Something like:

import org.apache.spark.sql.Column
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.functions.col

def flattenSchema(schema: StructType): Seq[Column] = schema.fields.flatMap {
  case StructField(name, inner: StructType, _, _) => allColumns(inner).map(sub => col(s"$name.$sub"))
  case StructField(name, _, _, _)                 => Seq(col(name))
}

You would then use it like this:

df.select(flattenSchema(df.schema):_*)
Joan
  • 4,079
  • 2
  • 28
  • 37
David Griffin
  • 13,677
  • 5
  • 47
  • 65
  • 1
    Thank you, this seems like a very reasonable solution. – echen May 28 '16 at 12:34
  • 8
    Using this solution, how can I handle lowest level child nodes which have identical names? For example parent element Foo has child Bar and parent element Foz also has a separate child named Bar. When selecting Foo.Bar and Foz.Bar from initial dataframe (with the Array returned by `flattenSchema`), I get 2 columns both named Bar. But I would like column headers as `Foo.Bar` or `Foo_Bar` or something like that. So every one of them would be unique and unambiguous. – V. Samma Jul 15 '16 at 12:08
  • IM having trouble with the `col(colName)` part, what import statement do I need for `col`/ – TheM00s3 Oct 20 '16 at 20:39
  • 1
    In what version of Spark is the above solution applicable? In Spark 2.1.0 (Java API) it doesn't look like StructField's type can ever be a StructType. – dmux Jan 30 '17 at 19:10
  • 3
    TheM00s3, you would import org.apache.spark.sql.functions.col And it should work in Spark 2.1.x as well (only tried this in Scala so far, not Java) – markus May 23 '17 at 15:15
  • 1
    Hi @David Griffin, How would I need to modify this function in case the Arrays include nested Dictionaries and a (different) Dictionary might nest an array? – vsdaking Jul 16 '17 at 15:39
  • Hi David I am getting "error: not found: type Column" error any idea how to resolve the same. – Ajay Sant Aug 30 '17 at 14:10
  • Try using `import org.apache.spark.sql._` – David Griffin Aug 30 '17 at 14:59
  • 18
    Just in case someone else stumbled on this: if you want the new column names to reflect the nested structure of the original schema: ```f1.nested1.nested2 ...``` you should alias the columns at this line: ```case _ => Array(col(colName))``` should become ```case _ => Array(col(colName).alias(colName))``` – b2Wc0EKKOvLPn Mar 06 '18 at 17:08
  • I added an answer that shows how to customize the column name delimiter in case you don't want to use periods. – Powers May 18 '18 at 01:51
  • @dmux That is true, but StructType extends DataType, so a StructFields *DataType* can be a StructType which is what we're pattern matching here – JMess Aug 31 '18 at 18:03
  • https://stackoverflow.com/q/55767080/7648 I'm getting an error when using this method with some (not all) data. – Paul Reiners Apr 19 '19 at 19:47
  • my version can't alias `a.b` for nested alias, used another character, eg: `_` instead – SummersKing Dec 25 '20 at 02:11
  • @SummersKing i am looking for that. I am not able to change it to _. Can you please provide that ? I changed it into else but still giving a.b not a_b – NickyPatel May 24 '21 at 07:03
44

Just wanted to share my solution for Pyspark - it's more or less a translation of @David Griffin's solution, so it supports any level of nested objects.

from pyspark.sql.types import StructType, ArrayType  

def flatten(schema, prefix=None):
    fields = []
    for field in schema.fields:
        name = prefix + '.' + field.name if prefix else field.name
        dtype = field.dataType
        if isinstance(dtype, ArrayType):
            dtype = dtype.elementType

        if isinstance(dtype, StructType):
            fields += flatten(dtype, prefix=name)
        else:
            fields.append(name)

    return fields


df.select(flatten(df.schema)).show()
Steve Ng
  • 1,189
  • 1
  • 13
  • 35
Evan V
  • 1,340
  • 10
  • 9
  • I am running into an error, likely due to a heavily nested JSON schema, but I am not entirely sure what it means: "cannot resolve '`item`.`productOrService`.`coding`['code']' due to data type mismatch: argument 2 requires integral type, however, ''code'' is of string type." Any ideas? I am entirely new to JSON but I would suspect the array within the struct is an issue. – user1983682 Dec 02 '19 at 20:20
  • @user1983682 Please, open your case, so we could see your `schema` with details. – Cloud Cho Mar 27 '21 at 06:22
28

I am improving my previous answer and offering a solution to my own problem stated in the comments of the accepted answer.

This accepted solution creates an array of Column objects and uses it to select these columns. In Spark, if you have a nested DataFrame, you can select the child column like this: df.select("Parent.Child") and this returns a DataFrame with the values of the child column and is named Child. But if you have identical names for attributes of different parent structures, you lose the info about the parent and may end up with identical column names and cannot access them by name anymore as they are unambiguous.

This was my problem.

I found a solution to my problem, maybe it can help someone else as well. I called the flattenSchema separately:

val flattenedSchema = flattenSchema(df.schema)

and this returned an Array of Column objects. Instead of using this in the select(), which would return a DataFrame with columns named by the child of the last level, I mapped the original column names to themselves as strings, then after selecting Parent.Child column, it renames it as Parent.Child instead of Child (I also replaced dots with underscores for my convenience):

val renamedCols = flattenedSchema.map(name => col(name.toString()).as(name.toString().replace(".","_")))

And then you can use the select function as shown in the original answer:

var newDf = df.select(renamedCols:_*)
V. Samma
  • 2,558
  • 8
  • 30
  • 34
  • Hi @V. Samma, This solution is great. However, what will the code be in case of child attributes with the same name and from a single parent attr. ? e.g. { "batter": [ { "id": "1001", "type": "Regular" }, { "id": "1002", "type": "Chocolate" }, { "id": "1003", "type": "Blueberry" }, { "id": "1004", "type": "Devil's Food" } ] } – vsdaking Jul 17 '17 at 16:16
  • @vsdaking Hi, thanks. I'm sure I have tackled a problem where you want data from a JSON array. Unfortunately, some time has passed and I don't have access to Spark currently to test as well. The child attributes with the same name is not a problem because they would need to be the column names for your final DF I presume. You just have to search for how to read JSON Arrays in Spark. Maybe the [`explode`](http://xinhstechblog.blogspot.com.ee/2016/05/reading-json-nested-array-in-spark.html) command will help you with that. – V. Samma Jul 17 '17 at 20:47
  • thanks for this @V.Samma I've used this for my problem, however it creates a very wide dataframe, I actually need my nested Struct Types as new Rows in my Dataframe. Any advise on this would be appreciated – ukbaz Sep 28 '17 at 08:23
  • @ukbaz Of course, it takes all nested child properties and flattens them schema-wise, which actually means that they are now a separate column for the dataframe. That was the goal of my solution. I am struggling to understand what do you need exactly. If you have columns `ID`, `Person`, `Address` but schema is like: "ID", "Person.Name", "Person.Age", "Address.City", "Address.Street", "Address.Country", then by flattening, the initial 3 columns create 6 columns. What's the result you would want based on my example? – V. Samma Sep 28 '17 at 08:35
  • Thanks for your reply @V.Samma. Based on your example I get the following; "ID", "Person.Name", "Person.Age", "Address.City", "Address.Street", "Address.Country", "ID1", "Person.Name1", "Person.Age1", "Address.City1", "Address.Street1", "Address.Country1","ID2", "Person.Name2", "Person.Age2", "Address.City2", "Address.Street2", "Address.Country2" ... etc this goes on. What I would like is those new columns to be Rows in my dataframe, so the data in "ID1" and "ID2" would be under the ID column. Thanks – ukbaz Sep 28 '17 at 09:21
  • @ukbaz I don't understand how is this possible. How does your initial, unflattened data/schema look? With my example, this initial schema with this posted solution produces 6 columns. Each data row is still a single data row, but instead of objects now contain single values. – V. Samma Sep 28 '17 at 09:50
  • @V.Samma it was due to the dataframe being created from nested json. Example schema is here https://stackoverflow.com/questions/46455491/flatten-nested-spark-1-6-dataframe-with-structtypes?noredirect=1#comment79866665_46455491 – ukbaz Sep 28 '17 at 10:12
  • @ukbaz Well, my solution here works correctly. That would be the expected output for your schema. You have them as different structs, you can't hope that flattening the schema also magically merges some columns and adds corresponding values as separate rows. The struct names are defined as numbers? That is a problem in itself. But also their respective schemas don't seem similar so you could merge them. I would suggest you try to get the data in a better format before hand, so you have correctly named columns/structs and clear overview, not duplicate date in different structs' values etc. – V. Samma Sep 29 '17 at 10:37
  • @V.Samma Can you please help me with https://stackoverflow.com/questions/59662227/flatten-only-deepest-level-in-scala-spark-dataframe – mythic Jan 09 '20 at 11:55
  • 1
    @mythic Unfortunately, I haven't been working with Spark for a few years but it seems you already got help with that :) – V. Samma Jan 10 '20 at 12:27
5

I added a DataFrame#flattenSchema method to the open source spark-daria project.

Here's how you can use the function with your code.

import com.github.mrpowers.spark.daria.sql.DataFrameExt._
df.flattenSchema().show()

+-------+-------+---------+----+---+
|foo.bar|foo.baz|        x|   y|  z|
+-------+-------+---------+----+---+
|   this|     is|something|cool| ;)|
+-------+-------+---------+----+---+

You can also specify different column name delimiters with the flattenSchema() method.

df.flattenSchema(delimiter = "_").show()
+-------+-------+---------+----+---+
|foo_bar|foo_baz|        x|   y|  z|
+-------+-------+---------+----+---+
|   this|     is|something|cool| ;)|
+-------+-------+---------+----+---+

This delimiter parameter is surprisingly important. If you're flattening your schema to load the table in Redshift, you won't be able to use periods as the delimiter.

Here's the full code snippet to generate this output.

val data = Seq(
  Row(Row("this", "is"), "something", "cool", ";)")
)

val schema = StructType(
  Seq(
    StructField(
      "foo",
      StructType(
        Seq(
          StructField("bar", StringType, true),
          StructField("baz", StringType, true)
        )
      ),
      true
    ),
    StructField("x", StringType, true),
    StructField("y", StringType, true),
    StructField("z", StringType, true)
  )
)

val df = spark.createDataFrame(
  spark.sparkContext.parallelize(data),
  StructType(schema)
)

df.flattenSchema().show()

The underlying code is similar to David Griffin's code (in case you don't want to add the spark-daria dependency to your project).

object StructTypeHelpers {

  def flattenSchema(schema: StructType, delimiter: String = ".", prefix: String = null): Array[Column] = {
    schema.fields.flatMap(structField => {
      val codeColName = if (prefix == null) structField.name else prefix + "." + structField.name
      val colName = if (prefix == null) structField.name else prefix + delimiter + structField.name

      structField.dataType match {
        case st: StructType => flattenSchema(schema = st, delimiter = delimiter, prefix = colName)
        case _ => Array(col(codeColName).alias(colName))
      }
    })
  }

}

object DataFrameExt {

  implicit class DataFrameMethods(df: DataFrame) {

    def flattenSchema(delimiter: String = ".", prefix: String = null): DataFrame = {
      df.select(
        StructTypeHelpers.flattenSchema(df.schema, delimiter, prefix): _*
      )
    }

  }

}
Powers
  • 18,150
  • 10
  • 103
  • 108
4

========== edit ====

There's some additional handling for more complex schemas here: https://medium.com/@lvhuyen/working-with-spark-dataframe-having-a-complex-schema-a3bce8c3f44

==================

PySpark, added to @Evan V's answer, when your field-names have special characters, like a dot '.', a hyphen '-', ...:

from pyspark.sql.types import StructType, ArrayType  

def normalise_field(raw):
    return raw.strip().lower() \
            .replace('`', '') \
            .replace('-', '_') \
            .replace(' ', '_') \
            .strip('_')

def flatten(schema, prefix=None):
    fields = []
    for field in schema.fields:
        name = "%s.`%s`" % (prefix, field.name) if prefix else "`%s`" % field.name
        dtype = field.dataType
        if isinstance(dtype, ArrayType):
            dtype = dtype.elementType
        if isinstance(dtype, StructType):
            fields += flatten(dtype, prefix=name)
        else:
            fields.append(col(name).alias(normalise_field(name)))

    return fields

df.select(flatten(df.schema)).show()
Averell
  • 793
  • 2
  • 10
  • 21
3

You could also use SQL to select columns as flat.

  1. Get original data-frame schema
  2. Generate SQL string, by browsing schema
  3. Query your original data-frame

I did an implementation in Java: https://gist.github.com/ebuildy/3de0e2855498e5358e4eed1a4f72ea48

(use recursive method as well, I prefer SQL way, so you can test it easily via Spark-shell).

Thomas Decaux
  • 21,738
  • 2
  • 113
  • 124
3

Here is a function that is doing what you want and that can deal with multiple nested columns containing columns with same name, with a prefix:

from pyspark.sql import functions as F

def flatten_df(nested_df):
    flat_cols = [c[0] for c in nested_df.dtypes if c[1][:6] != 'struct']
    nested_cols = [c[0] for c in nested_df.dtypes if c[1][:6] == 'struct']

    flat_df = nested_df.select(flat_cols +
                               [F.col(nc+'.'+c).alias(nc+'_'+c)
                                for nc in nested_cols
                                for c in nested_df.select(nc+'.*').columns])
    return flat_df

Before:

root
 |-- x: string (nullable = true)
 |-- y: string (nullable = true)
 |-- foo: struct (nullable = true)
 |    |-- a: float (nullable = true)
 |    |-- b: float (nullable = true)
 |    |-- c: integer (nullable = true)
 |-- bar: struct (nullable = true)
 |    |-- a: float (nullable = true)
 |    |-- b: float (nullable = true)
 |    |-- c: integer (nullable = true)

After:

root
 |-- x: string (nullable = true)
 |-- y: string (nullable = true)
 |-- foo_a: float (nullable = true)
 |-- foo_b: float (nullable = true)
 |-- foo_c: integer (nullable = true)
 |-- bar_a: float (nullable = true)
 |-- bar_b: float (nullable = true)
 |-- bar_c: integer (nullable = true)
steco
  • 1,303
  • 13
  • 16
3

To combine David Griffen and V. Samma answers, you could just do this to flatten while avoiding duplicate column names:

import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.Column
import org.apache.spark.sql.DataFrame

def flattenSchema(schema: StructType, prefix: String = null) : Array[Column] = {
  schema.fields.flatMap(f => {
    val colName = if (prefix == null) f.name else (prefix + "." + f.name)
    f.dataType match {
      case st: StructType => flattenSchema(st, colName)
      case _ => Array(col(colName).as(colName.replace(".","_")))
    }
  })
}

def flattenDataFrame(df:DataFrame): DataFrame = {
    df.select(flattenSchema(df.schema):_*)
}

var my_flattened_json_table = flattenDataFrame(my_json_table)
swdev
  • 2,941
  • 2
  • 25
  • 37
3

This is a modification of the solution but it uses tailrec notation


  @tailrec
  def flattenSchema(
      splitter: String,
      fields: List[(StructField, String)],
      acc: Seq[Column]): Seq[Column] = {
    fields match {
      case (field, prefix) :: tail if field.dataType.isInstanceOf[StructType] =>
        val newPrefix = s"$prefix${field.name}."
        val newFields = field.dataType.asInstanceOf[StructType].fields.map((_, newPrefix)).toList
        flattenSchema(splitter, tail ++ newFields, acc)

      case (field, prefix) :: tail =>
        val colName = s"$prefix${field.name}"
        val newCol  = col(colName).as(colName.replace(".", splitter))
        flattenSchema(splitter, tail, acc :+ newCol)

      case _ => acc
    }
  }
  def flattenDataFrame(df: DataFrame): DataFrame = {
    val fields = df.schema.fields.map((_, ""))
    df.select(flattenSchema("__", fields.toList, Seq.empty): _*)
  }
fhuertas
  • 4,764
  • 2
  • 17
  • 28
  • this tailrec feature of flatten dataframe works really good for struct type dataframe. Can you please add a case to handle array type with explode features. It wud be really helpful for me. Thanks in advance – ungalVicky Jan 27 '23 at 14:07
2

A little addition to the code above, if you are working with Nested Struct and Array.

def flattenSchema(schema: StructType, prefix: String = null) : Array[Column] = {
    schema.fields.flatMap(f => {
      val colName = if (prefix == null) f.name else (prefix + "." + f.name)

      f match {
        case StructField(_, struct:StructType, _, _) => flattenSchema(struct, colName)
        case StructField(_, ArrayType(x :StructType, _), _, _) => flattenSchema(x, colName)
        case StructField(_, ArrayType(_, _), _, _) => Array(col(colName))
        case _ => Array(col(colName))
      }
    })
  }

  • I am attempting to implement this logic into the spark suggestion given by Evan V but cannot seem to get the code right for the Struct within Array type--I would appreciate the help if anyone has ideas. – user1983682 Dec 04 '19 at 20:03
  • Can we add a depth to scan while flattening the schema ? – Sampat Kumar Apr 06 '20 at 06:14
  • I am trying to use it but it is not giving proper input. I have a a, a.b, a.b.c, a.b.d but it is not doing flattening for the last child level – NickyPatel May 24 '21 at 11:36
1

I have been using one liners which result in a flattened schema with 5 columns of bar, baz, x, y, z:

df.select("foo.*", "x", "y", "z")

As for explode: I typically reserve explode for flattening a list. For example if you have a column idList that is a list of Strings, you could do:

df.withColumn("flattenedId", functions.explode(col("idList")))
  .drop("idList")

That will result in a new Dataframe with a column named flattenedId (no longer a list)

Kei-ven
  • 432
  • 3
  • 10
1

This is based on @Evan V's solution to deal with more heavily nested Json files. For me the problem with original solution is When there is an ArrayType nested right in another ArrayType, I got an error.

for example if a Json looks like:

{"e":[{"f":[{"g":"h"}]}]}

I will get an error:

"cannot resolve '`e`.`f`['g']' due to data type mismatch: argument 2 requires integral type

To solve this I modified the code a bit, I agree this looks super stupid bust just posting it here so that someone may come up with a nicer solution.

def flatten(schema, prefix=None):
    fields = []
    for field in schema.fields:
        name = prefix + '.' + field.name if prefix else field.name
        dtype = field.dataType
        if isinstance(dtype, T.StructType):
            fields += flatten(dtype, prefix=name)
        else:
            fields.append(name)

    return fields


def explodeDF(df):
    for (name, dtype) in df.dtypes:
        if "array" in dtype:
            df = df.withColumn(name, F.explode(name))

    return df

def df_is_flat(df):
    for (_, dtype) in df.dtypes:
        if ("array" in dtype) or ("struct" in dtype):
            return False

    return True

def flatJson(jdf):
    keepGoing = True
    while(keepGoing):
        fields = flatten(jdf.schema)
        new_fields = [item.replace(".", "_") for item in fields]
        jdf = jdf.select(fields).toDF(*new_fields)
        jdf = explodeDF(jdf)
        if df_is_flat(jdf):
            keepGoing = False

    return jdf

Usage:

df = spark.read.json(path_to_json)
flat_df = flatJson(df)

flat_df.show()
+---+---+-----+
|  a|e_c|e_f_g|
+---+---+-----+
|  b|  d|    h|
+---+---+-----+
0
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkConf
import org.apache.spark.sql.types.StructType
import scala.collection.mutable.ListBuffer 
val columns=new ListBuffer[String]()

def flattenSchema(schema:StructType,prefix:String=null){
for(i<-schema.fields){
  if(i.dataType.isInstanceOf[StructType]) {
    val columnPrefix = i.name + "."
    flattenSchema(i.dataType.asInstanceOf[StructType], columnPrefix)
  }
  else {
    if(prefix == null)
      columns.+=(i.name)
    else
      columns.+=(prefix+i.name)
  }
  }
}
Ishan Kumar
  • 1,941
  • 3
  • 20
  • 29
0

Combining Evan V's, Avrell and Steco ideas. I am also providing a complete SQL syntax while handling query fields with special characters using '`' in PySpark.

The solution below gives the following,

  1. Handle Nested JSON Schema.
  2. Handle same column names across nested columns (We will give alias name of the entire hierarchy separated by underscores).
  3. Handle Special Characters. (we handle special characters with '', I have not handled consecutive occurences of '' but we can do that as well with appropriate 'sub' replacements)
  4. Gives us SQL syntax.
  5. Query Fields are enclosed within '`'.

Code snippet is below,

df=spark.read.json('<JSON FOLDER / FILE PATH>')
df.printSchema()
from pyspark.sql.types import StructType, ArrayType

def flatten(schema, prefix=None):
    fields = []
    for field in schema.fields:
        name = prefix + '.' + field.name if prefix else field.name
        dtype = field.dataType
        if isinstance(dtype, ArrayType):
            dtype = dtype.elementType
        
        if isinstance(dtype, StructType):
            fields += flatten(dtype, prefix=name)
        else:
            alias_name=name.replace('.','_').replace(' ','_').replace('(','').replace(')','').replace('-','_').replace('&','_').replace(r'(_){2,}',r'\1')
            name=name.replace('.','`.`')
            field_name = "`" + name + "`" + " AS " + alias_name
            fields.append(field_name)
    return fields

df.createOrReplaceTempView("to_flatten_df")
query_fields=flatten(df.schema)

def listToString(s):  
    
    # initialize an empty string 
    str1 = ""
    # traverse in the string   
    for ele in s:  
        str1 = str1 + ele + ','
    # return string   
    return str1  

spark.sql("SELECT " + listToString(query_fields)[:-1] + " FROM to_flatten_df" ).show()