I want to perform the following operations on a dataframe:

  1. Group by a column
  2. Window the data
  3. Perform a custom (UDF) operation on the windowed data

Here is the sample code that I tried:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import udf, col
from pyspark.sql.types import *

sparkSession = SparkSession.builder.getOrCreate()

sc = sparkSession.sparkContext
sc.setLogLevel("FATAL")

df = sparkSession.createDataFrame([(17.00, "2018-03-10"),
                                   (13.00, "2018-03-11"),
                                   (25.00, "2018-03-12"),
                                   (20.00, "2018-03-13"),
                                   (17.00, "2018-03-14"),
                                   (99.00, "2018-03-15"),
                                   (156.00, "2018-03-22"),
                                   (17.00, "2018-03-31"),
                                   (25.00, "2018-03-15"),
                                   (25.00, "2018-03-16")
                                   ],
                                  ["id", "ts"])

# 10-day tumbling windows over the timestamp column
w = F.window(col("ts").cast("timestamp"), "10 days")
windo = w.alias("window")

# Identity UDF: simply returns the list collected for each group
@udf(ArrayType(FloatType()))
def new_tuple(x):
    return x

df.groupBy("id", windo).agg(new_tuple(F.collect_list("id"))).show(truncate=False)

The above code gives me what I want. However, I am not sure about the "collect_list" method.

I tried a pandas UDF as well and get my expected output (see below). However, the "apply" method does not return the window column.

Questions:

  1. Does collect_list run on the worker nodes or on the driver node? This code might not be scalable if collect_list pulls all the results back to the driver.

  2. Is there any efficient way to get the following output without collect_list?

  3. I have read that pandas UDFs are efficient. However, I don't know how to pass the window column in and return it back.

Expected output:

+-----+------------------------------------------+---------------------------------+
|id   |window                                    |new_tuple(collect_list(id, 0, 0))|
+-----+------------------------------------------+---------------------------------+
|17.0 |[2018-03-29 19:00:00, 2018-04-08 19:00:00]|[17.0]                           |
|25.0 |[2018-03-09 18:00:00, 2018-03-19 19:00:00]|[25.0, 25.0, 25.0]               |
|13.0 |[2018-03-09 18:00:00, 2018-03-19 19:00:00]|[13.0]                           |
|99.0 |[2018-03-09 18:00:00, 2018-03-19 19:00:00]|[99.0]                           |
|156.0|[2018-03-19 19:00:00, 2018-03-29 19:00:00]|[156.0]                          |
|20.0 |[2018-03-09 18:00:00, 2018-03-19 19:00:00]|[20.0]                           |
|17.0 |[2018-03-09 18:00:00, 2018-03-19 19:00:00]|[17.0, 17.0]                     |
+-----+------------------------------------------+---------------------------------+

The linked question does not answer my questions; I am applying a windowing operation on the grouped data.

ciri
  • Possible duplicate of [Applying UDFs on GroupedData in PySpark (with functioning python example)](https://stackoverflow.com/questions/40006395/applying-udfs-on-groupeddata-in-pyspark-with-functioning-python-example) – 10465355 Feb 15 '19 at 18:25
  • @user10465355 I have already looked at the question you linked. It neither answers my questions nor provides a solution. For example, where would collect_list be executed, and where would it collect the data to? I also mentioned in my question that a UDAF can give me the list without collect_list, but it won't return the window column the way a normal UDF does – ciri Feb 15 '19 at 18:36

1 Answer

To answer your 3rd question, you just need to explicitly create a column to store the windows, for example:

df = df.withColumn('window', F.window(col("ts").cast("timestamp"), "10 days"))
df.groupby('id', 'window').apply(pandas_udf)  # pandas_udf: your grouped-map pandas UDF

Here the newly created window column is a struct column with fields start and end, holding the start and end time of each window (on the pandas side it shows up as a column of dictionaries with those keys). You can further flatten it into separate start and end columns by accessing the individual fields:

df = df.withColumn('start', F.col('window')['start'])
df = df.withColumn('end', F.col('window')['end'])

Whatever the state of the Spark dataframe is just before it is handed to a pandas UDF will be the state of the pandas dataframe the UDF receives. So the UDF will receive the window (or start/end) columns for each group and can return their values after transforming the data.
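
Putting the pieces together, a minimal sketch of such a grouped-map pandas UDF might look like the following (assumptions: Spark 2.3+ with PyArrow installed; the output schema, the collect_per_window name, and grouping on the flattened start/end columns rather than the window struct are illustrative choices, not the only way to do this):

import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import (ArrayType, DoubleType, StructField,
                               StructType, TimestampType)

# Hypothetical output schema: one row per (id, window) group
out_schema = StructType([
    StructField("id", DoubleType()),
    StructField("start", TimestampType()),
    StructField("end", TimestampType()),
    StructField("values", ArrayType(DoubleType())),
])

@pandas_udf(out_schema, PandasUDFType.GROUPED_MAP)
def collect_per_window(pdf):
    # pdf is a pandas DataFrame holding one (id, start, end) group;
    # the grouping columns are included, so the window bounds are available here
    return pd.DataFrame({
        "id": [pdf["id"].iloc[0]],
        "start": [pdf["start"].iloc[0]],
        "end": [pdf["end"].iloc[0]],
        "values": [pdf["id"].tolist()],
    })

result = (df
          .withColumn("window", F.window(F.col("ts").cast("timestamp"), "10 days"))
          .withColumn("start", F.col("window")["start"])
          .withColumn("end", F.col("window")["end"])
          .drop("window")  # group on the flattened bounds instead of the struct
          .groupBy("id", "start", "end")
          .apply(collect_per_window))

result.show(truncate=False)

Grouping on the flattened start and end columns keeps the window bounds in the UDF's input and output without having to declare a struct in the returned schema.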