I'm having an issue with the version of Spark I'm using.
I'm trying to read a Parquet file, apply a filter, and then run a Python UDF:
from pyspark.sql.functions import col

df = spark.read.parquet(...).where(
    col('col_name_a').isNotNull()
).withColumn(
    'col_b_transform', costly_python_udf(col('col_name_b'))
)
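Here costly_python_udf is a regular Python UDF; for completeness, a stand-in definition (the lambda body is just a placeholder for the real, expensive per-row function) would look something like:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Placeholder body; the real UDF does an expensive per-row computation.
costly_python_udf = udf(lambda v: v.upper() if v is not None else None, StringType())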
With just those two steps, the code runs fast (in the sense that it only acts on the partial subset of rows where col_name_a is not null), as I expected. However, if I take that dataframe and add more operations to it, processing slows down. It almost feels like the Python UDF and the null-check filter got swapped.
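For illustration, the extra operations are along these lines (this aggregation is just a placeholder, not my exact code):

# Any further transformation/action on top of df shows the same slowdown.
result = df.groupBy('col_b_transform').count()
result.show()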
I looked at the SQL section in the ApplicationMaster UI and I can see in the 'Optimized Logical Plan' that the Filter step and the BatchEvalPython step got swapped.
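The same plans can also be dumped from code (assuming df is the dataframe built above):

# Prints the parsed/analyzed/optimized logical plans plus the physical plan.
df.explain(True)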
In the same project, this swap bit me in another place: I was filtering out null strings before parsing JSON inside a Python UDF, and the JSON parsing kept failing with a null error because, once again, the filter and the Python UDF were reordered.
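That case looked roughly like this (simplified, with placeholder names; df_raw stands in for the source dataframe):

import json
from pyspark.sql.functions import col, udf
from pyspark.sql.types import MapType, StringType

# json.loads(None) raises TypeError, which is what blows up once the
# optimizer moves this UDF ahead of the isNotNull() filter.
parse_json = udf(lambda s: json.loads(s), MapType(StringType(), StringType()))

parsed = df_raw.where(col('json_str').isNotNull()) \
               .withColumn('parsed', parse_json(col('json_str')))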
I'm currently managing this issue by inserting a cache() step after the filter and before the Python UDF, which seems to force Filter and BatchEvalPython to run in the right sequence, roughly like this:
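df = spark.read.parquet(...).where(
    col('col_name_a').isNotNull()
).cache()  # seems to act as a barrier: Filter now runs before BatchEvalPython

df = df.withColumn('col_b_transform', costly_python_udf(col('col_name_b')))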
A few questions:
- Why does the Filter get pushed below the BatchEvalPython? I can't think of any scenario where that would be an optimization; if anything, it should be the other way around.
- Is this a known issue? Is it fixed in later versions?
- Is there a better command than cache() to force PySpark not to optimize across the point where the cache() call sits?
spark.version = u'2.1.0'