The solutions provided by Freek Wiemkeijer and Rakesh Kumar are perfectly adequate; however, since I had already coded it up, I thought it was worth posting this generic solution, as it doesn't require hard-coding the column names.
# Columns that uniquely identify a row; they are also the column names
# of the sample DataFrame built below.
keys = ['ID', 'TYPE', 'CODE']
# Columns whose distinct values should each become an indicator column.
pivot_cols = ['TYPE', 'CODE']

# Sample input: one (ID, TYPE, CODE) tuple per row.
before = sc.parallelize(
    [
        (1, 'A', 'X1'),
        (2, 'B', 'X2'),
        (3, 'B', 'X3'),
        (1, 'B', 'X3'),
        (2, 'C', 'X2'),
        (3, 'C', 'X2'),
        (1, 'C', 'X1'),
        (1, 'B', 'X1'),
    ]
).toDF(keys)
# Helper function to join a list of dataframes.
# Can be simplified if you only need two columns.
def join_all(dfs, keys):
    """Inner-join every DataFrame in ``dfs`` on the ``keys`` columns.

    The joins are nested from the right, i.e. the result is built as
    ``dfs[0].join(dfs[1].join(dfs[2] ...))``, and every join is issued
    with ``on=keys`` and ``how='inner'``.

    Args:
        dfs: Non-empty sequence of objects exposing
            ``join(other, on=..., how=...)`` (e.g. PySpark DataFrames).
        keys: Value forwarded unchanged as the ``on`` argument of every
            join (here: the list of shared column names).

    Returns:
        ``dfs[0]`` itself when there is only one element, otherwise the
        result of the outermost join.

    Raises:
        IndexError: If ``dfs`` is empty.
    """
    if not dfs:
        raise IndexError("join_all() needs at least one DataFrame")
    # Fold from the right with a loop instead of recursing on dfs[1:]:
    # same join order, but no list copies and no recursion-depth limit.
    joined = dfs[-1]
    for df in reversed(dfs[:-1]):
        joined = df.join(joined, on=keys, how='inner')
    return joined
# Build one indicator DataFrame per pivot column, then join them all
# back together on the key columns and display the result.
combined = []
for pivot_col in pivot_cols:
    # Group by the key columns and pivot on the current column, counting
    # rows; the key columns come first in the result, followed by one
    # column per pivoted value.
    pivoted = before.groupBy(keys).pivot(pivot_col).count()
    # Keep the key column names and prefix each pivoted value column
    # with "e_<pivot_col>_" so names from different pivots cannot clash.
    new_names = pivoted.columns[:len(keys)] + [
        "e_{0}_{1}".format(pivot_col, c) for c in pivoted.columns[len(keys):]
    ]
    # fillna(0) replaces the missing counts with 0.
    df = pivoted.toDF(*new_names).fillna(0)
    combined.append(df)
join_all(combined, keys).show()
This produces the following output:
+---+----+----+--------+--------+--------+---------+---------+---------+
| ID|TYPE|CODE|e_TYPE_A|e_TYPE_B|e_TYPE_C|e_CODE_X1|e_CODE_X2|e_CODE_X3|
+---+----+----+--------+--------+--------+---------+---------+---------+
| 1| A| X1| 1| 0| 0| 1| 0| 0|
| 2| C| X2| 0| 0| 1| 0| 1| 0|
| 3| B| X3| 0| 1| 0| 0| 0| 1|
| 2| B| X2| 0| 1| 0| 0| 1| 0|
| 3| C| X2| 0| 0| 1| 0| 1| 0|
| 1| B| X3| 0| 1| 0| 0| 0| 1|
| 1| B| X1| 0| 1| 0| 1| 0| 0|
| 1| C| X1| 0| 0| 1| 1| 0| 0|
+---+----+----+--------+--------+--------+---------+---------+---------+