Problem definition

I'm writing a Python application that slides a window over a sequence of timestamped values. I want to apply a function to the values in the sliding window in order to calculate a score from the N latest values, as shown in the figure. We have already implemented that function using a Python library so as to make use of GPUs.
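
Conceptually, the computation looks like the following pure-Python sketch. The names here (`sliding_scores`, `score`) are illustrative, and `score` stands in for our GPU-backed function:

from collections import deque

def sliding_scores(records, n, score):
    """Yield (timestamp, score) for each full window of the N latest values."""
    window = deque(maxlen=n)               # holds the N latest values
    for ts, value in records:              # records are ordered by timestamp
        window.append(value)
        if len(window) == n:               # wait until the window is full
            yield ts, score(list(window))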

I found that Apache Spark 2.0 ships with Structured Streaming, which supports window operations on event time. If you want to read a finite sequence of records from a .csv file and count the records in such a sliding window, you can use the following code in PySpark:

from os import getcwd

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
from pyspark.sql.functions import window

spark = SparkSession \
    .builder \
    .master('local[*]') \
    .getOrCreate()

# Each record carries an event-time timestamp and a value.
schema = StructType() \
    .add('ts', 'timestamp') \
    .add('value', 'double')

lines = spark \
    .readStream \
    .format('csv') \
    .schema(schema) \
    .load(path='file:///' + getcwd() + '/csv')

# Count values in 30-minute windows sliding every 10 minutes.
windowedCount = lines.groupBy(
    window(lines.ts, '30 minutes', '10 minutes')
).agg({'value': 'count'})

query = windowedCount \
    .writeStream \
    .outputMode('complete') \
    .format('console') \
    .start()

query.awaitTermination()

However, I want to apply UDAFs other than the predefined aggregation functions over sliding windows. According to https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=agg#pyspark.sql.GroupedData.agg, the available aggregate functions are only avg, max, min, sum, and count.

Is this not supported yet? If so, when will it be supported in PySpark?

https://stackoverflow.com/a/32750733/1564381 shows that one can define a UserDefinedAggregateFunction in Java or Scala and then invoke it from PySpark. It seems interesting, but I want to apply my own Python function over the values in sliding windows; I want a purely Pythonic way.
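
For reference, the Python side of that JVM route looks roughly like the sketch below. This is only an assumption-laden sketch: 'com.example.MyUDAF' is a hypothetical Scala UDAF class that must already be on the classpath, 'records' and 'my_udaf' are illustrative names, and registerJavaUDAF requires Spark >= 2.3:

# Sketch: invoking a Scala/Java UDAF from PySpark (Spark >= 2.3).
# 'com.example.MyUDAF' is a hypothetical UDAF class on the classpath.
lines.createOrReplaceTempView('records')
spark.udf.registerJavaUDAF('my_udaf', 'com.example.MyUDAF')
scored = spark.sql('SELECT my_udaf(value) AS score FROM records')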

P.S. Please let me know about any Python frameworks other than PySpark that can solve this sort of problem (applying UDAFs over a window sliding over a stream).

Comments:

  • a) The list of aggregation functions is way longer than the few you enumerated; see `pyspark.sql.functions`. b) It is not supported. c) [It won't be supported any time soon](https://issues.apache.org/jira/browse/SPARK-10915). d) It is always better to discuss an actual function or some family of functions. – zero323 Mar 12 '17 at 13:28
  • @zero323 thanks a lot for the clarification :-) – eastcirclek Mar 12 '17 at 23:17
  • One way to make the above example practical is to add a watermark (to allow late arrivals) and to use append mode instead of complete mode (to keep memory usage tight); see the sketch after these comments. – eastcirclek Mar 12 '17 at 23:23
  • OK... On top of the above, you have to remember that UDAFs operate with `merge` and `combine`, and you effectively process one item at a time. That is unlikely to be something you can reasonably accelerate on a GPU (at least with my limited knowledge). You could probably use repartition and sort within partitions, but not with Python and Structured Streaming, AFAIK. – zero323 Mar 13 '17 at 14:33
  • @zero323 Though I illustrated each input record with a single column, I actually have to process hundreds of columns with quite a large sliding window, which is why I am trying to use a Python-based deep learning library :-) Anyway, thank you for your input. – eastcirclek Mar 13 '17 at 23:39
  • I decided to use the following combination: 1) the Apache Flink Stream API to read stream data and apply UDAFs to values inside a sliding window, and 2) Jep to execute my Python program, written using Keras, inside the JVM. I use Jep because Flink does not support Python APIs. Nevertheless, Flink allows me to write an aggregation function over a sliding window without any constraint. – eastcirclek Mar 16 '17 at 00:23
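
A minimal sketch of the watermark/append-mode variant mentioned in the comments above (assumes Spark >= 2.1; the '10 minutes' lateness bound is an arbitrary choice):

# Drop state for windows more than 10 minutes behind the max observed
# event time, and emit each window's count once it can no longer change.
watermarked = lines.withWatermark('ts', '10 minutes')

windowedCount = watermarked.groupBy(
    window(watermarked.ts, '30 minutes', '10 minutes')
).agg({'value': 'count'})

query = windowedCount \
    .writeStream \
    .outputMode('append') \
    .format('console') \
    .start()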

1 Answer

In Spark < 2.3, you cannot do this.

In Spark >= 2.3, it is possible for grouped data via "PySpark UDAFs with Pandas", but not yet for windows: currently, PySpark cannot run user-defined aggregate functions over windows.

Here is a well-described SO question on this: Applying UDFs on GroupedData in PySpark (with functioning python example).
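
For illustration, the grouped-data route looks roughly like the sketch below (assumes Spark >= 2.3 with PyArrow installed; `df` is a static DataFrame, and `my_score` and the 'key' column are hypothetical stand-ins):

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

def my_score(values):
    # Placeholder for the actual (e.g. GPU-backed) scoring logic.
    return float(values.mean())

@pandas_udf('key string, score double', PandasUDFType.GROUPED_MAP)
def score_group(pdf):
    # pdf is a pandas.DataFrame containing all rows of one group.
    return pd.DataFrame({'key': [pdf['key'].iloc[0]],
                         'score': [my_score(pdf['value'])]})

scored = df.groupBy('key').apply(score_group)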

Here is the JIRA ticket that added this feature: https://issues.apache.org/jira/browse/SPARK-10915
