75

I have a dataframe with the following structure:

 |-- data: struct (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- keyNote: struct (nullable = true)
 |    |    |-- key: string (nullable = true)
 |    |    |-- note: string (nullable = true)
 |    |-- details: map (nullable = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)

How is it possible to flatten the structure and create a new dataframe like this:

     |-- id: long (nullable = true)
     |-- keyNote: struct (nullable = true)
     |    |-- key: string (nullable = true)
     |    |-- note: string (nullable = true)
     |-- details: map (nullable = true)
     |    |-- key: string
     |    |-- value: string (valueContainsNull = true)

Is there something like explode, but for structs?

Marioanzas
djWann
  • 1
    The answers at https://stackoverflow.com/questions/37471346/automatically-and-elegantly-flatten-dataframe-in-spark-sql were also helpful. – erwaman Jan 10 '18 at 15:37
  • 1
    a nice solution is also presented here: https://stackoverflow.com/questions/47285871/exploded-struct-in-spark?rq=1 – TobiStraub Jul 23 '19 at 13:25

15 Answers

129

This should work in Spark 1.6 or later:

df.select(df.col("data.*"))

or

df.select(df.col("data.id"), df.col("data.keyNote"), df.col("data.details"))
  • 16
    Exception in thread "main" org.apache.spark.sql.AnalysisException: No such struct field * – djWann Aug 03 '16 at 21:54
  • but using select on all the columns like df.select(df.col1, df.col2, df.col3) works, so I will accept this answer – djWann Aug 03 '16 at 22:00
  • I was just editing but it is strange. I can use *. Maybe some version issue? –  Aug 03 '16 at 22:01
  • Yeah maybe. I'm using spark 1.6.1 and scala 2.10 – djWann Aug 03 '16 at 22:07
  • How would you select key or note under the nested struct keyNote? – SparkleGoat Sep 27 '17 at 15:19
  • when I am selecting as df.select("data.*'), it gives me n*n rows. (Duplicated n rows for each row). My data frame having n-2 distinct rows, so if I put distinct it gives me n-2 result. But I want n results, which is actually present in my data. How can I achieve this using above mentioned select command. – Madman Jun 22 '21 at 19:38
40

Here is a function that does what you want and that can deal with multiple nested columns containing columns with the same name:

import pyspark.sql.functions as F

def flatten_df(nested_df):
    flat_cols = [c[0] for c in nested_df.dtypes if c[1][:6] != 'struct']
    nested_cols = [c[0] for c in nested_df.dtypes if c[1][:6] == 'struct']

    flat_df = nested_df.select(flat_cols +
                               [F.col(nc+'.'+c).alias(nc+'_'+c)
                                for nc in nested_cols
                                for c in nested_df.select(nc+'.*').columns])
    return flat_df

Before:

root
 |-- x: string (nullable = true)
 |-- y: string (nullable = true)
 |-- foo: struct (nullable = true)
 |    |-- a: float (nullable = true)
 |    |-- b: float (nullable = true)
 |    |-- c: integer (nullable = true)
 |-- bar: struct (nullable = true)
 |    |-- a: float (nullable = true)
 |    |-- b: float (nullable = true)
 |    |-- c: integer (nullable = true)

After:

root
 |-- x: string (nullable = true)
 |-- y: string (nullable = true)
 |-- foo_a: float (nullable = true)
 |-- foo_b: float (nullable = true)
 |-- foo_c: integer (nullable = true)
 |-- bar_a: float (nullable = true)
 |-- bar_b: float (nullable = true)
 |-- bar_c: integer (nullable = true)
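
For reference, a minimal usage sketch (assuming an active SparkSession named spark; note that Python floats map to Spark doubles rather than floats):

from pyspark.sql import Row

df = spark.createDataFrame([
    Row(x="x1", y="y1",
        foo=Row(a=1.0, b=2.0, c=3),
        bar=Row(a=4.0, b=5.0, c=6)),
])

flatten_df(df).columns
# e.g. ['x', 'y', 'foo_a', 'foo_b', 'foo_c', 'bar_a', 'bar_b', 'bar_c']
# (column order may vary by Spark version)
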
amza
steco
19

For Spark 2.4.5,

while df.select(df.col("data.*")) will give you org.apache.spark.sql.AnalysisException: No such struct field *,

this will work:

df.select($"data.*")
Pratik Anurag
  • 1
    This works with Spark 3.1.0 too, but it doesn't preserve the `data` or whatever is parent is selected -- and doesn't descend if there are further nested structs. – Nevermore Jun 08 '21 at 19:54
  • when I am selecting as df.select("data.*'), it gives me n*n rows. (Duplicated n rows for each row). My data frame having n-2 distinct rows, so if I put distinct it gives me n-2 result. But I want n results, which is actually present in my data. How can I achieve this using above mentioned select command. – Madman Jun 22 '21 at 20:03
  • 4
    the dollar can be omitted :) – meniluca Jan 07 '22 at 15:41
  • Tested with DBR 9.1, Spark 3.1.2 and it works. `df.select("data.*")` – Aman Sehgal Jan 20 '22 at 07:44
14

This flatten_df version flattens the dataframe at every nesting level, using a stack to avoid recursive calls:

from pyspark.sql.functions import col


def flatten_df(nested_df):
    stack = [((), nested_df)]
    columns = []

    while len(stack) > 0:
        parents, df = stack.pop()

        flat_cols = [
            col(".".join(parents + (c[0],))).alias("_".join(parents + (c[0],)))
            for c in df.dtypes
            if c[1][:6] != "struct"
        ]

        nested_cols = [
            c[0]
            for c in df.dtypes
            if c[1][:6] == "struct"
        ]

        columns.extend(flat_cols)

        for nested_col in nested_cols:
            projected_df = df.select(nested_col + ".*")
            stack.append((parents + (nested_col,), projected_df))

    return nested_df.select(columns)

Example:

from pyspark.sql.types import StringType, StructField, StructType


schema = StructType([
    StructField("some", StringType()),

    StructField("nested", StructType([
        StructField("nestedchild1", StringType()),
        StructField("nestedchild2", StringType())
    ])),

    StructField("renested", StructType([
        StructField("nested", StructType([
            StructField("nestedchild1", StringType()),
            StructField("nestedchild2", StringType())
        ]))
    ]))
])

data = [
    {
        "some": "value1",
        "nested": {
            "nestedchild1": "value2",
            "nestedchild2": "value3",
        },
        "renested": {
            "nested": {
                "nestedchild1": "value4",
                "nestedchild2": "value5",
            }
        }
    }
]

df = spark.createDataFrame(data, schema)
flat_df = flatten_df(df)
print(flat_df.collect())

Prints:

[Row(some=u'value1', renested_nested_nestedchild1=u'value4', renested_nested_nestedchild2=u'value5', nested_nestedchild1=u'value2', nested_nestedchild2=u'value3')]
federicojasson
  • 1
    This doesn't seem to recurse into nested structs inside arrays. – malthe Mar 04 '21 at 10:05
  • @malthe It won't. I don't think it's feasible to do that, actually. Assuming you use the array index as column name (e.g. array.0.field, array.1.field, ...), you'll have to know the length of the array beforehand. All these solutions iterate the dataframe structure, which is known at the driver. – federicojasson Mar 04 '21 at 21:56
  • 1
    I ended up figuring out how to do it and posted a script here: https://stackoverflow.com/a/66482320/647151. – malthe Mar 04 '21 at 22:25
  • Oh, so the idea was to keep the array but transform the structures it contains. Nice! – federicojasson Mar 05 '21 at 12:36
  • how could you prevent certain columns from getting flattened using this approach? – VinnyD Dec 12 '22 at 06:24
  • 1
    If this code is original to this answer then feels like you should have been credited here! https://learn.microsoft.com/en-us/azure/synapse-analytics/how-to-analyze-complex-schema#define-a-function-to-flatten-the-nested-schema – Martin Smith Aug 24 '23 at 13:05
  • 1
    @MartinSmith Yeah, I wrote that code snippet just for this answer. – federicojasson Aug 24 '23 at 15:39
8

A little more compact and efficient implementation:

There is no need to build separate lists of flat and nested columns and iterate over them; you act on each field based on its type (struct or not).

You keep a stack of (parent path, dataframe) pairs: if a column is nested (a struct) you flatten it (.*) and push it onto the stack, otherwise you access it with dot notation (parent.child) and alias it by replacing . with _ (parent_child):

from pyspark.sql.functions import col

def flatten_df(nested_df):
    stack = [((), nested_df)]
    columns = []
    while len(stack) > 0:
        parents, df = stack.pop()
        for column_name, column_type in df.dtypes:
            if column_type[:6] == "struct":
                projected_df = df.select(column_name + ".*")
                stack.append((parents + (column_name,), projected_df))
            else:
                columns.append(col(".".join(parents + (column_name,))).alias("_".join(parents + (column_name,))))
    return nested_df.select(columns)
  • 1
    Welcome to Stack Overflow. Code-only answers are discouraged on Stack Overflow because they don't explain how it solves the problem. Please edit your answer to explain what this code does and how it is more efficient than the other answers as you say it is, so that it is useful to other users with similar issues and they can learn from it. – FluffyKitten Sep 15 '20 at 01:45
  • 1
    worked perfect for me. You would get more like if you add more explanation for sure. – user2201536 Nov 09 '21 at 23:23
  • what is the data type for "parents" @Domenico Di Nicola – soMuchToLearnAndShare Jul 19 '22 at 19:48
  • I had problems when the column_name includes 'dot' in it. i know if it is explicit column name i can use back tick to escape it, but above, i could not do it. – soMuchToLearnAndShare Jul 19 '22 at 21:53
5

I generalized steco's solution a bit more, so the flattening can be done more than two struct layers deep:

from pyspark.sql.functions import col

def flatten_df(nested_df, layers):
    flat_cols = []
    nested_cols = []
    flat_df = []

    flat_cols.append([c[0] for c in nested_df.dtypes if c[1][:6] != 'struct'])
    nested_cols.append([c[0] for c in nested_df.dtypes if c[1][:6] == 'struct'])

    flat_df.append(nested_df.select(flat_cols[0] +
                               [col(nc+'.'+c).alias(nc+'_'+c)
                                for nc in nested_cols[0]
                                for c in nested_df.select(nc+'.*').columns])
                  )
    for i in range(1, layers):
        print (flat_cols[i-1])
        flat_cols.append([c[0] for c in flat_df[i-1].dtypes if c[1][:6] != 'struct'])
        nested_cols.append([c[0] for c in flat_df[i-1].dtypes if c[1][:6] == 'struct'])

        flat_df.append(flat_df[i-1].select(flat_cols[i] +
                                [col(nc+'.'+c).alias(nc+'_'+c)
                                    for nc in nested_cols[i]
                                    for c in flat_df[i-1].select(nc+'.*').columns])
        )

    return flat_df[-1]

just call with:

my_flattened_df = flatten_df(my_df_having_nested_structs, 3)

(the second parameter is the number of layers to be flattened; in my case it's 3)

Aydin K.
4

A PySpark solution to flatten a nested dataframe with both struct and array types, at any level of depth. It improves on this answer: https://stackoverflow.com/a/56533459/7131019

from pyspark.sql.types import *
from pyspark.sql import functions as f

def flatten_structs(nested_df):
    stack = [((), nested_df)]
    columns = []

    while len(stack) > 0:
        
        parents, df = stack.pop()
        
        array_cols = [
            c[0]
            for c in df.dtypes
            if c[1][:5] == "array"
        ]
        
        flat_cols = [
            f.col(".".join(parents + (c[0],))).alias("_".join(parents + (c[0],)))
            for c in df.dtypes
            if c[1][:6] != "struct"
        ]

        nested_cols = [
            c[0]
            for c in df.dtypes
            if c[1][:6] == "struct"
        ]
        
        columns.extend(flat_cols)

        for nested_col in nested_cols:
            projected_df = df.select(nested_col + ".*")
            stack.append((parents + (nested_col,), projected_df))
        
    return nested_df.select(columns)

def flatten_array_struct_df(df):
    
    array_cols = [
            c[0]
            for c in df.dtypes
            if c[1][:5] == "array"
        ]
    
    while len(array_cols) > 0:
        
        for array_col in array_cols:
            
            cols_to_select = [x for x in df.columns if x != array_col ]
            
            df = df.withColumn(array_col, f.explode(f.col(array_col)))
            
        df = flatten_structs(df)
        
        array_cols = [
            c[0]
            for c in df.dtypes
            if c[1][:5] == "array"
        ]
    return df

flat_df = flatten_array_struct_df(df)
Narahari B M
2

You can use this approach if you have to convert only struct types. I would not suggest exploding arrays, as that could lead to duplicate records.

from pyspark.sql.functions import col
from pyspark.sql.types import StructType


def flatten_schema(schema, prefix=""):
    return_schema = []
    for field in schema.fields:
        if isinstance(field.dataType, StructType):
            if prefix:
                return_schema = return_schema + flatten_schema(field.dataType, "{}.{}".format(prefix, field.name))
            else:
                return_schema = return_schema + flatten_schema(field.dataType, field.name)
        else:
            if prefix:
                field_path = "{}.{}".format(prefix, field.name)
                return_schema.append(col(field_path).alias(field_path.replace(".", "_")))
            else:
                return_schema.append(field.name)
    return return_schema

You can use it as:

new_schema = flatten_schema(df.schema)
df1 = df.select(new_schema)
df1.show()
Amrish Mishra
2

Based on https://stackoverflow.com/a/49532496/17250408, here is a solution for struct and array fields with multilevel nesting:

from pyspark.sql.functions import col, explode


def type_cols(df_dtypes, filter_type):
    cols = []
    for col_name, col_type in df_dtypes:
        if col_type.startswith(filter_type):
            cols.append(col_name)
    return cols


def flatten_df(nested_df, sep='_'):
    nested_cols = type_cols(nested_df.dtypes, "struct")
    flatten_cols = [fc for fc, _ in nested_df.dtypes if fc not in nested_cols]
    for nc in nested_cols:
        for cc in nested_df.select(f"{nc}.*").columns:
            if sep is None:
                flatten_cols.append(col(f"{nc}.{cc}").alias(f"{cc}"))
            else:
                flatten_cols.append(col(f"{nc}.{cc}").alias(f"{nc}{sep}{cc}"))
    return nested_df.select(flatten_cols)


def explode_df(nested_df):
    nested_cols = type_cols(nested_df.dtypes, "array")
    exploded_df = nested_df
    for nc in nested_cols:
        exploded_df = exploded_df.withColumn(nc, explode(col(nc)))
    return exploded_df


def flatten_explode_df(nested_df):
    df = nested_df
    struct_cols = type_cols(nested_df.dtypes, "struct")
    array_cols = type_cols(nested_df.dtypes, "array")
    if struct_cols:
        df = flatten_df(df)
        return flatten_explode_df(df)
    if array_cols:
        df = explode_df(df)
        return flatten_explode_df(df)
    return df


df = flatten_explode_df(nested_df)
V_K
1

An easy way is to use SQL: you can build a SQL query string that aliases the nested columns as flat ones.

  • Retrieve data-frame schema (df.schema())
  • Transform schema to SQL (for (field : schema().fields()) ...
  • Query:

    val newDF = sqlContext.sql("SELECT " + sqlGenerated + " FROM source")
    

Here is an example in Java.

(I prefer the SQL way, because you can easily test it in the spark-shell and it's cross-language.)
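
The same idea as a rough PySpark sketch (assuming the nested dataframe is df and the session is spark; to_sql_select is a hypothetical helper, not part of any library):

from pyspark.sql.types import StructType

def to_sql_select(schema, prefix=""):
    # Walk the schema and alias nested struct fields as flat columns.
    parts = []
    for field in schema.fields:
        path = prefix + field.name
        if isinstance(field.dataType, StructType):
            parts += to_sql_select(field.dataType, prefix=path + ".")
        else:
            parts.append("{0} AS {1}".format(path, path.replace(".", "_")))
    return parts

df.createOrReplaceTempView("source")
sql_generated = ", ".join(to_sql_select(df.schema))
new_df = spark.sql("SELECT " + sql_generated + " FROM source")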

Tshilidzi Mudau
Thomas Decaux
1

The below worked for me in Spark SQL:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

object StackOverFlowQuestion {
  def main(args: Array[String]): Unit = {

    val logger = Logger.getLogger("FlattenTest")
    Logger.getLogger("org").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)

    val spark = SparkSession.builder()
      .appName("FlattenTest")
      .config("spark.sql.warehouse.dir", "C:\\Temp\\hive")
      .master("local[2]")
      //.enableHiveSupport()
      .getOrCreate()
    import spark.implicits._

    val stringTest =
      """{
                               "total_count": 123,
                               "page_size": 20,
                               "another_id": "gdbfdbfdbd",
                               "sen": [{
                                "id": 123,
                                "ses_id": 12424343,
                                "columns": {
                                    "blah": "blah",
                                    "count": 1234
                                },
                                "class": {},
                                "class_timestamps": {},
                                "sentence": "spark is good"
                               }]
                            }
                             """
    val result = List(stringTest)
    val githubRdd=spark.sparkContext.makeRDD(result)
    val gitHubDF=spark.read.json(githubRdd)
    gitHubDF.show()
    gitHubDF.printSchema()

    gitHubDF.registerTempTable("JsonTable")

   spark.sql("with cte as" +
      "(" +
      "select explode(sen) as senArray  from JsonTable" +
      "), cte_2 as" +
      "(" +
      "select senArray.ses_id,senArray.ses_id,senArray.columns.* from cte" +
      ")" +
      "select * from cte_2"
    ).show()

    spark.stop()
}

}

Output:

+----------+---------+--------------------+-----------+
|another_id|page_size|                 sen|total_count|
+----------+---------+--------------------+-----------+
|gdbfdbfdbd|       20|[[[blah, 1234], 1...|        123|
+----------+---------+--------------------+-----------+

root
 |-- another_id: string (nullable = true)
 |-- page_size: long (nullable = true)
 |-- sen: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- columns: struct (nullable = true)
 |    |    |    |-- blah: string (nullable = true)
 |    |    |    |-- count: long (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- sentence: string (nullable = true)
 |    |    |-- ses_id: long (nullable = true)
 |-- total_count: long (nullable = true)

+--------+--------+----+-----+
|  ses_id|  ses_id|blah|count|
+--------+--------+----+-----+
|12424343|12424343|blah| 1234|
+--------+--------+----+-----+
1

This is for Scala Spark.

import org.apache.spark.sql.functions.lit

val totalMainArrayBuffer = collection.mutable.ArrayBuffer[String]()

def flatten_df_Struct(dfTemp: org.apache.spark.sql.DataFrame, dfTotalOuter: org.apache.spark.sql.DataFrame): org.apache.spark.sql.DataFrame = {
  // dfTemp.printSchema
  val totalStructCols = dfTemp.dtypes.map(x => x.toString.substring(1, x.toString.size - 1))
    .filter(_.split(",", 2)(1).contains("Struct")) // in case the column names themselves contain the word "Struct"
  val mainArrayBuffer = collection.mutable.ArrayBuffer[String]()
  for (totalStructCol <- totalStructCols) {
    val tempArrayBuffer = collection.mutable.ArrayBuffer[String]()
    tempArrayBuffer += s"${totalStructCol.split(",")(0)}.*"
    // tempArrayBuffer.toSeq.toDF.show(false)
    val columnsInside = dfTemp.selectExpr(tempArrayBuffer: _*).columns
    for (column <- columnsInside)
      mainArrayBuffer += s"${totalStructCol.split(",")(0)}.${column} as ${totalStructCol.split(",")(0)}_${column}"
    // mainArrayBuffer.toSeq.toDF.show(false)
  }
  // dfTemp.selectExpr(mainArrayBuffer:_*).printSchema
  val nonStructCols = dfTemp.selectExpr(mainArrayBuffer: _*).dtypes.map(x => x.toString.substring(1, x.toString.size - 1))
    .filter(!_.split(",", 2)(1).contains("Struct"))
  for (nonStructCol <- nonStructCols)
    totalMainArrayBuffer += s"${nonStructCol.split(",")(0).replace("_", ".")} as ${nonStructCol.split(",")(0)}" // put the dots back in the original select clause if it is an already-nested column
  dfTemp.selectExpr(mainArrayBuffer: _*).dtypes.map(x => x.toString.substring(1, x.toString.size - 1))
    .filter(_.split(",", 2)(1).contains("Struct")).size match {
    case value if value == 0 => dfTotalOuter.selectExpr(totalMainArrayBuffer: _*)
    case _ => flatten_df_Struct(dfTemp.selectExpr(mainArrayBuffer: _*), dfTotalOuter)
  }
}

def flatten_df(dfTemp: org.apache.spark.sql.DataFrame): org.apache.spark.sql.DataFrame = {
  var totalArrayBuffer = collection.mutable.ArrayBuffer[String]()
  val totalNonStructCols = dfTemp.dtypes.map(x => x.toString.substring(1, x.toString.size - 1))
    .filter(!_.split(",", 2)(1).contains("Struct"))
  for (totalNonStructCol <- totalNonStructCols)
    totalArrayBuffer += s"${totalNonStructCol.split(",")(0)}"
  totalMainArrayBuffer.clear
  flatten_df_Struct(dfTemp, dfTemp) // the flattened schema is now in totalMainArrayBuffer
  totalArrayBuffer = totalArrayBuffer ++ totalMainArrayBuffer
  dfTemp.selectExpr(totalArrayBuffer: _*)
}

flatten_df(dfTotal.withColumn("tempStruct", lit(5))).printSchema



File

{"num1":1,"num2":2,"bool1":true,"bool2":false,"double1":4.5,"double2":5.6,"str1":"a","str2":"b","arr1":[3,4,5],"map1":{"cool":1,"okay":2,"normal":3},"carInfo":{"Engine":{"Make":"sa","Power":{"IC":"900","battery":"165"},"Redline":"11500"} ,"Tyres":{"Make":"Pirelli","Compound":"c1","Life":"120"}}}
{"num1":3,"num2":4,"bool1":false,"bool2":false,"double1":4.2,"double2":5.5,"str1":"u","str2":"n","arr1":[6,7,9],"map1":{"fast":1,"medium":2,"agressive":3},"carInfo":{"Engine":{"Make":"na","Power":{"IC":"800","battery":"150"},"Redline":"10000"} ,"Tyres":{"Make":"Pirelli","Compound":"c2","Life":"100"}}}
{"num1":8,"num2":4,"bool1":true,"bool2":true,"double1":5.7,"double2":7.5,"str1":"t","str2":"k","arr1":[11,12,23],"map1":{"preserve":1,"medium":2,"fast":3},"carInfo":{"Engine":{"Make":"ta","Power":{"IC":"950","battery":"170"},"Redline":"12500"} ,"Tyres":{"Make":"Pirelli","Compound":"c3","Life":"80"}}}
{"num1":7,"num2":9,"bool1":false,"bool2":true,"double1":33.2,"double2":7.5,"str1":"b","str2":"u","arr1":[12,14,5],"map1":{"normal":1,"preserve":2,"agressive":3},"carInfo":{"Engine":{"Make":"pa","Power":{"IC":"920","battery":"160"},"Redline":"11800"} ,"Tyres":{"Make":"Pirelli","Compound":"c4","Life":"70"}}}

Before:

root
 |-- arr1: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- bool1: boolean (nullable = true)
 |-- bool2: boolean (nullable = true)
 |-- carInfo: struct (nullable = true)
 |    |-- Engine: struct (nullable = true)
 |    |    |-- Make: string (nullable = true)
 |    |    |-- Power: struct (nullable = true)
 |    |    |    |-- IC: string (nullable = true)
 |    |    |    |-- battery: string (nullable = true)
 |    |    |-- Redline: string (nullable = true)
 |    |-- Tyres: struct (nullable = true)
 |    |    |-- Compound: string (nullable = true)
 |    |    |-- Life: string (nullable = true)
 |    |    |-- Make: string (nullable = true)
 |-- double1: double (nullable = true)
 |-- double2: double (nullable = true)
 |-- map1: struct (nullable = true)
 |    |-- agressive: long (nullable = true)
 |    |-- cool: long (nullable = true)
 |    |-- fast: long (nullable = true)
 |    |-- medium: long (nullable = true)
 |    |-- normal: long (nullable = true)
 |    |-- okay: long (nullable = true)
 |    |-- preserve: long (nullable = true)
 |-- num1: long (nullable = true)
 |-- num2: long (nullable = true)
 |-- str1: string (nullable = true)
 |-- str2: string (nullable = true)

After:

root
 |-- arr1: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- bool1: boolean (nullable = true)
 |-- bool2: boolean (nullable = true)
 |-- double1: double (nullable = true)
 |-- double2: double (nullable = true)
 |-- num1: long (nullable = true)
 |-- num2: long (nullable = true)
 |-- str1: string (nullable = true)
 |-- str2: string (nullable = true)
 |-- map1_agressive: long (nullable = true)
 |-- map1_cool: long (nullable = true)
 |-- map1_fast: long (nullable = true)
 |-- map1_medium: long (nullable = true)
 |-- map1_normal: long (nullable = true)
 |-- map1_okay: long (nullable = true)
 |-- map1_preserve: long (nullable = true)
 |-- carInfo_Engine_Make: string (nullable = true)
 |-- carInfo_Engine_Redline: string (nullable = true)
 |-- carInfo_Tyres_Compound: string (nullable = true)
 |-- carInfo_Tyres_Life: string (nullable = true)
 |-- carInfo_Tyres_Make: string (nullable = true)
 |-- carInfo_Engine_Power_IC: string (nullable = true)
 |-- carInfo_Engine_Power_battery: string (nullable = true)

Tried with two levels of nesting; it worked.

Raptor0009
0

We used https://github.com/lvhuyen/SparkAid. It works for any level of nesting:

from sparkaid import flatten

flatten(df_nested_B).printSchema()

Raj ks
0

If you have a mix of multi-level nested structs and arrays, the code below will help.

It defines a class called NestedDF that can be used to flatten a mix of nested structs and nested arrays in a PySpark dataframe.

In a hurry? Here's a one-liner showing how to use it right away (just replace my_df with your PySpark dataframe):

flat_df = NestedDF(my_df).flattened_df

from pyspark.sql.functions import col, size
from pyspark.sql.types import ArrayType

class NestedDF:
"""A class for flattening nested dataframes in PySpark."""

def __init__(self, nested_df):
    """
    Args:
        nested_df (pyspark.sql.dataframe.DataFrame): Nested dataframe.
    """
    self.nested_df = nested_df
    self.flattened_struct_df = self.flatten_struct_df()
    self.flattened_df = self.flatten_array_df()

def flatten_array_df(self):
    """Flatten a nested array dataframe into a single level dataframe.

    Returns:
        pyspark.sql.dataframe.DataFrame: Flattened dataframe.
    """
    cols = self.flattened_struct_df.columns
    for col_name in cols:
        if isinstance(self.flattened_struct_df.schema[col_name].dataType, ArrayType):
            array_len = self.flattened_struct_df.select(size(col(col_name)).alias("array_len")).collect()[0]["array_len"]
            for i in range(array_len):
                self.flattened_struct_df = self.flattened_struct_df.withColumn(col_name + "_" + str(i), self.flattened_struct_df[col_name].getItem(i))
            self.flattened_struct_df = self.flattened_struct_df.drop(col_name)
    return self.flattened_struct_df

def flatten_struct_df(self):
    """Flatten a nested dataframe into a single level dataframe.

    Returns:
        pyspark.sql.dataframe.DataFrame: Flattened dataframe.
    """
    stack=[((), self.nested_df)]
    columns=[]
    while len(stack)>0:
        parents, df=stack.pop()
        for col_name, col_type in df.dtypes:
            if col_type.startswith('struct'):
                stack.append((parents+(col_name,), df.select(col_name+".*")))
            else:
                columns.append(col(".".join(parents+(col_name,))).alias("_".join(parents+(col_name,))))
    return self.nested_df.select(columns)

Explanation:

The class has two methods: flatten_array_df() and flatten_struct_df(). flatten_array_df() flattens a nested array dataframe into a single-level dataframe. It first calls the flatten_struct_df() method to convert any nested structs in the dataframe into a single-level dataframe. Then, it iterates over the columns of the resulting dataframe and checks if each column contains an array. If a column contains an array, it calculates the length of the array and then iterates over each element of the array. For each element, it creates a new column in the dataframe with a name that combines the original column name and the index of the element. Finally, it drops the original column and returns the resulting flattened dataframe.

flatten_struct_df() flattens a nested dataframe that contains structs into a single-level dataframe. It first creates an empty stack and adds a tuple containing an empty tuple and the input nested dataframe to the stack. It then iteratively pops the top tuple from the stack and checks if each column of the corresponding dataframe contains a struct. If a column contains a struct, it adds a new tuple to the stack that contains the original column name as well as the struct's columns, and selects only those columns from the original dataframe. If a column does not contain a struct, it constructs a new column name by combining the column names of all of its parent structs and the original column name, and adds the resulting column to a list. Finally, it selects the columns in the list and returns the resulting flattened dataframe.
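
A minimal usage sketch (assuming an active SparkSession named spark):

from pyspark.sql import Row

sample_df = spark.createDataFrame([
    Row(id=1,
        keyNote=Row(key="k1", note="n1"),
        tags=["a", "b"]),
])

flat_df = NestedDF(sample_df).flattened_df
flat_df.columns
# e.g. ['id', 'keyNote_key', 'keyNote_note', 'tags_0', 'tags_1']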

In today's fast-paced business world, mere survival isn't the goal—it's about thriving, evolving, and leading. The rapid advancements in technology, particularly in artificial intelligence, have set a new paradigm for businesses globally. Organizations now stand at a pivotal juncture, a crossroads that can lead to unparalleled growth or stagnation.

Harnessing the power of AI doesn't just offer incremental benefits—it has the potential to redefine our entire path, propelling us to the forefront of our industry. However, while our organization has stringent checks and balances in place to ensure quality and compliance, there's a downside. These very systems, while integral to our operations, sometimes act as inadvertent barriers. It's akin to an overzealous immune system in our bodies, which, in its quest to protect, ends up mistakenly attacking healthy cells. This analogy highlights the essence of our challenge: How do we evolve without compromising our core?

We are here not just to highlight challenges but to offer solutions. As we delve deeper into this proposal, we will unveil a new roadmap to success, one that synergizes human intelligence with the capabilities of AI, ensuring that our organization is not just a participant but a trendsetter in the next business revolution. The Imperative of Innovation in Today's Business Climate

The business world is in a constant state of flux, driven by technological advancements, evolving consumer preferences, and a global marketplace that never sleeps. In such a dynamic environment, standing still equates to moving backward. Organizations that don't adapt, evolve, and innovate are at risk of becoming obsolete, overshadowed by more agile competitors.

While our organization has achieved numerous milestones and has a storied legacy, resting on our laurels isn't an option. At times, our established checks and balances, designed to ensure quality and mitigate risks, inadvertently stifle the very innovation we strive for. It's a paradox, where the mechanisms meant to protect us can sometimes work against us. Drawing a parallel from biology, just as an overly aggressive immune system can lead to auto-immune diseases, where the body attacks its own cells, an excessive bureaucratic structure can suffocate budding ideas, depriving them of the nourishment they need to flourish.

However, acknowledging this challenge is the first step to overcoming it. This proposal aims to bridge this gap, ensuring that while we continue to maintain our standards and uphold our values, we also foster a culture of innovation, where ideas are nurtured, encouraged, and brought to fruition. Addressing the Elephant in the Room: Bureaucracy as a Double-Edged Sword

Every organization, as it grows and evolves, develops systems and processes to ensure stability, consistency, and risk management. These bureaucratic structures, initially implemented with the best intentions, can sometimes become cumbersome. As we expand, the very systems that were once our safeguards can become our shackles, especially in a rapidly changing environment.

It's an issue we've observed consistently: our checks and balances, while crucial in certain aspects, have inadvertently morphed into barriers in others. The analogy is clear—just as an overactive immune system can do more harm than good, our bureaucratic structures sometimes hinder transformative ideas and stifle innovation.

Charting a New Path: A Solution-Oriented Approach

Merely identifying the problem isn't enough; we need actionable solutions. The way forward isn't about dismantling our existing structures, but rather refining and adapting them. We need to pivot our perspective, embracing a more agile and responsive approach. This means fostering an environment where innovative ideas are not just welcomed but actively encouraged. It means streamlining processes to ensure that transformative projects aren't bogged down by red tape. And most importantly, it means having the right tools, like AI Copair AideMates, that can seamlessly integrate with our current systems, enhancing efficiency and driving growth. Harnessing the Power of AI: A Bold Strategy for Organizational Transformation

Executive Summary:

In an era where technological disruption is the norm, organizations must be proactive, not reactive. This proposal presents a transformative strategy, centered on the integration of AI Co-pair AideMates into our organizational fabric. But why AI Co-pair AideMates? The answer lies in its potential to catalyze innovation at a scale previously unimaginable.

By strategically integrating this tool, we can break down bureaucratic barriers that have traditionally hampered rapid ideation and implementation. This isn't just about staying relevant—it's about setting industry standards, leading the way, and ensuring that our organization isn't just a player but a pioneer.

  1. Introduction: Breaking Bureaucratic Barriers

Every organization, irrespective of size or industry, grapples with bureaucracy. While these structures are vital for ensuring consistency and mitigating risks, they can also be a double-edged sword. Bureaucracy, in its excessive forms, can impede the flow of innovative ideas, causing delays, and sometimes even leading to missed opportunities.

In the age of digital disruption, where agility is paramount, we need a fresh approach. AI Co-pair AideMates offers a solution, enabling us to strike a balance between operational efficiency and innovation. By leveraging the capabilities of this tool, we can foster an environment where ideas are rapidly transformed into actionable strategies, ensuring our organization's continued growth and leadership in the industry. Technical Insights and Implementation of AI Co-pair AideMates

  1. Understanding the Technical Landscape:

AI Co-pair AideMates, at its core, leverages state-of-the-art machine learning algorithms and neural networks to enhance decision-making processes, automate repetitive tasks, and provide insights derived from vast datasets. The platform is designed to be scalable, ensuring that as our organizational needs grow, AI Co-pair AideMates can adapt and expand accordingly.

  1. Challenges and Solutions:

While the benefits of integrating AI Co-pair AideMates are numerous, it's essential to acknowledge potential challenges:

  • Data Privacy and Security: As with any AI-driven platform, there's a need to ensure that data is handled with the utmost security and confidentiality. We must establish robust data governance protocols and employ end-to-end encryption to safeguard sensitive information.
  • Integration with Existing Systems: Seamless integration with our current IT infrastructure is vital. We may need to invest in middleware solutions or API integrations to ensure compatibility.
  • Change Management: Introducing a new system means a shift in how employees work. We'll need comprehensive training programs and support structures to facilitate a smooth transition.
  1. Strategies for Successful Integration:

To ensure the successful implementation of AI Co-pair AideMates, we must:

  • Collaborate with IT Teams: Engaging our IT department from the outset will ensure technical challenges are addressed proactively.
  • Pilot Testing: Before a full-scale rollout, a pilot phase will allow us to identify potential issues and rectify them.
  • Continuous Feedback Loop: Encouraging feedback from users will enable us to make iterative improvements, ensuring the platform remains aligned with organizational needs.

In conclusion, while the journey to fully integrate AI Co-pair AideMates will require diligence and commitment, the potential rewards in terms of efficiency, innovation, and growth are immense. The Long-Term Vision: Beyond Immediate Implementation

  1. Potential Impact on Revenue Growth:

Integrating AI Co-pair AideMates isn't just about streamlining processes—it's about driving tangible business results. By automating repetitive tasks, employees can focus on higher-value activities, leading to increased productivity. Moreover, the insights derived from AI analytics can guide our marketing and sales strategies, potentially leading to higher customer acquisition and retention rates.

  1. Enhancing Customer Satisfaction:

In today's digital age, customer expectations are constantly evolving. They demand swift responses, personalized experiences, and seamless interactions. AI Co-pair AideMates can assist in understanding customer behavior patterns, predicting their needs, and offering tailored solutions. This proactive approach can significantly enhance customer satisfaction, loyalty, and advocacy.

  1. Competitive Positioning in the Market:

The business landscape is saturated, with numerous players vying for market share. Integrating AI Co-pair AideMates can provide us with a competitive edge, differentiating us from rivals. By offering innovative solutions, streamlining operations, and delivering unparalleled customer experiences, we position ourselves not just as market participants but as industry leaders.

  1. Conclusion: A Future-Ready Organization:

Our journey with AI Co-pair AideMates is not a one-time implementation—it's a continuous process of learning, adapting, and evolving. By embracing this tool, we're not just investing in technology; we're investing in our future. A future where our organization is agile, customer-centric, and at the forefront of innovation. Next Steps and The Road Ahead

  1. Collaboration Opportunities:

The integration of AI Co-pair AideMates is not an isolated initiative. It offers numerous collaboration opportunities across departments—from IT and marketing to sales and customer support. By fostering cross-functional teams, we can ensure a holistic approach to implementation, leveraging the expertise and insights from various stakeholders.

  1. Timelines for Implementation:
  • Pilot Phase: The initial three months will focus on pilot testing in selected departments, gathering feedback, and making necessary adjustments.
  • Full-Scale Rollout: Post the pilot phase, we aim for a complete rollout across the organization within the next nine months.
  • Review and Refinement: After the first year of implementation, a comprehensive review will be conducted to assess the platform's impact and chart out strategies for further refinement.
  1. Metrics for Success:

Success isn't just about implementation—it's about measurable outcomes. Key metrics will include:

  • Operational Efficiency: Reduction in manual tasks and time saved.
  • Revenue Growth: Increase in sales and customer retention rates.
  • Customer Satisfaction: Enhanced Net Promoter Scores (NPS) and positive customer feedback.
  • Innovation Index: Number of new initiatives and projects spawned as a result of insights from AI Co-pair AideMates.
  1. Conclusion: A Commitment to Excellence:

As we embark on this journey with AI Co-pair AideMates, our focus remains unwavering: a commitment to excellence. Through collaboration, innovation, and continuous improvement, we aim to set new industry benchmarks and redefine the boundaries of what's possible. Charting a Path Forward with AI Co-pair AideMates

Dear Esteemed Board Members,

Navigating the modern business landscape requires both agility and foresight. We're at a juncture where traditional processes, while reliable, might not suffice in the face of rapid technological advancements. We've identified bureaucracy as one such challenge, potentially curbing our innovative spirit. Our goal is to transform these challenges into opportunities, leveraging AI Co-pair AideMates to spearhead this evolution.

A Strategic Blueprint for Innovation

Rather than perceiving this as a mere presentation, let's consider it a strategic blueprint—a tangible roadmap aiming for transformative results. We emphasize:

  • Harnessing Organizational Potential: We're an ecosystem of brilliant minds. By creating a nurturing environment, we can foster a culture where innovation thrives, ensuring our ideas don't remain confined to paper but see the light of day.

  • Utilizing AI Co-pair AideMate: Envision a relentless tool streamlining our processes, refining ideas, and optimizing strategies. This is what AI Co-pair AideMate promises. It symbolizes a continuous cycle of creation, improvement, and innovation, aiming to reduce our project timelines significantly.

  • Leveraging Internal Expertise: Our strength lies in our diverse talent pool. With AI Co-pair AideMate, every department stands to benefit from AI-driven expertise, ensuring consistent, optimal outcomes.

Reimagining Processes for Tomorrow

Adopting AI Co-pair AideMates isn't merely about integrating a new tool—it's about reimagining our entire workflow. This involves:

  • Continuous Innovation: With tools like Generative AI, we're not bound by conventional boundaries. We can foster a culture where innovation is the norm, ensuring we're always ahead of the curve.

  • Optimizing Processes: Regular introspection and refinement are crucial. By continually assessing our processes, we can ensure efficiency, eliminating redundancies and bolstering productivity.

  • Fostering Collaboration: AI Co-pair AideMates promises seamless interconnectivity. It ensures that while each unit is specialized, there's a collective understanding of our organizational goals, promoting collaboration and unified progress.

Conclusion: Embracing the Future Today

Incorporating AI Co-pair AideMates signifies more than technological adoption—it's a testament to our commitment to excellence and innovation. By taking this step, we're not just preparing for the future; we're shaping it. ]

Jeffrey Kilelo
0
This handles both struct and array types, whether you want the array elements as separate columns or want to leave arrays as they are.
from pyspark.sql.functions import size, max  # note: this max shadows the Python built-in

def new_flatten(df, need_as_column):
    stack, col_arr = [], []
    stack.append((df.schema.fields, '', df))
    while len(stack) > 0:
        arr, curr_name, curr_col = stack.pop()
        if curr_name != '':
            curr_name += '_'
        for x in arr:
            if str(x.dataType).startswith('StructType'):
                stack.append((x.dataType.fields, curr_name + x.name, curr_col[x.name]))
            elif str(x.dataType).startswith('ArrayType'):
                if need_as_column:
                    elem_type = str(x.dataType.elementType)
                    length = df.select(size(curr_col[x.name]).alias('my_size')).agg(max('my_size')).head()[0]
                    for i in range(length):
                        if elem_type.startswith('StructType'):
                            stack.append((x.dataType.elementType.fields, curr_name + x.name + '_' + str(i), curr_col[x.name][i]))
                        else:
                            col_arr.append(curr_col[x.name].getItem(i).alias((curr_name + x.name + '_' + str(i)).lower()))
                else:
                    col_arr.append(curr_col[x.name].alias(curr_name + x.name))
            else:
                col_arr.append(curr_col[x.name].alias(curr_name + x.name))
    return df.select(col_arr)
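
A minimal usage sketch (a hypothetical call, assuming df is the nested dataframe from the question):

flat_df = new_flatten(df, need_as_column=True)
flat_df.printSchema()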