
I have a spark dataframe df that looks like this:

+----+------+------+
|user| value|number|
+----+------+------+
| A  | 25   |    13|
| A  | 6    |    14|
| A  | 2    |    11|
| A  | 32   |    17|
| B  | 22   |    19|
| B  | 42   |    10|
| B  | 43   |    32|
| C  | 33   |    12|
| C  | 90   |    21|
| C  | 12   |    32|
| C  | 22   |    32|
| C  | 64   |    10|
| D  | 32   |    23|
| D  | 62   |    11|
| D  | 32   |    13|
| E  | 63   |    17|
+----+------+------+

I want to group the df per user and then iterate through the user groups so that I can pass each group to a couple of functions that I have defined, like the ones below:

def first_function(df):
    ...  # operation on df
    return df

def second_function(df):
    ...  # operation on df
    return df

def third_function(df):
    ...  # operation on df
    return df

Based on this answer I'm aware I can extract a list of unique users like so:

from pyspark.sql import functions as F

users = [user[0] for user in df.select("user").distinct().collect()]
users_list = [df.filter(F.col('user')==user) for user in users]

But it is unclear to me how I can use this users_list to iterate through my original df per user group so that I can feed the groups to my functions. What is the best way to do this?

  • Manual iteration tends to be suboptimal in Spark. What are you trying to achieve in the functions? – mck Apr 20 '21 at 08:17
  • [These](https://stackoverflow.com/questions/67058882/typeerror-groupeddata-object-is-not-iterable-in-pyspark-dataframe) are a couple of the functions. Instead of grouping the data inside the function, I would like to group them before feeding them to the functions. Something like `for user in users: df = first_function(df) df = second_function(df) df = third_function(df)` – sampeterson Apr 20 '21 at 08:25

1 Answer

You can group the dataframe by user and then use applyInPandas:

df = ...

def functions(pandas_df):
    def first_function(pandas_df1):
        # operation on pandas_df1
        return pandas_df1
    
    def second_function(pandas_df2):
        # operation on pandas_df2
        return pandas_df2

    def third_function(pandas_df3):
        # operation on pandas_df3
        return pandas_df3

    result = first_function(pandas_df)
    result = second_function(result)
    result = third_function(result)
    return result

schema_of_returned_df_of_third_function = "user string, value long, number long"

df.groupBy("user").applyInPandas(functions, schema_of_returned_df_of_third_function).show()

Spark will call functions with a Pandas dataframe for each group of the original Spark dataframe. For the given test data the function will be called 5 times, once per user. The parameter pandas_df will contain a Pandas dataframe with all rows for the respective user. A good way to explore what each call receives is to add print(pandas_df) inside functions.
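As an illustration, functions could be filled in with some hypothetical Pandas operations (the filtering, sorting, and index reset below are only examples, not the questioner's actual logic); the returned columns are unchanged here, so the schema string from above still applies:

import pandas as pd

def functions(pandas_df: pd.DataFrame) -> pd.DataFrame:
    # Spark calls this once per user group; printing shows which rows arrived.
    print(pandas_df)

    def first_function(df1):
        # Hypothetical operation: keep only rows with value >= 10.
        return df1[df1["value"] >= 10]

    def second_function(df2):
        # Hypothetical operation: sort the group's rows by number.
        return df2.sort_values("number")

    def third_function(df3):
        # Hypothetical operation: reset the index before handing back to Spark.
        return df3.reset_index(drop=True)

    return third_function(second_function(first_function(pandas_df)))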

Inside functions you can implement any logic that is required using normal Pandas code. It is possible to add or drop columns and also to change the number of rows of the Pandas dataframe. schema_of_returned_df_of_third_function has to describe the structure of the Pandas dataframe that functions returns.
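For example, if the function adds a column, the schema string has to grow with it. A sketch (the column name value_mean is purely illustrative):

def functions_with_mean(pandas_df):
    # Hypothetical example: attach the per-user mean of value as a new column.
    pandas_df = pandas_df.copy()
    pandas_df["value_mean"] = pandas_df["value"].mean()
    return pandas_df

# The schema now has to include the extra column returned above.
df.groupBy("user").applyInPandas(
    functions_with_mean, "user string, value long, number long, value_mean double"
).show()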

A downside of this approach is that each user group has to fit completely into the memory of one of the Spark executors, otherwise an OutOfMemory error can occur.
