-1

I have a Spark query which reads a lot of parquet data from S3, filters it, and adds a column computed as regexp_extract(input_file_name, ...) which I assume is a relatively heavy operation (if applied before filtering rather than after it).

The whole query looks like this:

val df = spark
    .read
    .option("mergeSchema", "true")
    .parquet("s3://bucket/path/date=2020-01-1{5,6}/clientType=EXTENSION_CHROME/type={ACCEPT,IGNORE*}/")
    .where(...)
    .withColumn("type", regexp_extract(input_file_name, "type=([^/]+)", 1))
    .repartition(300)
    .cache()

df.count()

Is withColumn executed after where or before where? Does it depend on the order in which I write them? What if my where statement used a column added by withColumn?

Emiliano Martinez
  • 4,073
  • 2
  • 9
  • 19
ptkvsk
  • 2,096
  • 1
  • 25
  • 47

1 Answers1

4

The withColumn and filter execute in the order they are called. The plan explains it. Please read the plan bottom up.

val employees = spark.createDataFrame(Seq(("E1",100.0), ("E2",200.0),("E3",300.0))).toDF("employee","salary")

employees.withColumn("column1", when(col("salary") > 200, lit("rich")).otherwise("poor")).filter(col("column1")==="poor").explain(true)

Plan - project happened 1st then filter.

== Parsed Logical Plan ==
'Filter ('column1 = poor)
+- Project [employee#4, salary#5, CASE WHEN (salary#5 > cast(200 as double)) THEN rich ELSE poor END AS column1#8]
   +- Project [_1#0 AS employee#4, _2#1 AS salary#5]
      +- LocalRelation [_1#0, _2#1]

== Analyzed Logical Plan ==
employee: string, salary: double, column1: string
Filter (column1#8 = poor)
+- Project [employee#4, salary#5, CASE WHEN (salary#5 > cast(200 as double)) THEN rich ELSE poor END AS column1#8]
   +- Project [_1#0 AS employee#4, _2#1 AS salary#5]
      +- LocalRelation [_1#0, _2#1]

Code 1st filters then adds new column

employees.filter(col("employee")==="E1").withColumn("column1", when(col("salary") > 200, lit("rich")).otherwise("poor")).explain(true)

Plan - 1st filters then projects

== Parsed Logical Plan ==
'Project [employee#4, salary#5, CASE WHEN ('salary > 200) THEN rich ELSE poor END AS column1#13]
+- Filter (employee#4 = E1)
   +- Project [_1#0 AS employee#4, _2#1 AS salary#5]
      +- LocalRelation [_1#0, _2#1]

== Analyzed Logical Plan ==
employee: string, salary: double, column1: string
Project [employee#4, salary#5, CASE WHEN (salary#5 > cast(200 as double)) THEN rich ELSE poor END AS column1#13]
+- Filter (employee#4 = E1)
   +- Project [_1#0 AS employee#4, _2#1 AS salary#5]
      +- LocalRelation [_1#0, _2#1]

Another evidence - it gives error when filter is called on a column before adding it (obviously)

employees.filter(col("column1")==="poor").withColumn("column1", when(col("salary") > 200, lit("rich")).otherwise("poor")).show()
org.apache.spark.sql.AnalysisException: cannot resolve '`column1`' given input columns: [employee, salary];;
'Filter ('column1 = poor)
+- Project [_1#0 AS employee#4, _2#1 AS salary#5]
   +- LocalRelation [_1#0, _2#1]
Salim
  • 2,046
  • 12
  • 13