
Context

Let's say you deal with time series data. Your desired outcome relies on multiple window functions with distinct window specifications. The result may be a single Spark column expression, such as an identifier for intervals.

Status Quo

Usually, I don't store intermediate results with df.withColumn but rather chain/stack column expressions and trust Spark to find the most effective DAG (when dealing with DataFrames).

Reproducible example

However, in the following example (PySpark 2.4.4 standalone), storing an intermediate result with df.withColumn reduces the DAG complexity. Let's consider the following test setup:

import pandas as pd
import numpy as np

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

dfp = pd.DataFrame(
    {
        "col1": np.random.randint(0, 5, size=100),
        "col2": np.random.randint(0, 5, size=100),
        "col3": np.random.randint(0, 5, size=100),
        "col4": np.random.randint(0, 5, size=100),
    }
)

df = spark.createDataFrame(dfp)
df.show(5)
+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|   1|   2|   4|   1|
|   0|   2|   3|   0|
|   2|   0|   1|   0|
|   4|   1|   1|   2|
|   1|   3|   0|   4|
+----+----+----+----+
only showing top 5 rows

The computation is arbitrary. Basically, we have 2 window specs and 3 computational steps. The 3 computational steps depend on each other and use alternating window specs:

w1 = Window.partitionBy("col1").orderBy("col2")
w2 = Window.partitionBy("col3").orderBy("col4")

# first step, arbitrary window func over 1st window
step1 = F.lag("col3").over(w1)

# second step, arbitrary window func over 2nd window with step 1
step2 = F.lag(step1).over(w2)

# third step, arbitrary window func over 1st window with step 2
step3 = F.when(step2 > 1, F.max(step2).over(w1))

df_result = df.withColumn("result", step3)

Inspecting the physical plan via df_result.explain() reveals 4 exchanges and sorts! However, only 3 should be necessary here because we change the window spec only twice.

df_result.explain()
== Physical Plan ==
*(7) Project [col1#0L, col2#1L, col3#2L, col4#3L, CASE WHEN (_we0#25L > 1) THEN _we1#26L END AS result#22L]
+- Window [lag(_w0#23L, 1, null) windowspecdefinition(col3#2L, col4#3L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _we0#25L], [col3#2L], [col4#3L ASC NULLS FIRST]
   +- *(6) Sort [col3#2L ASC NULLS FIRST, col4#3L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(col3#2L, 200)
         +- *(5) Project [col1#0L, col2#1L, col3#2L, col4#3L, _w0#23L, _we1#26L]
            +- Window [max(_w1#24L) windowspecdefinition(col1#0L, col2#1L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS _we1#26L], [col1#0L], [col2#1L ASC NULLS FIRST]
               +- *(4) Sort [col1#0L ASC NULLS FIRST, col2#1L ASC NULLS FIRST], false, 0
                  +- Exchange hashpartitioning(col1#0L, 200)
                     +- *(3) Project [col1#0L, col2#1L, col3#2L, col4#3L, _w0#23L, _w1#24L]
                        +- Window [lag(_w0#27L, 1, null) windowspecdefinition(col3#2L, col4#3L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _w1#24L], [col3#2L], [col4#3L ASC NULLS FIRST]
                           +- *(2) Sort [col3#2L ASC NULLS FIRST, col4#3L ASC NULLS FIRST], false, 0
                              +- Exchange hashpartitioning(col3#2L, 200)
                                 +- Window [lag(col3#2L, 1, null) windowspecdefinition(col1#0L, col2#1L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _w0#27L, lag(col3#2L, 1, null) windowspecdefinition(col1#0L, col2#1L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _w0#23L], [col1#0L], [col2#1L ASC NULLS FIRST]
                                    +- *(1) Sort [col1#0L ASC NULLS FIRST, col2#1L ASC NULLS FIRST], false, 0
                                       +- Exchange hashpartitioning(col1#0L, 200)
                                          +- Scan ExistingRDD[col1#0L,col2#1L,col3#2L,col4#3L]

Improvement

To get a better DAG, we slightly modify the code to store the column expression of step2 with withColumn and pass only a reference to this column. The new physical plan indeed requires only 3 shuffles!

w1 = Window.partitionBy("col1").orderBy("col2")
w2 = Window.partitionBy("col3").orderBy("col4")

# first step, arbitrary window func
step1 = F.lag("col3").over(w1)

# second step, arbitrary window func over 2nd window with step 1
step2 = F.lag(step1).over(w2)

# save temporary
df = df.withColumn("tmp_variable", step2)
step2 = F.col("tmp_variable")

# third step, arbitrary window func over 1st window with step 2
step3 = F.when(step2 > 1, F.max(step2).over(w1))

df_result = df.withColumn("result", step3).drop("tmp_variable")
df_result.explain()
== Physical Plan ==
*(5) Project [col1#0L, col2#1L, col3#2L, col4#3L, CASE WHEN (tmp_variable#33L > 1) THEN _we0#42L END AS result#41L]
+- Window [max(tmp_variable#33L) windowspecdefinition(col1#0L, col2#1L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS _we0#42L], [col1#0L], [col2#1L ASC NULLS FIRST]
   +- *(4) Sort [col1#0L ASC NULLS FIRST, col2#1L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(col1#0L, 200)
         +- *(3) Project [col1#0L, col2#1L, col3#2L, col4#3L, tmp_variable#33L]
            +- Window [lag(_w0#34L, 1, null) windowspecdefinition(col3#2L, col4#3L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS tmp_variable#33L], [col3#2L], [col4#3L ASC NULLS FIRST]
               +- *(2) Sort [col3#2L ASC NULLS FIRST, col4#3L ASC NULLS FIRST], false, 0
                  +- Exchange hashpartitioning(col3#2L, 200)
                     +- Window [lag(col3#2L, 1, null) windowspecdefinition(col1#0L, col2#1L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _w0#34L], [col1#0L], [col2#1L ASC NULLS FIRST]
                        +- *(1) Sort [col1#0L ASC NULLS FIRST, col2#1L ASC NULLS FIRST], false, 0
                           +- Exchange hashpartitioning(col1#0L, 200)
                              +- Scan ExistingRDD[col1#0L,col2#1L,col3#2L,col4#3L]
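
To double-check the shuffle counts without reading the plans by eye, the operators can be tallied from the plan text. The following is only a minimal sketch: `count_operators`, `CHAINED_PLAN` and `STORED_PLAN` are hypothetical names, and the two strings are operator-only skeletons of the plans printed above (all arguments elided), not fresh Spark output.

```python
import re
from collections import Counter

# Matches the operator name at the start of a plan line, skipping the tree
# prefix ("+- ") and the whole-stage-codegen marker (e.g. "*(3) ").
_OPERATOR = re.compile(r"^\s*(?:\+- )?(?:\*\(\d+\) )?(\w+)")


def count_operators(plan_text):
    """Count physical operators by name in text shaped like explain() output."""
    counts = Counter()
    for line in plan_text.splitlines():
        match = _OPERATOR.match(line)
        if match:  # header lines such as "== Physical Plan ==" do not match
            counts[match.group(1)] += 1
    return counts


# Skeleton of the plan for the fully chained expression (arguments elided).
CHAINED_PLAN = """\
== Physical Plan ==
*(7) Project
+- Window
   +- *(6) Sort
      +- Exchange
         +- *(5) Project
            +- Window
               +- *(4) Sort
                  +- Exchange
                     +- *(3) Project
                        +- Window
                           +- *(2) Sort
                              +- Exchange
                                 +- Window
                                    +- *(1) Sort
                                       +- Exchange
                                          +- Scan
"""

# Skeleton of the plan with step2 stored via withColumn (arguments elided).
STORED_PLAN = """\
== Physical Plan ==
*(5) Project
+- Window
   +- *(4) Sort
      +- Exchange
         +- *(3) Project
            +- Window
               +- *(2) Sort
                  +- Exchange
                     +- Window
                        +- *(1) Sort
                           +- Exchange
                              +- Scan
"""

chained = count_operators(CHAINED_PLAN)
stored = count_operators(STORED_PLAN)
print(chained["Exchange"], stored["Exchange"])  # 4 3
```

Since explain() prints the plan instead of returning it, the text from a live DataFrame would have to be captured first (for example by redirecting stdout) before it can be passed to such a helper.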

Relevance

My original example was even more complex and resulted in an even greater difference between the DAGs (up to 10 times slower on real-world data).

Question

Does anyone have an explanation for this odd behavior? I thought that stacking/chaining column expressions was best practice since it allows Spark to optimize intermediate steps most effectively (in contrast to creating references for intermediate results).

pansen
  • "I've thought that stacking/chaining column expressions is best practice since it allows Spark to optimize intermediate steps most effectively" - that's not true. `withColumn` is equivalent to a subquery - in some contexts (non-analytical queries) it makes no difference. However, analytical queries are a different thing. In many cases a subquery is mandatory. Should Spark be able to optimize this? Probably... Why are the results different? Because optimization of a local expression and optimization across logical nodes (here subqueries) are not the same. – 10465355 Feb 11 '20 at 12:12
  • @10465355saysReinstateMonica Thanks for your reply. Do you have a hint for further documentation/explanation on this topic? I found it hard to find proper and up-to-date resources on the web. – pansen Feb 11 '20 at 15:00
  • I don't think it is actually documented, excluding things like SQL standards (which Spark tries to follow). Your best bet is to check the source, and since it is an internal component, it can change from version to version. – 10465355 Feb 11 '20 at 15:04

1 Answer


If we look at the Analyzed Logical Plan (via df_result.explain(True)), we can see that, although tmp_variable has not been computed yet, the Analyzer analyzes that column as if it already existed. This follows from the **lazy evaluation** of datasets/DataFrames/tables while the Logical Plan is being created. Because of this assumption, it needs to build 2 fewer auxiliary windows than in the previous case to achieve the same result. Following the Parsed Logical Plan, we also see that the analyzer needs to build fewer unevaluated windows (windowspecdefinition) when creating tmp_variable: instead of building windows on its way down during push-down, it mostly performs simple projections (selects).

Ehsan