1

Hello I am trying to pivot a data table similar to the table below and put the trouble code values and trouble code status into columns and group by job #

Source Table

Source Table

Desired output Desired output

I've tried following the example below with the following code

How to pivot on multiple columns in Spark SQL?

 trouble_df = mydf.withColumn('combcol',F.concat(F.lit('trouble_code_'),mydf['trouble_code'])).groupby('Job #').pivot('combcol').agg(F.first('trouble_status'))

Below is the output from the code which isnt exactly what i'm looking. Fairly new to pyspark so still learning

enter image description here

Thank you for the help!

2 Answers2

0
df1 = (
  # Collapse columns into rows
  df.withColumn('tab', F.array(*[F.struct(F.lit(x).alias('y'), F.col(x).alias('z')) for x in df.columns if x!='job'])).selectExpr('*','inline(tab)').drop('tab')
    #Create new column names for pivot
  .withColumn('y', concat_ws('_',col('y'),dense_rank().over( Window.partitionBy('job').orderBy('job','trouble_code')).cast('string')))
  
  #Pivot
  .groupby('job').pivot('y').agg(F.first('z'))
  )

+---+-------------+-------------+-------------+----------------+----------------+----------------+
|job|trouble_code_1|trouble_code_2|trouble_code_3|trouble_status_1|trouble_status_2|trouble_status_3|
+---+-------------+-------------+-------------+----------------+----------------+----------------+
|xxx|           aa|           bb|           cc|            open|            open|          closed|
|yyy|           aa|           bb|           cc|          closed|            open|            open|
+---+-------------+-------------+-------------+----------------+----------------+----------------+
wwnde
  • 26,119
  • 6
  • 18
  • 32
0

You can create a column with the row numbers (i.e. 1, 2, 3 for each record). Use this column as a pivot column with 2 aggregations - one for trouble_code and one for trouble_status.

pivot_data_sdf = data_sdf. \
    withColumn('rn', 
               func.row_number().over(wd.partitionBy('job').orderBy(func.lit(1)))
               ). \
    groupBy('job'). \
    pivot('rn'). \
    agg(func.first('trouble_code').alias('trouble_code'), 
        func.first('trouble_status').alias('trouble_status')
        )

# +----+--------------+----------------+--------------+----------------+--------------+----------------+
# | job|1_trouble_code|1_trouble_status|2_trouble_code|2_trouble_status|3_trouble_code|3_trouble_status|
# +----+--------------+----------------+--------------+----------------+--------------+----------------+
# |yyyy|            aa|           close|            bb|            open|            cc|            open|
# |xxxx|            aa|            open|            bb|            open|            cc|           close|
# +----+--------------+----------------+--------------+----------------+--------------+----------------+

Just rename the trouble_* columns.

# function takes column name and renames it with the number at the end
def col_rename(sdfcolname):
    colsplit = sdfcolname.split('_')
    rearr_colsplit = colsplit[1:3] + [colsplit[0]]
    new_sdfcolname = '_'.join(rearr_colsplit)
    return new_sdfcolname

pivot_data_sdf. \
    select(*[func.col(k).alias(col_rename(k)) if 'trouble'in k else k for k in pivot_data_sdf.columns]). \
    show()

# +----+--------------+----------------+--------------+----------------+--------------+----------------+
# | job|trouble_code_1|trouble_status_1|trouble_code_2|trouble_status_2|trouble_code_3|trouble_status_3|
# +----+--------------+----------------+--------------+----------------+--------------+----------------+
# |yyyy|            aa|           close|            bb|            open|            cc|            open|
# |xxxx|            aa|            open|            bb|            open|            cc|           close|
# +----+--------------+----------------+--------------+----------------+--------------+----------------+
samkart
  • 6,007
  • 2
  • 14
  • 29