
Suppose we have a very large Spark DataFrame in which we wish to create a lag column:

from pyspark.sql import Window, functions as func

lag_df = df.withColumn('lag', func.lag(df['id'], 1)
                       .over(Window.partitionBy().orderBy('id')))

+---+----+
| id| lag|
+---+----+
|  1|null|
|  2|   1|
|  3|   2|
|  4|   3|
|  5|   4|
   .    .
   .    .

I found that the above ends up running on a single executor. This is fine for small DataFrames, but it is not scalable at all. We can't use partitionBy here, so is there a different way to improve the scalability of this task?
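
For what it's worth, the behaviour shows up both in the logs (Spark warns about "No Partition Defined for Window operation" and moving all data to a single partition) and in the partition count of the result; a quick check along these lines should report a single partition:

# sanity check -- with an empty partitionBy() the windowed result
# should end up in one partition
lag_df.rdd.getNumPartitions()   # expected: 1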

icarus
  • You didn't define a partition! – eliasah May 28 '18 at 07:02
  • Partition the data based on the id. The new code will be like: ```lag_df = df.withColumn('lag',func.lag(df['id'],1) .over(Window.partitionBy('id').orderBy('id')))``` – Nikhil Baby May 28 '18 at 07:24
  • @NikhilBaby, this is the dilemma. If I partition by id, then the lag column will be filled with nulls. – icarus May 28 '18 at 07:33
  • @eliasah, How would I go about partitioning without introducing null values in the lag column? – icarus May 28 '18 at 07:35
  • Your requirement suggests that you will have to execute the lag function in one executor, because in a distributed system how would you tell that the partition in one executor is the previous or later partition of another partition in another executor? That's possible when you partition the data so that the last row of each partition is duplicated in the next partition, so you will have to write a custom partitioner for that. – Ramesh Maharjan May 28 '18 at 08:21
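
A rough DataFrame-level sketch of the last comment's idea: bucket the rows, copy each bucket's last row into the following bucket, compute the lag inside each bucket, then drop the copies. The bucket size and the helper columns bucket/is_copy are made up for illustration, and it assumes df only has the numeric id column shown above:

from pyspark.sql import Window, functions as func

# hypothetical bucket size; pick something that gives reasonably sized partitions
bucket_size = 1000000

# assign every row to a bucket based on its id (assumes id is numeric)
bucketed = df.withColumn('bucket', (func.col('id') / bucket_size).cast('long'))

# copy the last row of each bucket into the *next* bucket, flagged so it
# can be dropped again after the lag has been computed
boundaries = (bucketed
              .groupBy('bucket').agg(func.max('id').alias('id'))
              .withColumn('bucket', func.col('bucket') + 1)
              .withColumn('is_copy', func.lit(True)))

combined = bucketed.withColumn('is_copy', func.lit(False)).unionByName(boundaries)

# the lag can now run inside each bucket in parallel; the copied boundary row
# supplies the "previous" value for the first real row of every bucket
w = Window.partitionBy('bucket').orderBy('id')
lag_df = (combined
          .withColumn('lag', func.lag('id', 1).over(w))
          .filter(~func.col('is_copy'))
          .drop('bucket', 'is_copy'))

Whether this beats the single-executor window depends on the data distribution, and a custom RDD partitioner as suggested in the comment would avoid the extra union and shuffle, but the idea is the same: make each partition self-contained by duplicating the row it needs from its neighbour.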

0 Answers