0

I have a data frame as below

root
      |-- pid: string (nullable = true)
      |-- grouping: array (nullable = true)
      |    |-- element: struct (containsNull = true)
      |    |    |-- id: string (nullable = true)
      |    |    |-- definition: struct (nullable = true)
      |    |    |    |-- type: string (nullable = true)
      |    |    |    |-- name: struct string (nullable = true)
      |    |    |    |-- description: string (nullable = true)
      

which looks as below,

     pid  grouping
     1    [[id1,[def_type1,name1,desc1]],[id2[def_type2,name2,desc2]]]
     2    [[id3,[def_type3,name3,desc3]],[id4[def_type4,name4,desc4]]]
     {
     pid:1
     grouping[
         {
            id:id1,
            definition{
            type:def_type1,
            name: name1,
            description: desc1
         },
         {
            id:id2,
            definition{
            type:def_type2,
            name: name2,
            description: desc2
         }
      ]
     }
     {
     pid:2
     grouping[
         {
            id:id3,
            definition{
            type:def_type3,
            name: name3,
            description: desc3
         },
         {
            id:id3,
            definition{
            type:def_type3,
            name: name3,
            description: desc3
         }
       ]
     }

expected output:

     root
      |-- pid: string (nullable = true)
      |-- pos: integer (nullable = false)
      |-- name: string (nullable = true)
      |-- deftype: string (nullable = true)
      |-- id: string (nullable = true)
      |-- desc: string (nullable = true)

      pid  pos  name   deftype     id    desc
      ----------------------------------------
      1    0    name1  def_type1   id1   desc1
      1    1    name2  def_type2   id2   desc2
      2    0    name3  def_type3   id3   desc3
      2    1    name4  def_type4   id4   desc4

is it possible to explode each array item of all the elements against the pid as above ? pid pos name deftype id desc

1 0 name1 def_type1 id1 desc1 1 1 name2 def_type2 id2 desc2 2 0 name3 def_type3 id3 desc3 2 1 name4 def_type4 id4 desc4

I've used below way to get the output table but is there any other way tp do it ?

enter code here
from pyspark.sql.types import StructType,StructField, StringType, ArrayType
     from pyspark.sql.functions import split, explode, posexplode
     from pyspark.sql import functions as sf
     df1= Df.select(sf.col('_id'),(sf.col('grouping')))
     df2= df1.select('pid',posexplode(sf.col('grouping.definition.name').alias('name')))
     df2= df2.withColumnRenamed("col","name")
     df3= df1.select(sf.col('pid').alias('pid3'),posexplode(sf.col('grouping.definition.type').alias('deftype')))
     df3= df3.withColumnRenamed("col","deftype")
     df4= df1.select(sf.col('pid').alias('pid4'),posexplode(sf.col('grouping.id').alias('id')))
     df4= df4.withColumnRenamed("col","id")
     df6= df1.select(sf.col('pid').alias('pid5'),posexplode(sf.col('grouping.definition.description').alias('desengb')))
     df6= df6.withColumnRenamed("col","desc")
     df5= df2.join(df3,(df2["pos"]==df3["pos"]) & (df2["pid"]==df3["pid3"]),'inner').join(df4,(df2["pos"] == df4["pos"]) & (df2["pid"]==df4["pid4"]),'inner').join(df6,(df2["pos"] == df6["pos"]) & (df2["pid"]==df6["pid5"]),'inner').select(df2["*"],df3["deftype"],df4["id"],df6["desc"])
     #df2.show(15,False)
     df5.printSchema()

  root
  |-- pid: string (nullable = true)
  |-- pos: integer (nullable = false)
  |-- name: string (nullable = true)
  |-- deftype: string (nullable = true)
  |-- id: string (nullable = true)
  |-- desc: string (nullable = true)

1 Answers1

0

I suspect that you can use the solution from my answer here: py4j.protocol.Py4JJavaError: An error occurred while calling o133.pyWriteDynamicFrame
Basically, it appears you want to flatten out the nested objects, to do this, create a function like the following:

    def flatten(schema, prefix=None):
        """Flattens out nested schema
        NOTE: If different nested schemas have same named columns,the last one found will overwrite any earlier instances of that column"""
        fields = []
        for field in schema.fields:
            name = f"{prefix}.{field.name}" if prefix else field.name
            dtype = field.dataType
            if isinstance(dtype, ArrayType):
                dtype = dtype.elementType
            if isinstance(dtype, StructType):
                fields += flatten(dtype, prefix=name)
            else:
                fields.append(name)
        return fields

and then invoke it like:

in your imports:
from pyspark.context import SparkContext
from awsglue.context import GlueContext

#in your process:
spark_context = SparkContext.getOrCreate()
glue_context = GlueContext(spark_context)
flattened_frame = your_frame.select(flatten(your_frame.schema))

#if needed you can keep just the columns you want like:
flattened_frame = flattened_frame.select("columnNameToKeep","columnName2ToKeep")#put the name of each column you want to keep in here

#if needed you can rename all the columns like this:
flattened_frame = flattend_frame.toDF("newColName1","newColName2")# Important: put a name for each column in here.
jonlegend
  • 361
  • 2
  • 6
  • I guess this won't apply for my case, my schema is already flattened, even if use the function flatten with my schema as above, I am getting the same schema in flattened_frame as my schema – sreedhar sree Jun 18 '21 at 18:44