1

How does one set the default value for pyspark.sql.functions.lag to a value within the current row?

For example, given:

testInput = [(1, 'a'),(2, 'c'),(3, 'e'),(1, 'a'),(1, 'b'),(1, 'b')]
columns = ['Col A', 'Col B']
df = sc.parallelize(testInput).toDF(columns)
df.show()

windowSpecification = Window.partitionBy(col('Col A')).orderBy(col('Col B'))
changedRows = col('Col B') != F.lag(col('Col B'), 1).over(windowSpecification)

df.select(col('Col A'), col('Col B'), changedRows.alias('New Col C')).show()

which outputs:

+-----+-----+
|Col A|Col B|
+-----+-----+
|    1|    a|
|    2|    c|
|    3|    e|
|    1|    a|
|    1|    b|
|    1|    b|
+-----+-----+

+-----+-----+---------+
|Col A|Col B|New Col C|
+-----+-----+---------+
|    1|    a|     null|
|    1|    a|    false|
|    1|    b|     true|
|    1|    b|    false|
|    3|    e|     null|
|    2|    c|     null|
+-----+-----+---------+

I would like the output to look like:

+-----+-----+---------+
|Col A|Col B|New Col C|
+-----+-----+---------+
|    1|    a|    false|
|    1|    a|    false|
|    1|    b|     true|
|    1|    b|    false|
|    3|    e|    false|
|    2|    c|    false|
+-----+-----+---------+

My current workaround is to add a second lag call to the changedRows, like so:

changedRows = (col('Col B') != F.lag(col('Col B'), 1).over(windowSpecification)) & F.lag(col('Col B'), 1).over(windowSpecification).isNotNull()

but this does not look clean to me.

I would like to do something like

changedRows = col('Col B') != F.lag(col('Col B'), 1, col('Col B')).over(windowSpecification)

but I get the error TypeError: 'Column' object is not callable.

pault
  • 41,343
  • 15
  • 107
  • 149
David
  • 167
  • 3
  • 11

2 Answers2

2

You can use column values as parameters if you use pyspark.sql.functions.expr. In your case, make the following modification to changedRows:

changedRows = F.expr(
    "`Col B` != lag(`Col B`, 1, `Col B`) over (PARTITION BY `Col A` ORDER BY `Col B`)"
)
df.select('Col A', 'Col B', changedRows.alias('New Col C')).show()
#+-----+-----+---------+
#|Col A|Col B|New Col C|
#+-----+-----+---------+
#|    1|    a|    false|
#|    1|    a|    false|
#|    1|    b|     true|
#|    1|    b|    false|
#|    3|    e|    false|
#|    2|    c|    false|
#+-----+-----+---------+

You have to refer to the column names in back ticks because of the space.

pault
  • 41,343
  • 15
  • 107
  • 149
  • 1
    is this not applicable for `default` in `pyspark.sql.functions.lag`, I tried using and it gave me `TypeError: Column is not iterable` – Ankit Agrawal Sep 29 '20 at 07:55
0

Here's an alternative approach without expr, but you have to check row_number() to see if you are at the first row in the partition.

changedRows = (
    (row_number().over(windowSpecification) != 1) &
    (col('Col B') != F.lag(col('Col B'), 1).over(windowSpecification))
)
df.select(col('Col A'), col('Col B'), changedRows.alias('New Col C')).show()
#+-----+-----+---------+
#|Col A|Col B|New Col C|
#+-----+-----+---------+
#|    1|    a|    false|
#|    1|    a|    false|
#|    1|    b|     true|
#|    1|    b|    false|
#|    2|    c|    false|
#|    3|    e|    false|
#+-----+-----+---------+
user5233494
  • 71
  • 1
  • 6