I have a dataframe that I am working with in a Python-based Jupyter notebook. I want to add a column whose content is derived by running an external API call on the content of an existing column.
The solution I attempted was to use a Python UDF. The first cell contains something like this:
from pyspark.sql.functions import udf

def analysis(old_column):
    # Call the external API on the text of a single row
    new_column = myapi.analyze(text=old_column)
    return new_column

analysis_udf = udf(analysis)
and the second cell this:
df2 = df1.withColumn("col2", analysis_udf("col1"))
df2.select("col2").show(n=5)
My dataframe is relatively large, with some 70,000 rows, and col1 can hold anywhere from 100 to 10,000+ characters of text. When I ran the code in cell 2, it seemed to run fairly quickly (minutes) and printed the 5 rows of the df2 dataframe, so I thought I was in business. However, my next cell had the following code:
from pyspark.sql.functions import col

df2.cache()
df2.filter(col("col2").isNull()).count()
The intent of this code is to cache the contents of the new dataframe to improve access time, and then count how many entries have null values generated by the UDF. To my surprise, this took many hours to run and eventually returned 6. It's not clear to me why the second cell ran quickly and the third was slow. I would have thought that the df2.select('col2').show(n=5) call would cause the UDF to run on all of the rows, so that call would be the slow one and subsequent accesses to the new column would be quick. That wasn't the case, so I then supposed that the cache() call was the one actually causing the UDF to run on all of the rows, and that any subsequent calls should now be quick. So I added another cell with:
df2.show(n=5)
I assumed this would run quickly, but again it took much longer than I expected, and it seems that the UDF may have been running again.
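My working guess is that this comes from lazy evaluation, where work only happens when results are actually requested. To illustrate what I think I am seeing, here is a plain-Python analogy (not Spark code; `fake_analysis`, `rows`, and the counters are made up for illustration, and `map`/`islice` only stand in for the Spark calls). Taking 5 results invokes the function only 5 times, while a full pass invokes it on every row:

```python
from itertools import islice

calls = 0  # counts how many times the stand-in "UDF" is invoked

def fake_analysis(text):
    """Hypothetical stand-in for the UDF body; not the real myapi call."""
    global calls
    calls += 1
    return text.upper()

rows = [f"row {i}" for i in range(70000)]

# Loosely like withColumn: this only describes the work; nothing runs yet.
lazy_col2 = map(fake_analysis, rows)
calls_after_define = calls

# Loosely like show(n=5): only as many rows as needed are pulled through.
first_five = list(islice(lazy_col2, 5))
calls_after_show = calls

# Loosely like a full count with no stored results: a fresh pass over
# every row, so the function is invoked once per row again.
null_count = sum(1 for value in map(fake_analysis, rows) if value is None)
calls_after_count = calls
```

If Spark behaves similarly, that would explain why cell 2 was fast and the count was slow, but it does not explain why the later show(n=5) was slow after the cache.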
My questions are:
- Which Spark API calls actually cause the UDF to run (or re-run), and how should I structure the calls so that the UDF runs only once and the new column is created from the text output by the UDF's Python function?
- I have read that Python UDFs should be avoided because they are slow (which seems correct), so what alternatives do I have when I need to use an API call to generate the new column?
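To make the second question more concrete, here is a rough sketch of one restructuring I have been considering: a function that processes a whole partition of rows at a time instead of being called as a per-row UDF. This is only a sketch under assumptions: `analyze_partition` and `fake_analyze` are hypothetical names, `fake_analyze` stands in for `myapi.analyze`, and turning a failed call into a null is my own choice rather than anything the API requires.

```python
def analyze_partition(rows, analyze):
    """Yield (col1, col2) pairs for one partition of rows.

    `analyze` is whatever callable wraps the API (hypothetical here).
    Rows only need to support row["col1"], so plain dicts work for testing.
    """
    for row in rows:
        text = row["col1"]
        try:
            result = analyze(text)
        except Exception:
            result = None  # assumption: a failed call becomes a null in col2
        yield (text, result)

def fake_analyze(text):
    """Hypothetical stand-in for myapi.analyze(text=...)."""
    if not text:
        raise ValueError("empty text")
    return len(text)

# Plain-Python check with dicts in place of Spark rows.
sample = [{"col1": "hello"}, {"col1": ""}, {"col1": "spark"}]
results = list(analyze_partition(sample, fake_analyze))
```

On the real data I assume this would be wired up with something like df1.rdd.mapPartitions(...) followed by toDF(["col1", "col2"]), but I have not confirmed that this is any faster or that it avoids the re-execution described above.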