0

I would like to know if it is possible to package a series of PySpark commands into a function such that such a function takes a dataframe and applies them to the dataframe. Something we do in Python.

For example, I have the following dataframe:

sevents_df.show(5)

+-------+--------+-------------+----------------+------------+-----+
|Counter|Duration|StartTime    |TypeEnumeration |Floor_Number|Value|
+-------+--------+-------------+----------------+------------+-----+
|    1.0|    5460|1503067077370|UC_001          |         NaN|  NaN|
|    1.0|     322|1503067090480|UC_008          |         NaN|  NaN|
|    1.0|     990|1503067099300|UC_001          |         NaN|  NaN|
|    1.0|    5040|1503067396060|UC_001          |         NaN|  NaN|
|    1.0|    6090|1503067402150|UC_001          |         NaN|  NaN|
+-------+--------+-------------+----------------+------------+-----+

Step 1. The first thing I do is to filter out the type. I simply keep UC_001.

sevents_filter = sevents_df.filter(sevents_df['TypeEnumeration'].isin(['UC_001']) == True)

Step 2. Drop some columns:

columns_to_drop = ['Comments', 'Floor_Number', 'Value']
sevents_clean = sevents_filter.drop(*columns_to_drop)

Step 3. Convert StartTime to Date

def convert_to_seconds(x):
    return x/1000

udf_myFunction = udf(convert_to_seconds, IntegerType())
sevents2 = sevents2.withColumn("StartTime", udf_myFunction("StartTime"))
sevents4 = sevents2.withColumn('epoch',
                               f.date_format(sevents2.StartTime.cast(dataType=t.TimestampType()),"yyyy-MM-dd"))

I would like to put these three steps in a function like:

some udf pySpark_function(dataframe):
    step 1
    step 2
    step 3

The reason I want to do this is because if I have N dataframes I cannot imagine writing these steps N times.

One solution is to concatenate these N frames into one frame and pass this one giant frame through these steps once. Are there any alternatives to passing one frame at a time?

Shaido
  • 27,497
  • 23
  • 70
  • 73
Rohit
  • 5,840
  • 13
  • 42
  • 65
  • 1
    You can also look at building a custom pipeline transformer - see [this post](https://stackoverflow.com/questions/49734374/pyspark-ml-pipelines-are-custom-transformers-necessary-for-basic-preprocessing). – pault Aug 10 '18 at 15:03

1 Answers1

2

An UDF is used to process values in dataframe columns and can't be used to process a whole dataframe. Instead, create a normal method that takes a dataframe and returns a processed dataframe.

def process_df(df):
    df = df.filter(df['TypeEnumeration'] == 'UC_001')

    columns_to_drop = ['Comments', 'Floor_Number', 'Value']
    df = df.drop(*columns_to_drop)

    df = df.withColumn('epoch', f.date_format((df.StartTime / 1000).cast(t.TimestampType()), "yyyy-MM-dd"))

    return df

Then simply loop over all the dataframes and use the above method.

Note: I made some simplifications to the code. There is no need for isin since you are only filtering with a single value and no UDF is necessary to divide by 1000. When possible it's preferable to use the inbuilt Spark functions instead of custom a UDF, it's faster.

Shaido
  • 27,497
  • 23
  • 70
  • 73