This might do your job (or give you some ideas to proceed further)...
One idea is to convert your col4 to a primitive data type, i.e. a string:
from pyspark.sql.functions import collect_list
import pandas as pd
a = [[u'PNR1',u'TKT1',u'TEST',u'a2',u'a3'],[u'PNR1',u'TKT1',u'TEST',u'a5',u'a6'],[u'PNR1',u'TKT1',u'TEST',u'a8',u'a9']]
rdd = sc.parallelize(a)
df = rdd.map(lambda x: (x[0],x[1],x[2], '(' + ' '.join(str(e) for e in x[3:]) + ')')).toDF(["col1","col2","col3","col4"])
df.groupBy("col1","col2","col3").agg(collect_list("col4")).toPandas().values.tolist()[0]
#[u'PNR1', u'TKT1', u'TEST', [u'(a2 a3)', u'(a5 a6)', u'(a8 a9)']]
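To see what the map step does to a single record, the lambda can be tried in plain Python, without a Spark context. This is only a sketch: the helper name `to_row` is made up for illustration and is not part of the code above.

```python
def to_row(x):
    # Keep the first three fields as they are and collapse the
    # remaining ones into one parenthesised, space-separated string.
    return (x[0], x[1], x[2], '(' + ' '.join(str(e) for e in x[3:]) + ')')

record = ['PNR1', 'TKT1', 'TEST', 'a2', 'a3']
row = to_row(record)
print(row)  # ('PNR1', 'TKT1', 'TEST', '(a2 a3)')
```

Since col4 is now a plain string, collect_list only has to gather strings rather than nested lists.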
UPDATE (after your own answer):
I thought the point I had reached above was enough for you to adapt it to your needs, and I did not have time to finish it myself at the moment. So, here it is: after modifying my df definition to get rid of the parentheses, it is just a matter of a single list comprehension:
df = rdd.map(lambda x: (x[0],x[1],x[2], ' '.join(str(e) for e in x[3:]))).toDF(["col1","col2","col3","col4"])
# temp list:
ff = df.groupBy("col1","col2","col3").agg(collect_list("col4")).toPandas().values.tolist()[0]
ff
# [u'PNR1', u'TKT1', u'TEST', [u'a2 a3', u'a5 a6', u'a8 a9']]
# final list of lists:
ll = ff[:-1] + [[x.split(' ') for x in ff[-1]]]
ll
which gives your initially requested result:
[u'PNR1', u'TKT1', u'TEST', [[u'a2', u'a3'], [u'a5', u'a6'], [u'a8', u'a9']]] # requested output
This approach has certain advantages compared with the one provided in your own answer:
- It avoids PySpark UDFs, which are known to be slow, since every row has to be serialized between the JVM and a Python worker
- All the post-processing is done on the final (and hopefully much smaller) aggregated data, instead of adding and removing columns and performing map functions and UDFs on the initial (presumably much bigger) data
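Note that the `[0]` in the code above keeps only the first aggregated row, which is all the sample data produces. If the grouping yields several rows, the same list comprehension can be applied to each of them. Below is a minimal sketch under that assumption: `split_last` is a hypothetical helper name, and `rows` is a hand-written stand-in for the result of `.toPandas().values.tolist()`. It also assumes that the original values contain no spaces, since otherwise splitting on `' '` would not recover them.

```python
def split_last(row, sep=' '):
    # Leave the grouping columns untouched and split each joined
    # string in the last column back into its original fields.
    return list(row[:-1]) + [[s.split(sep) for s in row[-1]]]

# Stand-in for df.groupBy(...).agg(...).toPandas().values.tolist()
rows = [
    ['PNR1', 'TKT1', 'TEST', ['a2 a3', 'a5 a6', 'a8 a9']],
    ['PNR2', 'TKT2', 'TEST', ['b1 b2']],
]

result = [split_last(r) for r in rows]
for r in result:
    print(r)
```

If the values may contain spaces, a different separator can be used both in the `' '.join(...)` call and in the `sep` argument here.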