Create a column holding each record's row number within its job (i.e. 1, 2, 3, ...). Then use this column as the pivot column with two aggregations: one for `trouble_code` and one for `trouble_status`.
# Number each record within its job (1, 2, 3, ...) in column 'rn', then pivot
# on that number so every position gets its own trouble_code/trouble_status
# pair of columns (named '<rn>_trouble_code', '<rn>_trouble_status').
# NOTE(review): ordering the window by a constant literal makes every row tie,
# so which record receives which row number is left to Spark and may not match
# the input order; order by a real column (e.g. a timestamp) if one exists.
pivot_data_sdf = (
    data_sdf
    .withColumn(
        'rn',
        func.row_number().over(
            wd.partitionBy('job').orderBy(func.lit(1))
        ),
    )
    .groupBy('job')
    .pivot('rn')
    .agg(
        func.first('trouble_code').alias('trouble_code'),
        func.first('trouble_status').alias('trouble_status'),
    )
)
# +----+--------------+----------------+--------------+----------------+--------------+----------------+
# | job|1_trouble_code|1_trouble_status|2_trouble_code|2_trouble_status|3_trouble_code|3_trouble_status|
# +----+--------------+----------------+--------------+----------------+--------------+----------------+
# |yyyy| aa| close| bb| open| cc| open|
# |xxxx| aa| open| bb| open| cc| close|
# +----+--------------+----------------+--------------+----------------+--------------+----------------+
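Then rename the `trouble_*` columns so the row number becomes a suffix instead of a prefix (e.g. `1_trouble_code` becomes `trouble_code_1`).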
# function takes a pivoted column name and moves its number to the end
def col_rename(sdfcolname):
    """Move the leading ``<n>_`` prefix of a pivoted column name to the end.

    Pivoting with aliased aggregations yields names such as
    ``1_trouble_code``; this turns them into ``trouble_code_1``. Everything
    after the first underscore is kept intact, so names that contain further
    underscores are preserved in full (``1_trouble_code_x`` becomes
    ``trouble_code_x_1``). A name without any underscore is returned
    unchanged.

    Args:
        sdfcolname: Column name of the form ``<prefix>_<rest>``.

    Returns:
        The name rearranged as ``<rest>_<prefix>``.
    """
    prefix, sep, rest = sdfcolname.partition('_')
    if not sep:
        # No prefix to move (e.g. 'job'), so leave the name as-is.
        return sdfcolname
    # Split only on the first underscore; slicing a full split() to a fixed
    # width would silently drop trailing segments of longer names.
    return rest + '_' + prefix
# Alias every pivoted trouble_* column through col_rename and keep the other
# columns (here just 'job') by name, then print the result.
pivot_data_sdf.select(
    *[
        func.col(name).alias(col_rename(name)) if 'trouble' in name else name
        for name in pivot_data_sdf.columns
    ]
).show()
# +----+--------------+----------------+--------------+----------------+--------------+----------------+
# | job|trouble_code_1|trouble_status_1|trouble_code_2|trouble_status_2|trouble_code_3|trouble_status_3|
# +----+--------------+----------------+--------------+----------------+--------------+----------------+
# |yyyy| aa| close| bb| open| cc| open|
# |xxxx| aa| open| bb| open| cc| close|
# +----+--------------+----------------+--------------+----------------+--------------+----------------+