What you are looking for is the explode function. This is how you can use it:
First of all, you need a MapType or ArrayType column in order to apply the explode function. By default, when you read a JSON Lines file with Spark, it infers a StructType, as you can see with printSchema().
>>> df = spark.read.json('data.json')
>>> df.printSchema()
root
 |-- data: struct (nullable = true)
 |    |-- key:1: struct (nullable = true)
 |    |    |-- string_value: string (nullable = true)
 |    |-- key:2: struct (nullable = true)
 |    |    |-- string_value: string (nullable = true)
 |    |-- key:3: struct (nullable = true)
 |    |    |-- string_value: string (nullable = true)
 |    |-- user_id: struct (nullable = true)
 |    |    |-- string_value: string (nullable = true)
 |-- id: long (nullable = true)
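The inferred struct above has one field per distinct key seen anywhere in the file, which is why dynamic keys are awkward to handle as a struct. A rough plain-Python illustration of that outcome, using hypothetical sample records whose key names are taken from the schema above (this only mimics the result; it is not Spark's actual inference code):

```python
# Hypothetical records; only the key names of the "data" object matter here.
records = [
    {"key:1": {}, "key:2": {}, "user_id": {}},
    {"key:3": {}, "user_id": {}},
]

# A struct needs a fixed list of fields, so the result is the union of all keys.
struct_fields = sorted(set().union(*(r.keys() for r in records)))
print(struct_fields)
```

A map type avoids this because its keys are data rather than part of the schema.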
Reading the file with an explicit schema that uses MapType gives this output instead:
>>> from pyspark.sql.types import MapType, StringType, StructType
>>> schema = StructType() \
...     .add("data",
...          MapType(StringType(), StructType().add("string_value", StringType())), False) \
...     .add("id", StringType(), False)
>>> df = spark.read.json('data.json', schema=schema)
>>> df.printSchema()
root
 |-- data: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = true)
 |    |    |-- string_value: string (nullable = true)
 |-- id: string (nullable = true)
Now that we have a map, we can use explode:
>>> from pyspark.sql import functions as F
>>>
>>> df.withColumn('user_id', F.expr('data.user_id.string_value')) \
... .select('id', F.explode('data'), 'user_id').show()
+---+-------+---------+---------+
| id|    key|    value|  user_id|
+---+-------+---------+---------+
|  1|  key:1|{value_1}|  value_4|
|  1|  key:2|{value_2}|  value_4|
|  1|user_id|{value_4}|  value_4|
|  2|  key:3|{value_3}|  value_5|
|  2|user_id|{value_5}|  value_5|
+---+-------+---------+---------+
When you use explode on a map, it produces one row per map key. That is why there is also a row for each user_id key. You can use filter to get rid of them:
>>> df.withColumn('user_id', F.expr('data.user_id.string_value')) \
... .select('id', F.explode('data'), 'user_id') \
... .withColumn('value', F.expr('value.string_value')) \
... .filter('key != "user_id"').show()
+---+-----+---------+---------+
| id|  key|    value|  user_id|
+---+-----+---------+---------+
|  1|key:1|  value_1|  value_4|
|  1|key:2|  value_2|  value_4|
|  2|key:3|  value_3|  value_5|
+---+-----+---------+---------+
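If you want to check the explode-then-filter logic without a Spark session, it can be mimicked in plain Python. This is only a sketch under assumptions: the two sample records are reconstructed from the output tables above (the original data.json is not shown), and explode_map is a hypothetical helper, not part of PySpark.

```python
import json

# Assumed contents of data.json, reconstructed from the output tables.
JSONL = """\
{"id": 1, "data": {"key:1": {"string_value": "value_1"}, "key:2": {"string_value": "value_2"}, "user_id": {"string_value": "value_4"}}}
{"id": 2, "data": {"key:3": {"string_value": "value_3"}, "user_id": {"string_value": "value_5"}}}
"""


def explode_map(record):
    """Yield one (id, key, value, user_id) row per entry of the 'data' map."""
    data = record["data"]
    user_id = data["user_id"]["string_value"]
    for key, value in data.items():
        yield (record["id"], key, value["string_value"], user_id)


records = [json.loads(line) for line in JSONL.splitlines() if line.strip()]

# Explode every record, then drop the rows that came from the user_id key itself.
rows = [row for rec in records for row in explode_map(rec) if row[1] != "user_id"]

for row in rows:
    print(row)
```

Each printed tuple corresponds to one row of the final table: the map entry becomes the key and value columns, while user_id is repeated on every row of the same record.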
I hope I could help you. Bye!