Here is a dataframe:

+----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+-----------+
|id  |actions                                                                                                                                                                                                                                                                                                                                                                                                                                                         |clicks|spend      |
+----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+-----------+
|d353|[{"action_type":"key1","value":"55"}, {"action_type":"key2","value":"1"}, {"action_type":"key3","value":"56"}, {"action_type":"key4","value":"56"}, {"action_type":"key5","value":"16"}, {"action_type":"key8","value":"12"}, {"action_type":"key12","value":"8"}, {"action_type":"key10","value":"12"}, {"action_type":"key19","value":"12"}]                                                                                                                  |8     |835        |
|d353|[{"action_type":"key1","value":"50"}, {"action_type":"key2","value":"1"}, {"action_type":"key4","value":"51"}, {"action_type":"key3","value":"51"}, {"action_type":"key5","value":"2"}]                                                                                                                                                                                                                                                                         |7     |582        |
|d353|[{"action_type":"key1","value":"38"}, {"action_type":"key3","value":"38"}, {"action_type":"key4","value":"38"}, {"action_type":"key5","value":"6"}, {"action_type":"key8","value":"5"}, {"action_type":"key12","value":"5"}, {"action_type":"key10","value":"5"}, {"action_type":"key19","value":"5"}]                                                                                                                                                          |6     |205        |
|56df|[{"action_type":"key1","value":"58"}, {"action_type":"key2","value":"2"}, {"action_type":"key3","value":"60"}, {"action_type":"key4","value":"60"}, {"action_type":"key5","value":"23"}, {"action_type":"key8","value":"11"}, {"action_type":"key11","value":"10"}, {"action_type":"key10","value":"11"}, {"action_type":"key19","value":"11"}]                                                                                                                 |15    |169        |
|56df|[{"action_type":"key1","value":"3"}, {"action_type":"key4","value":"3"}, {"action_type":"key3","value":"3"}, {"action_type":"key5","value":"2"}, {"action_type":"key8","value":"25"}, {"action_type":"key11","value":"1"}, {"action_type":"key10","value":"25"}, {"action_type":"key19","value":"25"}]                                                                                                                                                          |1     |139        |
|1f6f|[{"action_type":"key1","value":"37"}, {"action_type":"key4","value":"37"}, {"action_type":"key3","value":"37"}, {"action_type":"key5","value":"3"}, {"action_type":"key8","value":"1"}, {"action_type":"key10","value":"1"}, {"action_type":"key19","value":"1"}]                                                                                                                                                                                               |9     |939        |
+----------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+-----------+

I would like to have all 3 columns aggregated by id.

The issue is that I don't see how to do that for the "actions" column, which is a string holding a JSON array, and the arrays do not all have the same number of elements. I want the values summed per action_type, but the column should still be a string at the end, since I will write that data frame to a database table.

Here is a schema:

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

df = spark.read.parquet("actions.parquet")
df.printSchema()
root
 |-- id: string (nullable = true)
 |-- actions: string (nullable = true)
 |-- clicks: integer (nullable = true)
 |-- spend: integer (nullable = true)

Expected result:

|id  |actions                                                                                                                                                                                                                                                                                                                                                 |clicks |spend      |
+----+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------+-----------+
|d353|[{"action_type":"key1","value":"143"}, {"action_type":"key2","value":"2"}, {"action_type":"key3","value":"145"}, {"action_type":"key4","value":"145"}, {"action_type":"key5","value":"24"}, {"action_type":"key8","value":"17"}, {"action_type":"key12","value":"13"}, {"action_type":"key10","value":"17"}, {"action_type":"key19","value":"17"}]      |21     |1622       |
|56df|[{"action_type":"key1","value":"61"}, {"action_type":"key2","value":"2"}, {"action_type":"key3","value":"63"}, {"action_type":"key4","value":"63"}, {"action_type":"key5","value":"25"}, {"action_type":"key8","value":"36"}, {"action_type":"key11","value":"11"}, {"action_type":"key10","value":"36"}, {"action_type":"key19","value":"36"}]         |16     |308        |
|1f6f|[{"action_type":"key1","value":"37"}, {"action_type":"key3","value":"37"}, {"action_type":"key4","value":"37"}, {"action_type":"key5","value":"3"}, {"action_type":"key8","value":"1"}, {"action_type":"key10","value":"1"}, {"action_type":"key19","value":"1"}]                                                                                       |9      |939        |
+----+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------+-----------+

Could you please point me in the right direction?

Edit: I'm guessing that I would need to convert that string field to a list of maps, do the calculation, and convert it back to a string again.

Initially my plan was to split the actions column into multiple columns and sum them. Following a guide, I tried something like this:

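from pyspark.sql.functions import from_json
from pyspark.sql.types import ArrayType, StringType, StructField, StructType

# Each cell of "actions" is a JSON array of {"action_type": ..., "value": ...} objects.
schema = ArrayType(
    StructType(
        [
            StructField("action_type", StringType()),
            StructField("value", StringType())
        ]
    )
)

# Parse the JSON string into an array of structs.
df = df.withColumn("actions", from_json(df.actions, schema))

So the schema is now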

root
 |-- id: string (nullable = true)
 |-- actions: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- action_type: string (nullable = true)
 |    |    |-- value: string (nullable = true)
 |-- clicks: integer (nullable = true)
 |-- spend: integer (nullable = true)

And I'm getting something like this

[{key1, 55}, {key2, 1}, {key3, 56}, {key4, 56} ...] 

But then I got stuck. I don't know what to do next, or how to get the "Expected result" above.

CoyoteKG
  • Please could you explain what you mean by "aggregated", for each column? For example, for id==d353, what aggregated value would you like to see in the 'actions' column? – TMBailey Jul 22 '21 at 15:01
  • Thank you for helping. I just added the result I expect – CoyoteKG Jul 23 '21 at 07:56

1 Answer

The trick, in pandas, is to use groupby().agg() and provide a dictionary with a function for each named column that does the desired aggregation over a pandas Series. That function will be called once for each group that needs to be aggregated. For the numeric columns the aggregation function is just sum().

If the actions are a list (of dict or whatever) and you want to string them all together within each aggregated group, then itertools.chain.from_iterable() can do most of the work for the 'actions' (see Flattening a shallow list in Python). Here we want to convert the result of chaining to a list, so we can embed chain() inside a lambda expression that applies list().

import pandas as pd
from itertools import chain

# Some toy data.
df = pd.DataFrame(dict(actions=[['act1', 'act2'], ['act1', 'act3', 'act4'], ['act2', 'act4']],
                       clicks=[2,3,2],
                       spend=[800,650,743]),
                  index=[111,111,222])
df

#                 actions  clicks  spend
# 111        [act1, act2]       2    800
# 111  [act1, act3, act4]       3    650
# 222        [act2, act4]       2    743

# Group by index value, apply specified functions to groups within each named column.
#  See https://stackoverflow.com/questions/406121/flattening-a-shallow-list-in-python
df.groupby(level=0).agg(dict(actions=lambda x: list(chain.from_iterable(x)), clicks=sum, spend=sum))

#                             actions  clicks  spend
# 111  [act1, act2, act1, act3, act4]       5   1450
# 222                    [act2, act4]       2    743

If the actions are strings, then for the 'actions' column we might be tempted to use a lambda expression to call str.join() on each group of strings.

import pandas as pd

# Some toy data.
actions = ['[{"c":"d"}, {"a":"b"}]',
           '[{"c":"d"}, {"a":"b"}, {"e":"f"}]',
           '[{"c":"d"}, {"a":"b"}]']
df = pd.DataFrame(dict(actions=actions,
                       clicks=[2,3,2],
                       spend=[800,650,743]),
                  index=[111,111,222])
df

#                                actions  clicks  spend
# 111             [{"c":"d"}, {"a":"b"}]       2    800
# 111  [{"c":"d"}, {"a":"b"}, {"e":"f"}]       3    650
# 222             [{"c":"d"}, {"a":"b"}]       2    743

df.groupby(level=0).agg(dict(actions=lambda x: ''.join(x), clicks=sum, spend=sum))

#                                                actions  clicks  spend
# 111  [{"c":"d"}, {"a":"b"}][{"c":"d"}, {"a":"b"}, {...       5   1450
# 222                             [{"c":"d"}, {"a":"b"}]       2    743

But this is not quite right, because the 'actions' value for group 111 looks like [...][...] when it should look like a single list [...]. To aggregate these properly we need to first eval() each cell to interpret it as a list, then use the chain() function from above to aggregate all the lists in a group, and finally use repr() to obtain a string representing the aggregated list.

def agg_actions(actions):
    assert isinstance(actions, pd.Series), f"Expected Series, got {type(actions)}"
    actions_as_list = actions.map(eval)  # Interpret string '[...]' as list [...].
    agg_as_list = list(chain.from_iterable(actions_as_list))  # Aggregate whole series into one list.
    return repr(agg_as_list)  # Return a string representation of the big list.

df.groupby(level=0).agg(dict(actions=agg_actions, clicks=sum, spend=sum))

#                                                actions  clicks  spend
# 111  [{'c': 'd'}, {'a': 'b'}, {'c': 'd'}, {'a': 'b'...       5   1450
# 222                           [{'c': 'd'}, {'a': 'b'}]       2    743
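
If what is wanted is instead the sum of "value" for each "action_type" within a group (as the asker's follow-up comment describes), one possible aggregation function is sketched below. It is not part of the original answer: the name sum_actions is hypothetical, and it assumes every cell is a JSON array of objects with an "action_type" and an integer-like "value". It parses with json.loads() rather than eval(), since the cells are JSON.

```python
import json
from collections import defaultdict

def sum_actions(json_strings):
    """Sum "value" per "action_type" over an iterable of JSON-array strings.

    Hypothetical helper; assumes each string looks like
    '[{"action_type": "...", "value": "<integer>"}, ...]'.
    """
    totals = defaultdict(int)  # a dict subclass, so first-seen key order is kept
    for s in json_strings:
        if not s:  # skip missing or empty cells
            continue
        for item in json.loads(s):
            totals[item["action_type"]] += int(item["value"])
    # Re-serialise in the same compact style as the input, values as strings.
    parts = [json.dumps({"action_type": k, "value": str(v)}, separators=(",", ":"))
             for k, v in totals.items()]
    return "[" + ", ".join(parts) + "]"

# Shortened version of the three "d353" rows from the question.
rows = ['[{"action_type":"key1","value":"55"}, {"action_type":"key2","value":"1"}]',
        '[{"action_type":"key1","value":"50"}, {"action_type":"key2","value":"1"}]',
        '[{"action_type":"key1","value":"38"}]']
print(sum_actions(rows))
# [{"action_type":"key1","value":"143"}, {"action_type":"key2","value":"2"}]
```

Since agg() hands each group's column to the function as a Series of strings, sum_actions could presumably be passed in place of agg_actions in the groupby().agg() call above. In PySpark, the same steps might be expressed with explode() on the parsed array, a groupBy on id and action_type with sum(), then collect_list() and to_json() to rebuild the string. That is untested here, and collect_list() does not guarantee element order.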
TMBailey
  • That is not the result I expect. I'm guessing that I would need to convert that string field to a list of maps, do the calculation, and convert it back to a string again. – CoyoteKG Jul 23 '21 at 08:57
  • Good spot. I've amended the answer to do something along those lines using eval() and repr(). – TMBailey Jul 23 '21 at 09:24
  • Thank you very much for your efforts, but it seems I was not clear enough about what I expect. Let's look at one element in the actions field: ``` |d353|[{"action_type":"key1","value":"55"}...]| |d353|[{"action_type":"key1","value":"50"}...]| |d353|[{"action_type":"key1","value":"38"}...]| ``` and this is the sum of the values of that one element: ``` |d353|[{"action_type":"key1","value":"143"}...]| ``` – CoyoteKG Jul 23 '21 at 09:58
  • That's clearer now and I see that my naive application of pandas was probably inappropriate for your PySpark context. Hopefully somebody can jump in and provide some useful advice. – TMBailey Jul 23 '21 at 10:28