Here is a dataframe:
```
+----+--------------------------------------------------------------------------------+------+-----------+
|id  |actions                                                                         |clicks|spend      |
+----+--------------------------------------------------------------------------------+------+-----------+
|d353|[{"action_type":"key1","value":"55"}, {"action_type":"key2","value":"1"}, {"action_type":"key3","value":"56"}, {"action_type":"key4","value":"56"}, {"action_type":"key5","value":"16"}, {"action_type":"key8","value":"12"}, {"action_type":"key12","value":"8"}, {"action_type":"key10","value":"12"}, {"action_type":"key19","value":"12"}] |8 |835 |
|d353|[{"action_type":"key1","value":"50"}, {"action_type":"key2","value":"1"}, {"action_type":"key4","value":"51"}, {"action_type":"key3","value":"51"}, {"action_type":"key5","value":"2"}] |7 |582 |
|d353|[{"action_type":"key1","value":"38"}, {"action_type":"key3","value":"38"}, {"action_type":"key4","value":"38"}, {"action_type":"key5","value":"6"}, {"action_type":"key8","value":"5"}, {"action_type":"key12","value":"5"}, {"action_type":"key10","value":"5"}, {"action_type":"key19","value":"5"}] |6 |205 |
|56df|[{"action_type":"key1","value":"58"}, {"action_type":"key2","value":"2"}, {"action_type":"key3","value":"60"}, {"action_type":"key4","value":"60"}, {"action_type":"key5","value":"23"}, {"action_type":"key8","value":"11"}, {"action_type":"key11","value":"10"}, {"action_type":"key10","value":"11"}, {"action_type":"key19","value":"11"}] |15 |169 |
|56df|[{"action_type":"key1","value":"3"}, {"action_type":"key4","value":"3"}, {"action_type":"key3","value":"3"}, {"action_type":"key5","value":"2"}, {"action_type":"key8","value":"25"}, {"action_type":"key11","value":"1"}, {"action_type":"key10","value":"25"}, {"action_type":"key19","value":"25"}] |1 |139 |
|1f6f|[{"action_type":"key1","value":"37"}, {"action_type":"key4","value":"37"}, {"action_type":"key3","value":"37"}, {"action_type":"key5","value":"3"}, {"action_type":"key8","value":"1"}, {"action_type":"key10","value":"1"}, {"action_type":"key19","value":"1"}] |9 |939 |
+----+--------------------------------------------------------------------------------+------+-----------+
```
I would like to have all 3 columns aggregated by id.
The problem is the "actions" column: it is a string, so I don't see how to aggregate it. Also, the arrays don't all have the same number of elements. I want the values summed per action_type, but the result should end up as a string again, since I will write the DataFrame to a database table.
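To make the intended aggregation concrete, here is a minimal plain-Python sketch (no Spark), assuming "aggregated by value" means summing `value` per `action_type`. It parses each JSON string, adds up the values per key (arrays of different lengths are no problem), and serializes the totals back to a string in the same compact style. The helper name `merge_actions` is hypothetical; the inputs are the two 56df rows from the table above.

```python
import json


def merge_actions(action_strings):
    """Sum "value" per "action_type" over several JSON-array strings.

    merge_actions is a hypothetical helper name. Arrays may have different
    lengths; keys keep the order in which they are first seen. The result
    is serialized back to a string.
    """
    totals = {}  # plain dicts keep insertion order (Python 3.7+)
    for s in action_strings:
        for item in json.loads(s):
            key = item["action_type"]
            totals[key] = totals.get(key, 0) + int(item["value"])
    # Compact separators inside each object, ", " between objects
    items = [
        json.dumps({"action_type": k, "value": str(v)}, separators=(",", ":"))
        for k, v in totals.items()
    ]
    return "[" + ", ".join(items) + "]"


# The two 56df rows from the table above
rows_56df = [
    '[{"action_type":"key1","value":"58"}, {"action_type":"key2","value":"2"}, {"action_type":"key3","value":"60"}, {"action_type":"key4","value":"60"}, {"action_type":"key5","value":"23"}, {"action_type":"key8","value":"11"}, {"action_type":"key11","value":"10"}, {"action_type":"key10","value":"11"}, {"action_type":"key19","value":"11"}]',
    '[{"action_type":"key1","value":"3"}, {"action_type":"key4","value":"3"}, {"action_type":"key3","value":"3"}, {"action_type":"key5","value":"2"}, {"action_type":"key8","value":"25"}, {"action_type":"key11","value":"1"}, {"action_type":"key10","value":"25"}, {"action_type":"key19","value":"25"}]',
]
merged = merge_actions(rows_56df)
print(merged)
```

For these two rows this yields, for example, key1 = 61, key8 = 36 and key11 = 11, still as a single string.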
Here is the schema:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
df = spark.read.parquet("actions.parquet")
df.printSchema()
```

```
root
 |-- id: string (nullable = true)
 |-- actions: string (nullable = true)
 |-- clicks: integer (nullable = true)
 |-- spend: integer (nullable = true)
```
Expected result:
```
+----+--------------------------------------------------------------------------------+-------+-----------+
|id  |actions                                                                         |clicks |spend      |
+----+--------------------------------------------------------------------------------+-------+-----------+
|d353|[{"action_type":"key1","value":"143"}, {"action_type":"key2","value":"2"}, {"action_type":"key3","value":"145"}, {"action_type":"key4","value":"145"}, {"action_type":"key5","value":"24"}, {"action_type":"key8","value":"17"}, {"action_type":"key12","value":"13"}, {"action_type":"key10","value":"17"}, {"action_type":"key19","value":"17"}] |21 |1622 |
|56df|[{"action_type":"key1","value":"61"}, {"action_type":"key2","value":"2"}, {"action_type":"key3","value":"63"}, {"action_type":"key4","value":"63"}, {"action_type":"key5","value":"25"}, {"action_type":"key8","value":"36"}, {"action_type":"key11","value":"11"}, {"action_type":"key10","value":"36"}, {"action_type":"key19","value":"36"}] |16 |308 |
|1f6f|[{"action_type":"key1","value":"37"}, {"action_type":"key3","value":"37"}, {"action_type":"key4","value":"37"}, {"action_type":"key5","value":"3"}, {"action_type":"key8","value":"1"}, {"action_type":"key10","value":"1"}, {"action_type":"key19","value":"1"}] |9 |939 |
+----+--------------------------------------------------------------------------------+-------+-----------+
```
Could you please point me in the right direction?
Edit: I'm guessing that I would need to convert that string field to a list of maps, do the calculation, and convert it back to a string.
Initially my plan was to split the actions column into multiple columns and sum them. Following a guide, I tried something like this:
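```python
from pyspark.sql.functions import from_json
from pyspark.sql.types import ArrayType, StringType, StructField, StructType

# Each element of the JSON array is {"action_type": ..., "value": ...}
schema = ArrayType(
    StructType(
        [
            StructField("action_type", StringType()),
            StructField("value", StringType()),
        ]
    )
)
df = df.withColumn("actions", from_json(df.actions, schema))
```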
So the schema is now:

```
root
 |-- id: string (nullable = true)
 |-- actions: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- action_type: string (nullable = true)
 |    |    |-- value: string (nullable = true)
 |-- clicks: integer (nullable = true)
 |-- spend: integer (nullable = true)
```
And the data looks something like this:

```
[{key1, 55}, {key2, 1}, {key3, 56}, {key4, 56} ...]
```
But then I got stuck: I don't know what to do next, or how to get the expected result above.