If I try to use withColumn on the output of a previous withColumn, the job takes hours to complete:

df_spark.filter(some_criteria) \
    .withColumn('Field2', MyCustomUDF('Field1')) \
    .withColumn('Field3', MyCustomUDF2('Field2')) \
    .write.parquet('Parq.parquet')

However, if I do it in separate steps, it only takes minutes.

#Step 1: apply the first UDF and write an intermediate file
df_spark.filter(some_criteria) \
    .withColumn('Field2', MyCustomUDF('Field1')) \
    .write.parquet('TmpFile.parquet')

#Step 2: read the intermediate file back and apply the second UDF
df_spark2 = spark.read.parquet('TmpFile.parquet')
df_spark2.withColumn('Field3', MyCustomUDF2('Field2')) \
    .write.parquet('Parq.parquet')
1. Is this indeed expected behavior?
2. Can I do this more efficiently without writing out a temp file? For instance, can a UDF return two fields (see the sketch below)?
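
On question 2, one way to avoid the temp file is a single UDF that returns a struct holding both fields. A minimal sketch, where the schema and the transform_one/transform_two helpers are hypothetical stand-ins for whatever MyCustomUDF and MyCustomUDF2 actually compute:

from pyspark.sql.functions import udf, col
from pyspark.sql.types import StructType, StructField, StringType

# Hypothetical schema; use whatever types the real UDFs return
both_fields = StructType([
    StructField('Field2', StringType()),
    StructField('Field3', StringType()),
])

@udf(both_fields)
def my_combined_udf(field1):
    field2 = transform_one(field1)  # stand-in for MyCustomUDF's row logic
    field3 = transform_two(field2)  # stand-in for MyCustomUDF2's row logic
    return field2, field3

(df_spark.filter(some_criteria)
    .withColumn('both', my_combined_udf('Field1'))
    .select('*', col('both.Field2'), col('both.Field3'))
    .drop('both')
    .write.parquet('Parq.parquet'))

This crosses the Python/JVM boundary once per row instead of twice, which is often the dominant cost with Python UDFs.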
It may be useful to include the execution plan for both methods. See [this answer](https://stackoverflow.com/a/48428198/5858851) for more details. – pault Feb 01 '18 at 20:57
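
For reference, the plans the comment asks about can be printed with explain(); a quick sketch against the slow pipeline above (df_slow is just an illustrative name):

df_slow = (df_spark.filter(some_criteria)
    .withColumn('Field2', MyCustomUDF('Field1'))
    .withColumn('Field3', MyCustomUDF2('Field2')))
df_slow.explain(True)  # True requests the extended logical + physical plans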
It is not expected in general, for sure; however, there are many possible explanations. One more transformation can push the execution plan just enough to make it significantly more expensive (the cost is non-linear). Or `MyCustomUDF` puts a lot of stress on the Python worker interpreter, and it is cheaper to let it die than to wait for some resource cleanup. But diagnosing an exact cause is not possible with just this information, I am afraid. And a general remark: if you use a lot of Python UDFs, it is a good hint that you're using the wrong API. – zero323 Feb 01 '18 at 23:08
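
On that last remark: when the per-row logic cannot be replaced with built-in pyspark.sql.functions, a vectorized pandas_udf (available from Spark 2.3, requires PyArrow) is usually far cheaper than a plain Python UDF. A sketch, with a hypothetical stand-in for the real transformation:

from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType

# Receives a whole pandas Series per batch rather than one value per call,
# avoiding most of the per-row serialization overhead
@pandas_udf(StringType())
def my_custom_udf_vectorized(field1):
    return field1.str.upper()  # hypothetical stand-in for MyCustomUDF's logic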

0 Answers