First, I will try to replicate your dataframe:
df = spark.createDataFrame(
    [
        (1, '{"a": true, "b":98, "c":"xxx1"}', [['a','b']]),
        (2, '{"a": false, "b":98, "c":"xxx2"}', [['c','a']]),
        (3, '{"a": true, "b":99, "c":"xxx3"}', [['b','c']]),
    ],
    schema=['id', 'salesRank', 'categories'],
)
df.show()
+---+--------------------+----------+
| id| salesRank|categories|
+---+--------------------+----------+
| 1|{"a": true, "b":9...| [[a, b]]|
| 2|{"a": false, "b":...| [[c, a]]|
| 3|{"a": true, "b":9...| [[b, c]]|
+---+--------------------+----------+
Let's parse your salesRank column: I will turn every key of the json into a column of the dataframe, and let's also grab the first value from your categories.
from pyspark.sql import functions as F
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

schema = StructType(
    [
        StructField('a', StringType(), True),  # define the data type of each
        StructField('b', DoubleType(), True),  # json value here: `a` holds
        StructField('c', StringType(), True),  # strings, `b` doubles, and so on
    ]
)
df.withColumn("salesRank_expanded", from_json("salesRank", schema))\
.select(F.col('id'), F.col('salesRank'),F.col('salesRank_expanded.*'),\
F.col('categories'),(F.col('categories').getItem(0).getItem(0)).alias('key')).show()
+---+--------------------+-----+----+----+----------+---+
| id| salesRank| a| b| c|categories|key|
+---+--------------------+-----+----+----+----------+---+
| 1|{"a": true, "b":9...| true|98.0|xxx1| [[a, b]]| a|
| 2|{"a": false, "b":...|false|98.0|xxx2| [[c, a]]| c|
| 3|{"a": true, "b":9...| true|99.0|xxx3| [[b, c]]| b|
+---+--------------------+-----+----+----+----------+---+
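(As an aside: on Spark 2.4+ the chained getItem calls can be swapped for element_at, which uses 1-based indexing. A small sketch, equivalent to the alias('key') expression above:)

from pyspark.sql.functions import element_at

# first inner list, then its first element (element_at is 1-based)
first_key = element_at(element_at('categories', 1), 1).alias('key')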
So I have converted your json keys to columns and extracted the first element of your categories. Now we just need to pull, for each row, the value of the column named in key. To do this I referred to this SO answer.
from pyspark.sql.functions import col, lit, when
from functools import reduce

data_cols = ['a', 'b', 'c']  # all keys of your json dict

# fold the keys into one chained when/otherwise expression:
# "if key == x, take column x, else fall through to the earlier branches"
out = reduce(
    lambda acc, x: when(col("key") == x, col(x)).otherwise(acc),
    data_cols,
    lit(None)
)
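Just to make it concrete: with data_cols = ['a','b','c'] the fold above builds this nested when/otherwise chain, where lit(None) is the fallback if key matches none of the columns:

out = when(col("key") == 'c', col('c')).otherwise(
    when(col("key") == 'b', col('b')).otherwise(
        when(col("key") == 'a', col('a')).otherwise(lit(None))
    )
)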
df.withColumn("salesRank_expanded", from_json("salesRank", schema))\
  .select(
      F.col('id'), F.col('salesRank'), F.col('salesRank_expanded.*'),
      F.col('categories'),
      F.col('categories').getItem(0).getItem(0).alias('key')
  ).withColumn('value', out).show()
And the final output looks like this: the value column now holds, for each row, the json value of the key in key. (Since the branches have different types, Spark casts them all to string, which is why b shows up as 99.0.)
+---+--------------------+-----+----+----+----------+---+-----+
| id| salesRank| a| b| c|categories|key|value|
+---+--------------------+-----+----+----+----------+---+-----+
| 1|{"a": true, "b":9...| true|98.0|xxx1| [[a, b]]| a| true|
| 2|{"a": false, "b":...|false|98.0|xxx2| [[c, a]]| c| xxx2|
| 3|{"a": true, "b":9...| true|99.0|xxx3| [[b, c]]| b| 99.0|
+---+--------------------+-----+----+----+----------+---+-----+
EDIT
Turns out there is a way to make this schema independent: the function get_json_object pulls values straight out of the json string, so we no longer need from_json or the explicit schema.
from pyspark.sql.functions import get_json_object

# same fold as before, but read the value straight from the json string
out2 = reduce(
    lambda acc, x: when(col("key") == x, get_json_object(col('salesRank'), f"$.{x}")).otherwise(acc),
    data_cols,
    lit(None)
)
df.select(
    F.col('id'), F.col('salesRank'), F.col('categories'),
    F.col('categories').getItem(0).getItem(0).alias('key')
).withColumn('value', out2).show()
+---+--------------------+----------+---+-----+
| id| salesRank|categories|key|value|
+---+--------------------+----------+---+-----+
| 1|{"a": true, "b":9...| [[a, b]]| a| true|
| 2|{"a": false, "b":...| [[c, a]]| c| xxx2|
| 3|{"a": true, "b":9...| [[b, c]]| b| 99|
+---+--------------------+----------+---+-----+
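One step further: you can drop data_cols too, because in Spark SQL the json path for get_json_object can itself be built from another column. A sketch, assuming your Spark version accepts a non-literal path here (recent versions do, to my knowledge):

from pyspark.sql.functions import expr

df.select(
    F.col('id'), F.col('salesRank'), F.col('categories'),
    F.col('categories').getItem(0).getItem(0).alias('key')
).withColumn(
    # build the path '$.<key>' per row inside SQL, no list of keys needed
    'value', expr("get_json_object(salesRank, concat('$.', key))")
).show()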