
Could you help me? I have this JSONL data:

{"id": 1, "data": {"key:1": {"string_value": "value_1"}, "key:2": {"string_value": "value_2"}, "user_id": {"string_value": "value_4"}}}
{"id": 2, "data": {"key:3": {"string_value": "value_3"}, "user_id": {"string_value": "value_5"}}}

and I need to reshape it. The output should look like this: [image not preserved]

Right now I'm using this solution:

output = df.select("id", "data.*")
output.show(200, False)
output.printSchema()

I'm getting this data:

+---+---------+---------+---------+---------+
|id |key:1    |key:2    |key:3    |user_id  |
+---+---------+---------+---------+---------+
|1  |{value_1}|{value_2}|null     |{value_4}|
|2  |null     |null     |{value_3}|{value_5}|
+---+---------+---------+---------+---------+

root
 |-- id: long (nullable = true)
 |-- key:1: struct (nullable = true)
 |    |-- string_value: string (nullable = true)
 |-- key:2: struct (nullable = true)
 |    |-- string_value: string (nullable = true)
 |-- key:3: struct (nullable = true)
 |    |-- string_value: string (nullable = true)
 |-- user_id: struct (nullable = true)
 |    |-- string_value: string (nullable = true)
zigi

1 Answer


What you are looking for is the explode function. This is how you can use it:

First of all, you need a MapType or ArrayType column in order to apply explode. By default, when you read a JSONL file with Spark, nested objects are inferred as StructType, as you can see with printSchema():

>>> df = spark.read.json('data.json')                  
>>> df.printSchema()
root
 |-- data: struct (nullable = true)
 |    |-- key:1: struct (nullable = true)
 |    |    |-- string_value: string (nullable = true)
 |    |-- key:2: struct (nullable = true)
 |    |    |-- string_value: string (nullable = true)
 |    |-- key:3: struct (nullable = true)
 |    |    |-- string_value: string (nullable = true)
 |    |-- user_id: struct (nullable = true)
 |    |    |-- string_value: string (nullable = true)
 |-- id: long (nullable = true)

If you apply a schema that declares data as a MapType, this is the output:

>>> from pyspark.sql.types import MapType, StringType, StructType
>>> schema = StructType() \
...     .add("data",
...         MapType(StringType(), StructType().add("string_value", StringType())), False) \
...     .add("id", StringType(), False)
>>> df = spark.read.json('data.json', schema=schema)                  
>>> df.printSchema()
root
 |-- data: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = true)
 |    |    |-- string_value: string (nullable = true)
 |-- id: string (nullable = true)

Now that we have a map we can use explode:

>>> from pyspark.sql import functions as F
>>>
>>> df.withColumn('user_id', F.expr('data.user_id.string_value')) \
...     .select('id', F.explode('data'), 'user_id').show()
+---+-------+---------+---------+
| id|    key|    value|  user_id|
+---+-------+---------+---------+
|  1|  key:1|{value_1}| value_4 |
|  1|  key:2|{value_2}| value_4 |
|  1|user_id|{value_4}| value_4 |
|  2|  key:3|{value_3}| value_5 |
|  2|user_id|{value_5}| value_5 |
+---+-------+---------+---------+

explode produces one row per map entry, which is why each id also gets a row for its user_id key. You can use filter to get rid of those rows, and unwrap value.string_value so the value column holds a plain string:

>>> df.withColumn('user_id', F.expr('data.user_id.string_value')) \
...     .select('id', F.explode('data'), 'user_id') \
...     .withColumn('value', F.expr('value.string_value')) \
...     .filter('key != "user_id"').show()
+---+-----+---------+---------+
| id|  key|    value|  user_id|
+---+-----+---------+---------+
|  1|key:1| value_1 | value_4 |
|  1|key:2| value_2 | value_4 |
|  2|key:3| value_3 | value_5 |
+---+-----+---------+---------+
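If you want to sanity-check the expected rows without a Spark session, here is a minimal plain-Python model of the explode-then-filter step. It is only a sketch: the function name explode_records and its skip_key parameter are made up for illustration, and it mimics the Spark behaviour on the two sample records rather than using Spark itself.

```python
import json

# The two sample records from the question, one JSON document per line.
LINES = [
    '{"id": 1, "data": {"key:1": {"string_value": "value_1"}, '
    '"key:2": {"string_value": "value_2"}, "user_id": {"string_value": "value_4"}}}',
    '{"id": 2, "data": {"key:3": {"string_value": "value_3"}, '
    '"user_id": {"string_value": "value_5"}}}',
]


def explode_records(lines, skip_key="user_id"):
    """Hypothetical helper: one output row per entry of "data", except skip_key."""
    rows = []
    for line in lines:
        record = json.loads(line)
        data = record["data"]
        # Equivalent of F.expr('data.user_id.string_value').
        user_id = data[skip_key]["string_value"]
        for key, value in data.items():
            # Equivalent of .filter('key != "user_id"').
            if key == skip_key:
                continue
            rows.append((record["id"], key, value["string_value"], user_id))
    return rows


rows = explode_records(LINES)
for row in rows:
    print(row)
```

Each tuple is (id, key, value, user_id), in the same column order as the Spark output above.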

I hope I could help you. Bye!

cruzlorite
  • Hi, that works, but in my case I need to change the column type (to MapType()) in an existing DataFrame, not in load(schema=schema). Maybe you know how to do it? Thank you in advance. – zigi Jun 07 '23 at 13:49
  • Hey, sure! You can use create_map. https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.create_map.html – cruzlorite Jun 08 '23 at 14:06