
In PySpark, there's coalesce(colA, colB, ...), which, per row, takes the first non-null value it encounters across those columns. However, I want coalesce(rowA, rowB, ...), i.e. the ability to take, per column, the first non-null value it encounters across those rows. In other words, I want to coalesce all rows within a group or window of rows.
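
For illustration, the built-in column-wise behaviour looks roughly like this (the toy frame here is only for the example):

from pyspark.sql import SparkSession
from pyspark.sql.functions import coalesce

spark = SparkSession.builder.getOrCreate()

# Toy frame just to show the existing, column-wise coalesce.
toy = spark.createDataFrame([(None, 1), (2, None)], "colA int, colB int")

# Per row: the first non-null value across colA, colB.
toy.select(coalesce("colA", "colB").alias("coalesced")).show()
# +---------+
# |coalesced|
# +---------+
# |        1|
# |        2|
# +---------+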

For example, given the following dataset, I want to coalesce the rows within each category, ordered ascending by date.

+---------+-----------+------+------+
| category|       date|  val1|  val2|
+---------+-----------+------+------+
|        A| 2020-05-01|  null|     1|
|        A| 2020-05-02|     2|  null|
|        A| 2020-05-03|     3|  null|
|        B| 2020-05-01|  null|  null|
|        B| 2020-05-02|     4|  null|
|        C| 2020-05-01|     5|     2|
|        C| 2020-05-02|  null|     3|
|        D| 2020-05-01|  null|     4|
+---------+-----------+------+------+
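
For reproducibility, the input can be built along these lines (typing val1/val2 as ints and keeping date as a string are assumptions):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Sample data matching the table above.
df = spark.createDataFrame(
    [
        ("A", "2020-05-01", None, 1),
        ("A", "2020-05-02", 2, None),
        ("A", "2020-05-03", 3, None),
        ("B", "2020-05-01", None, None),
        ("B", "2020-05-02", 4, None),
        ("C", "2020-05-01", 5, 2),
        ("C", "2020-05-02", None, 3),
        ("D", "2020-05-01", None, 4),
    ],
    "category string, date string, val1 int, val2 int",
)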

The output I expect is:

+---------+-----------+------+------+
| category|       date|  val1|  val2|
+---------+-----------+------+------+
|        A| 2020-05-01|     2|     1|
|        B| 2020-05-01|     4|  null|
|        C| 2020-05-01|     5|     2|
|        D| 2020-05-01|  null|     4|
+---------+-----------+------+------+

1 Answer


First, I'll give the answer. Then, I'll point out the important bits.

from pyspark.sql import Window
from pyspark.sql.functions import col, dense_rank, first

df = ...  # dataframe from question description

# Ordered window per category; the default frame runs from the start of the
# partition up to the current row.
window = (
    Window
    .partitionBy("category")
    .orderBy(col("date").asc())
)
# The same window, but with a frame spanning the entire partition.
window_unbounded = (
    window
    .rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)

# Coalesce every column except the grouping and ordering columns.
cols_to_merge = [c for c in df.columns if c not in ["category", "date"]]
merged_cols = [first(c, True).over(window_unbounded).alias(c) for c in cols_to_merge]
df_merged = (
    df
    .select([col("category"), col("date")] + merged_cols)
    .withColumn("rank_col", dense_rank().over(window))
    .filter(col("rank_col") == 1)
    .drop("rank_col")
)

The row-wise analogue to coalesce is the aggregation function first. Specifically, we use first with ignorenulls = True so that we find the first non-null value.
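
To see the difference on the question's data, here is an illustrative query (not part of the solution itself):

df.select(
    "category", "date", "val1",
    first("val1").over(window_unbounded).alias("first_any"),                        # default: may be null
    first("val1", ignorenulls=True).over(window_unbounded).alias("first_non_null"),
).show()
# For category A, first_any is null (the earliest row has a null val1),
# while first_non_null is 2.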

When we use first, we have to be careful about the ordering of the rows it's applied to. Because groupBy doesn't allow us to maintain order within the groups, we use a Window.
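
For instance, a groupBy version along these lines runs, but which non-null value first picks per group is non-deterministic, because nothing fixes the row order inside each group:

from pyspark.sql.functions import min as min_

# No ordering guarantee within a group, so first() may return any non-null
# value, not necessarily the one from the earliest date.
df_naive = df.groupBy("category").agg(
    min_("date").alias("date"),
    first("val1", ignorenulls=True).alias("val1"),
    first("val2", ignorenulls=True).alias("val2"),
)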

The window frame must be unbounded on both ends, rather than the default (unbounded preceding to the current row); otherwise the first aggregation could end up running on only a subset of each group.
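
Concretely, using the names defined above (just an illustration of the frame difference):

# Default frame: unboundedPreceding .. currentRow. For A's earliest row the
# frame contains only that single row, whose val1 is null, so the result
# would still be null there.
partial_first = first("val1", True).over(window)
# Unbounded frame: the whole partition is visible, so every row of A gets 2,
# the group's first non-null val1.
full_first = first("val1", True).over(window_unbounded)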

After we aggregate over the window, we alias the column back to its original name to keep the column names consistent.

We use a single select over all the columns rather than a for loop of df.withColumn(col, ...) calls because the single select greatly reduces the query plan depth. With a looped withColumn, you might hit a StackOverflowError if you have too many columns.
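
A sketch of the two approaches side by side (the looped variant is the one to avoid):

# Avoid: every withColumn call adds another projection to the plan, which
# grows deep quickly and can raise a StackOverflowError during analysis.
df_loop = df
for c in cols_to_merge:
    df_loop = df_loop.withColumn(c, first(c, True).over(window_unbounded))

# Prefer: one select builds a single projection covering all the columns.
df_flat = df.select([col("category"), col("date")] + merged_cols)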

Finally, we run a dense_rank over our window (this time the one with the default frame) and filter to only the first-ranked rows. We use dense_rank here, but we could use any ranking function, whatever fits our needs.
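
For example, swapping in row_number (a variation on the code above) keeps exactly one row per category even if two rows tie on date:

from pyspark.sql.functions import row_number

df_merged_alt = (
    df
    .select([col("category"), col("date")] + merged_cols)
    .withColumn("rank_col", row_number().over(window))   # unique rank, never ties
    .filter(col("rank_col") == 1)
    .drop("rank_col")
)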
