Set-up:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

df = pd.DataFrame({
    'rec': list('DEF'),
    'a': ['a3', 'a3', None],
    'b': [36, 79, None],
    'c': [36, 79, 38],
    'd': [36, 55, 38]
})

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(df)
Then melt the DataFrame, group by value, and aggregate the keys into lists:
cols_to_melt = list('abcd')

# Build a (key, val) struct per column and explode into one row per entry,
# drop null values, then collect the keys per value and, finally, build one
# value -> keys map per record. Note: map_from_entries requires Spark 2.4+.
res = df.withColumn(
        "tmp",
        explode(array(
            [struct(lit(c).alias('key'), col(c).alias('val'))
             for c in cols_to_melt]))) \
    .select('rec', col('tmp.key'), col('tmp.val')) \
    .dropna() \
    .groupby(['rec', 'val']) \
    .agg(collect_list('key').alias('keys')) \
    .groupby('rec') \
    .agg(map_from_entries(collect_list(struct("val", "keys"))).alias('maps'))

res.show(truncate=False)
Output:
+---+----------------------------------------------+
|rec|maps |
+---+----------------------------------------------+
|F |{38 -> [c, d], NaN -> [b]} |
|E |{79 -> [c], 79.0 -> [b], a3 -> [a], 55 -> [d]}|
|D |{36.0 -> [b], a3 -> [a], 36 -> [c, d]} |
+---+----------------------------------------------+
To get your report you just need to iterate over the collected data:
for row in res.collect():
    print(row.rec)
    print('\n'.join(f" {k} roles: {', '.join(v)}" for k, v in row.maps.items()))
Then your final report should look like:
F
 38 roles: c, d
 NaN roles: b
E
 55 roles: d
 79 roles: c
 a3 roles: a
 79.0 roles: b
D
 36.0 roles: b
 a3 roles: a
 36 roles: c, d
One issue that I did not deal with here is that one of your columns contains both numeric and string values, which is not possible in Spark: a column has exactly one data type. If you are converting a pandas DataFrame to a Spark DataFrame (as I do in my example), you should pass an explicit schema.
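For example, here is a minimal sketch that declares every column as a string, assuming the pandas frame from the set-up is kept under a separate name instead of being overwritten (pdf and sdf are just illustrative names):

from pyspark.sql.types import StructType, StructField, StringType

# Declare every column as StringType so values like 38 and "38" can coexist.
schema = StructType([StructField(c, StringType(), True)
                     for c in ['rec', 'a', 'b', 'c', 'd']])

# Cast on the pandas side so the data matches the declared schema;
# where(pdf.notna(), None) keeps missing values as real nulls
# rather than the literal string 'nan'.
sdf = spark.createDataFrame(pdf.astype(str).where(pdf.notna(), None), schema=schema)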
If you are reading from CSV files you might not have to: unless you enable schema inference, every column is read in as String. However, in that case, in order to group values like 38 and "38" together, you should make sure all relevant numeric columns are also converted to String, as in the sketch below.
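A sketch of that cast, reusing the df and cols_to_melt names from above:

# Cast every column to be melted to string so that 38 and "38"
# end up in the same group later on.
df = df.select('rec', *[col(c).cast('string').alias(c) for c in cols_to_melt])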
So, in any case, it is best to use a schema to ensure you get exactly the types you need in your DataFrame.