I am new to PySpark and I am trying to get the difference between values within the same id. My DataFrame is loaded from a CSV file.

For example, my dataset looks like this:

+---+-----+
| id|value|
+---+-----+
|  1|   65|
|  1|   66|
|  1|   65|
|  2|   68|
|  2|   71|
+---+-----+

and I want something like this:

+---+-----+----------+
| id|value|prev_value|
+---+-----+----------+
|  1|   65|      null|
|  1|   66|        65|
|  1|   65|        66|
|  2|   68|        65|
|  2|   71|        68|
+---+-----+----------+

so that it will be easy to calculate the difference between values.

cronoik
Myat Noe
  • The problem with your example is that DataFrames aren't [ordered](https://issues.apache.org/jira/browse/SPARK-16207), so the result can differ unless you provide a column that allows ordering (e.g. a row number). – cronoik Oct 09 '19 at 13:42
  • Agreed. There is no guarantee that the order will be preserved unless you have a monotonically increasing primary key. – samkart Oct 09 '19 at 14:55
  • Possible duplicate of [Applying a Window function to calculate differences in pySpark](https://stackoverflow.com/questions/36725353/applying-a-window-function-to-calculate-differences-in-pyspark) – Brendan Frick Oct 09 '19 at 15:03

1 Answer

Order is not guaranteed, but you can add a row number using monotonically_increasing_id and a Window. If you can put a row number in the data before it enters Spark, then the order will be guaranteed. Once you have a row number, you can self-join on row number - 1 to pick up the previous row's value.

WARNING: Depending on how the data is partitioned upstream, monotonically_increasing_id is not guaranteed to preserve file order.

import pyspark.sql.functions as f
from pyspark.sql.window import Window


# A Window without partitionBy pulls all rows into a single partition (performance hit).
# Ideally add a partition key, or have row numbers in the raw file,
# e.g. Window.partitionBy(<some partition key>).orderBy(f.monotonically_increasing_id())
win_spec = Window.orderBy(f.monotonically_increasing_id())
df = df.withColumn("row_num", f.row_number().over(win_spec))

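# Self join on row - 1 (col is not imported on its own here, so use f.col)
df_prev = df.select(f.col("row_num").alias("prev_row_num"), f.col("value").alias("prev_value"))
df_res = df.join(df_prev, (df.row_num - 1) == df_prev.prev_row_num, how="left")

# Sort by row_num before showing, since a join does not preserve row order
df_res.orderBy("row_num").select("id", "value", "prev_value").show()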

#  +---+-----+----------+
#  | id|value|prev_value|
#  +---+-----+----------+
#  |  1|   65|      null|
#  |  1|   66|        65|
#  |  1|   65|        66|
#  |  2|   68|        65|
#  |  2|   71|        68|
#  +---+-----+----------+
Brendan Frick