
Are window functions (e.g. first, last, lag, lead) supported by PySpark?

For example, how can I group by one column, order by another, and then select the first row of each group (which is exactly what a window function does) using Spark SQL or the DataFrame API?

I see that pyspark.sql.functions contains the aggregation functions first and last, but they cannot be used with groupBy.

  • I don't think they are directly supported, but you can implement them yourself; groupByKey gets you an array (well, an iterable) of all the 'rows' (objects) in a group – okaram Apr 02 '15 at 14:38
  • [Nexr has window functions](https://github.com/nexr/hive-udf) implemented as Hive UDFs (user defined functions) that should work in Spark SQL. You need to build Spark with Hive, change some configurations, and register the UDFs. – dnlbrky Apr 20 '15 at 20:34

2 Answers


All of the above functions can be used together with window functions. A sample would look something like this:

from pyspark.sql.window import Window
from pyspark.sql.functions import lag, lead, first, last

# lag 'col1name' over a window partitioned by 'colname2' and ordered by 'colname3'
window = Window.partitionBy('colname2').orderBy('colname3')
df.withColumn('value', lag('col1name').over(window))

The function is applied within each partition only when you use the partitionBy clause. If you just want to lag / lead over the entire data, use a plain orderBy and omit the partitionBy clause. That isn't very efficient, though, because all rows end up in a single partition; see the sketch below.
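For example, a minimal sketch (the column names are only illustrative) of lagging over the whole DataFrame with just an orderBy:

from pyspark.sql.window import Window
from pyspark.sql.functions import lag

# no partitionBy: the whole DataFrame is treated as one window,
# which forces all rows into a single partition
window = Window.orderBy('colname3')
df.withColumn('prev_value', lag('col1name').over(window))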

If you want the lag / lead to operate in reverse order, you can also use the following format:

from pyspark.sql.window import Window
from pyspark.sql.functions import lag, lead, first, last, desc

# same window, but ordered by 'colname3' in descending order
window = Window.partitionBy('colname2').orderBy(desc('colname3'))
df.withColumn('value', lag('col1name').over(window))

Strictly speaking, though, you don't need desc for lag / lead type functions; it is used mainly with ranking functions such as rank, percent_rank, and row_number.
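As an illustration, here is a rough sketch of the pattern asked about in the question, using row_number with desc to keep only the top row per group (column names are placeholders):

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, desc

# group by 'colname2', take the row with the largest 'colname3' in each group
window = Window.partitionBy('colname2').orderBy(desc('colname3'))
df.withColumn('rn', row_number().over(window)).filter('rn = 1').drop('rn')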


Since Spark 1.4 you can use window functions. In PySpark this would look something like this:

from pyspark.sql.functions import rank
from pyspark.sql import Window

data = sqlContext.read.parquet("/some/data/set")
# rank rows within each 'col1' group by 'col2' in descending order,
# then keep only the top-ranked row per group
window = Window.partitionBy("col1").orderBy(data["col2"].desc())
data_with_rank = data.withColumn("rank", rank().over(window))
data_with_rank.filter(data_with_rank["rank"] == 1).show()
  • Note that if you want to use the `df.sql` capability with window operations you need to be using a `HiveContext`, not a `SQLContext` – Tristan Reid Jan 05 '16 at 12:11
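Following up on that comment, here is a rough sketch of the same query through the SQL API, assuming Spark 1.x built with Hive support and an existing SparkContext `sc` (the path, table name, and columns are the illustrative ones from the answer above):

from pyspark.sql import HiveContext

sqlContext = HiveContext(sc)  # SQL window functions require HiveContext in Spark 1.x
data = sqlContext.read.parquet("/some/data/set")
data.registerTempTable("data")

# first row per 'col1' group, ordered by 'col2' descending
first_per_group = sqlContext.sql("""
    SELECT * FROM (
        SELECT *, rank() OVER (PARTITION BY col1 ORDER BY col2 DESC) AS rnk
        FROM data
    ) ranked
    WHERE rnk = 1
""")
first_per_group.show()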