
Context

In a recent SO post, I discovered that using withColumn may improve the DAG when dealing with stacked/chained column expressions in conjunction with distinct window specifications. In this example, however, withColumn actually makes the DAG worse, and the result differs from that of using select instead.

Reproducible example

First, some test data (PySpark 2.4.4 standalone):

import pandas as pd
import numpy as np

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

dfp = pd.DataFrame(
    {
        "col1": np.random.randint(0, 5, size=100),
        "col2": np.random.randint(0, 5, size=100),
        "col3": np.random.randint(0, 5, size=100),
        "col4": np.random.randint(0, 5, size=100),
        "col5": np.random.randint(0, 5, size=100),
    }
)

df = spark.createDataFrame(dfp)
df.show(5)

+----+----+----+----+----+
|col1|col2|col3|col4|col5|
+----+----+----+----+----+
|   0|   3|   2|   2|   2|
|   1|   3|   3|   2|   4|
|   0|   0|   3|   3|   2|
|   3|   0|   1|   4|   4|
|   4|   0|   3|   3|   3|
+----+----+----+----+----+
only showing top 5 rows

The example is simple. It contains 2 window specifications and 4 independent column expressions based on them:

w1 = Window.partitionBy("col1").orderBy("col2")
w2 = Window.partitionBy("col3").orderBy("col4")

col_w1_1 = F.max("col5").over(w1).alias("col_w1_1")
col_w1_2 = F.sum("col5").over(w1).alias("col_w1_2")
col_w2_1 = F.max("col5").over(w2).alias("col_w2_1")
col_w2_2 = F.sum("col5").over(w2).alias("col_w2_2")

expr = [col_w1_1, col_w1_2, col_w2_1, col_w2_2]

withColumn - 4 shuffles

If withColumn is used with alternating window specs, the DAG creates unnecessary shuffles:

df.withColumn("col_w1_1", col_w1_1)\
  .withColumn("col_w2_1", col_w2_1)\
  .withColumn("col_w1_2", col_w1_2)\
  .withColumn("col_w2_2", col_w2_2)\
  .explain()

== Physical Plan ==
Window [sum(col5#92L) windowspecdefinition(col3#90L, col4#91L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w2_2#147L], [col3#90L], [col4#91L ASC NULLS FIRST]
+- *(4) Sort [col3#90L ASC NULLS FIRST, col4#91L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(col3#90L, 200)
      +- Window [sum(col5#92L) windowspecdefinition(col1#88L, col2#89L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w1_2#143L], [col1#88L], [col2#89L ASC NULLS FIRST]
         +- *(3) Sort [col1#88L ASC NULLS FIRST, col2#89L ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(col1#88L, 200)
               +- Window [max(col5#92L) windowspecdefinition(col3#90L, col4#91L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w2_1#145L], [col3#90L], [col4#91L ASC NULLS FIRST]
                  +- *(2) Sort [col3#90L ASC NULLS FIRST, col4#91L ASC NULLS FIRST], false, 0
                     +- Exchange hashpartitioning(col3#90L, 200)
                        +- Window [max(col5#92L) windowspecdefinition(col1#88L, col2#89L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w1_1#141L], [col1#88L], [col2#89L ASC NULLS FIRST]
                           +- *(1) Sort [col1#88L ASC NULLS FIRST, col2#89L ASC NULLS FIRST], false, 0
                              +- Exchange hashpartitioning(col1#88L, 200)
                                 +- Scan ExistingRDD[col1#88L,col2#89L,col3#90L,col4#91L,col5#92L]
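To compare plans without counting by eye, the shuffles can be tallied from the plan text. The following is only a minimal sketch: `count_exchanges` is a hypothetical helper (not part of Spark), it assumes the Spark 2.4 text layout shown above, and the sample string is an abbreviated copy of the plan above with the window definitions elided as `[...]`. The plan text can be pasted from the `explain()` output; obtaining it programmatically through the internal `df._jdf.queryExecution().executedPlan().toString()` handle should also work, but that is not public API and is an assumption here.

```python
import re


def count_exchanges(plan_text: str) -> int:
    """Count Exchange operators (i.e. shuffles) in a physical plan printed by explain()."""
    # An Exchange node starts a plan line, optionally preceded by the
    # tree-drawing characters used by explain() (spaces, ':', '+', '-', '|').
    return len(re.findall(r"^[ \t:+\-|]*Exchange\b", plan_text, flags=re.MULTILINE))


# Abbreviated copy of the withColumn plan above (window definitions elided).
with_column_plan = """\
Window [...], [col3#90L], [col4#91L ASC NULLS FIRST]
+- *(4) Sort [col3#90L ASC NULLS FIRST, col4#91L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(col3#90L, 200)
      +- Window [...], [col1#88L], [col2#89L ASC NULLS FIRST]
         +- *(3) Sort [col1#88L ASC NULLS FIRST, col2#89L ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(col1#88L, 200)
               +- Window [...], [col3#90L], [col4#91L ASC NULLS FIRST]
                  +- *(2) Sort [col3#90L ASC NULLS FIRST, col4#91L ASC NULLS FIRST], false, 0
                     +- Exchange hashpartitioning(col3#90L, 200)
                        +- Window [...], [col1#88L], [col2#89L ASC NULLS FIRST]
                           +- *(1) Sort [col1#88L ASC NULLS FIRST, col2#89L ASC NULLS FIRST], false, 0
                              +- Exchange hashpartitioning(col1#88L, 200)
                                 +- Scan ExistingRDD[...]
"""

print(count_exchanges(with_column_plan))  # → 4
```

The pattern only matches operators named exactly `Exchange` at the start of a plan line, so other plan formats (for example newer Spark versions) may need a different pattern.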

select - 2 shuffles

If all columns are passed with select, the DAG contains only the two shuffles that are actually needed, one per window specification:

df.select("*", *expr).explain()

== Physical Plan ==
Window [max(col5#92L) windowspecdefinition(col3#90L, col4#91L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w2_1#119L, sum(col5#92L) windowspecdefinition(col3#90L, col4#91L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w2_2#121L], [col3#90L], [col4#91L ASC NULLS FIRST]
+- *(2) Sort [col3#90L ASC NULLS FIRST, col4#91L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(col3#90L, 200)
      +- Window [max(col5#92L) windowspecdefinition(col1#88L, col2#89L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w1_1#115L, sum(col5#92L) windowspecdefinition(col1#88L, col2#89L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w1_2#117L], [col1#88L], [col2#89L ASC NULLS FIRST]
         +- *(1) Sort [col1#88L ASC NULLS FIRST, col2#89L ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(col1#88L, 200)
               +- Scan ExistingRDD[col1#88L,col2#89L,col3#90L,col4#91L,col5#92L]

Question

There is some existing information about why one should avoid withColumn. However, it is mainly concerned with calling withColumn many times and does not address the issue of deviating DAGs (see here and here). Does anyone have an idea why the DAG differs between withColumn and select? Spark's optimizer should apply in either case, and the result should not depend on how the exact same computation is expressed.

Thanks in advance.

pansen

3 Answers


This looks like a consequence of the internal projection caused by withColumn. It's documented here in the Spark docs.

The official recommendation is to do as Jay recommended and use a single select instead when dealing with multiple columns.

Victor3y

when using nested withColumns and window functions?

Let's say I want to do:

w1 = ...rangeBetween(-300, 0)
w2 = ...rowsBetween(-1,0)

(df.withColumn("some1", f.max("original1").over(w1))
   .withColumn("some2", f.lag("some1").over(w2))).show()

I got a lot of memory problems and high spill even with very small datasets. If I do the same using select instead of withColumn it performs way faster.

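df.select(
    f.max("original1").over(w1).alias("some1"),
    # Caveat: "some1" is resolved against df's existing columns here,
    # not against the alias defined on the previous line.
    f.lag("some1").over(w2)
).show()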
abiratsis
Jay Kakadiya
  • Thanks for your answer! I would be interested in a minimal reproducible example that makes it possible to observe the memory problems and high spill you described. – pansen Jan 22 '20 at 07:37

To expand on @Victor3y's answer:

If you're familiar with SQL and not the inner workings of Spark, the meaning of the documentation on withColumn might not be totally obvious:

this method introduces a projection internally. Therefore, calling it multiple times, for instance, via loops in order to add multiple columns can generate big plans which can cause performance issues...

One way to visualize this difference is in terms of SQL: think of withColumn like wrapping the original DF query in a subquery.

-- df.withColumn("foo", foo).withColumn("bar", bar)
-- corresponds roughly to:

WITH df AS (...query for df before .withColumn...),
df_with_foo AS (SELECT df.*, foo FROM df),
df_with_bar AS (SELECT df_with_foo.*, bar FROM df_with_foo)
SELECT df_with_bar.* FROM df_with_bar

So, it makes sense to use withColumn in situations where you would have used SQL subqueries with WITH clauses, but if you just want to add N independent columns, you're better off with select.
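For the "N independent columns" case, the select form can be wrapped in a small helper. This is only a sketch: `add_columns` is a hypothetical name, not a Spark API, and it assumes that `df` is a DataFrame and that `new_cols` maps each new column name to a Column expression that does not refer to any of the other new columns.

```python
def add_columns(df, new_cols):
    """Append every expression in `new_cols` (name -> Column) with a single select."""
    # Alias each expression with its target name, then project all of them at
    # once, so only one projection is added instead of one per withColumn call.
    aliased = [expr.alias(name) for name, expr in new_cols.items()]
    return df.select("*", *aliased)


# Hypothetical usage with the window expressions from the question:
#
#   df2 = add_columns(df, {
#       "col_w1_1": F.max("col5").over(w1),
#       "col_w1_2": F.sum("col5").over(w1),
#       "col_w2_1": F.max("col5").over(w2),
#       "col_w2_2": F.sum("col5").over(w2),
#   })
#   df2.explain()
```

Unlike withColumn, this does not replace an existing column of the same name: the select would simply return two columns with that name, so the new names should be unique.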

wrschneider