
I have done a little searching. This answer tells me that I can use a UDF on GroupedData: it works, and I can handle the rows and columns of the grouped data with my own function.

According to the official tutorial, groupBy() and window() operations are used to express windowed aggregations, like below.

from pyspark.sql.functions import window

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()

My question is whether there is a way to use a UDF on words.groupBy(window(words.timestamp, "10 minutes", "5 minutes"), ...). Maybe code like below? I have tried it, but it does not work.

from pyspark.sql.functions import pandas_udf, window, PandasUDFType
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

schema = StructType(
    [StructField("key", StringType()), StructField("avg_min", DoubleType())]
)

@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def g(df):
    # whatever user-defined code
    ...

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).apply(g)
singed

1 Answer


In Spark 3 you can use applyInPandas instead, without an explicit @pandas_udf (see the documentation):

def g(df):
    # whatever user-defined code; receives and returns a Pandas DataFrame
    ...

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).applyInPandas(g, schema=schema)  # schema as defined in the question

In this case your function receives a Pandas DataFrame and must return a Pandas DataFrame matching the declared schema.
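For illustration, here is a minimal sketch of what g could look like. Note that the value column and the output schema are invented for this example (the original words stream only has timestamp and word), so treat the names as assumptions:

import pandas as pd
from pyspark.sql.functions import window
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Hypothetical output schema for this sketch
out_schema = StructType(
    [StructField("word", StringType()), StructField("avg_value", DoubleType())]
)

def g(df):
    # df is a plain Pandas DataFrame holding one group's rows;
    # 'value' is an assumed numeric column, not part of the original schema
    return pd.DataFrame(
        {"word": [df["word"].iloc[0]], "avg_value": [df["value"].mean()]}
    )

windowedAvg = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).applyInPandas(g, schema=out_schema)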

Alex Ott
  • Hi, Alex. Thanks for your reply. But does this still work for **words.groupBy(window(xx), xx).applyInPandas()**? – singed Mar 15 '21 at 02:16
  • Yes, it works. I was on Spark version 2.4.7 before. But it seems there are no window.startTime and window.endTime in df.schema in function g, only timestamp and word. – singed Mar 15 '21 at 07:56
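Regarding the last comment: the derived window expression is not included in the Pandas DataFrame passed to g, which is why only timestamp and word show up there. However, applyInPandas also accepts a two-argument function (key, pdf), where key is a tuple of the grouping values, so the window boundaries can be recovered from the key. A minimal sketch, assuming the window value in the key exposes its start and end (its exact Python representation, e.g. a dict or Row, can vary by Spark/Arrow version) and using a placeholder aggregate:

import pandas as pd
from pyspark.sql.functions import window

def g(key, pdf):
    # key is a tuple of the grouping values for this group: (window, word).
    # The window value is assumed to carry the 'start' and 'end' boundaries;
    # how it is represented in Python may differ across Spark versions.
    win, word = key
    # Placeholder aggregate matching the question's schema (key, avg_min)
    return pd.DataFrame({"key": [str(word)], "avg_min": [float(len(pdf))]})

windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).applyInPandas(g, schema=schema)  # schema from the question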