
I have a dataframe that I am working with in a Python-based Jupyter notebook. I want to add an additional column based on the content of an existing column, where the content of the new column is derived from running an external API call on the original column.

The solution I attempted was to use a Python-based UDF. The first cell contains something like this:

from pyspark.sql.functions import udf

def analysis(old_column):
    # run the external API call on the text of the original column
    new_column = myapi.analyze(text=old_column)
    return new_column

analysis_udf = udf(analysis)

and the second cell this:

df2 = df1.withColumn("col2", analysis_udf("col1"))
df2.select('col2').show(n=5)

My dataframe is relatively large, with some 70000 rows, and col1 can have anywhere from 100 to 10,000+ characters of text. When I ran the code above in cell 2, it actually seemed to run fairly quickly (minutes) and dumped out the 5 rows of the df2 dataframe. So I thought I was in business. However, my next cell had the following code:

from pyspark.sql.functions import col

df2.cache()
df2.filter(col('col2').isNull()).count()

The intent of this code is to cache the contents of the new dataframe to improve access time to the DF, and then count how many of the entries in the dataframe have null values generated by the UDF. This surprisingly (to me) took many hours to run, and eventually provided an output of 6. It's not clear to me why the second cell ran quickly and the third was slow. I would have thought that the df2.select('col2').show(n=5) call would have caused the UDF to run on all of the rows, that it would have been the slow one, and that subsequent calls to access the new column of the dataframe would be quick. But that wasn't the case, so I supposed that the cache call was the one that was actually causing the UDF to run on all of the rows, and that any subsequent calls should now be quick. So I added another cell with:

df2.show(n=5)

I assumed it would run quickly, but again it took much longer than I expected, and it seems like perhaps the UDF was running again(?).

My questions are:

  1. Which Spark API calls actually cause the UDF to run (or re-run), and how can I structure the calls so that the UDF runs only once and the new column is created with the text output by the UDF's Python function?
  2. I have read that Python UDFs should be avoided because they are slow (which seems correct), so what alternatives do I have when I need to use an API call to generate the new column?
    Somewhat related: [Pyspark: Best practice to add more columns to a DataFrame](https://stackoverflow.com/questions/49651049/pyspark-best-practice-to-add-more-columns-to-a-dataframe). – pault Apr 05 '18 at 15:11

1 Answer


I would have thought that the df2.select('col2').show(n=5) call would have caused the UDF to run on all of the rows

That is not a correct assumption. Spark evaluates as little data as possible, given the limitations of the API. Because you use a Python udf, it evaluates only the minimum number of partitions required to collect 5 rows.
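
As a purely illustrative sketch (not from the original post), an accumulator can make this visible; the counting UDF, the df_check name, and the spark session variable are assumptions here, and accumulator counts in transformations are only approximate:

from pyspark.sql.functions import udf

calls = spark.sparkContext.accumulator(0)   # counts how many rows the UDF touched

def count_calls(old_column):
    calls.add(1)
    return old_column        # stand-in for the real API call

counting_udf = udf(count_calls)

df_check = df1.withColumn("col2_check", counting_udf("col1"))
df_check.select("col2_check").show(n=5)
print(calls.value)           # usually far fewer than the ~70000 rows

df_check.filter(df_check["col2_check"].isNull()).count()
print(calls.value)           # now roughly the full row count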

Which Spark API calls actually cause the UDF to run (or re-run), and how can I structure the calls so that the UDF runs only once and the new column is created with the text output by the UDF's Python function?

  • Any evaluation, if the data is no longer cached (evicted from memory).
  • Possibly any usage of the resulting column, unless the udf is marked as non-deterministic (see the sketch after this list).
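
A minimal sketch of how the cells from the question could be restructured so that the UDF runs over the full dataset exactly once and the result stays cached; this assumes Spark 2.3+ for asNondeterministic() and reuses the analysis function and column names from the question:

from pyspark.sql.functions import udf

# tell the optimizer it must not re-evaluate or duplicate the call
analysis_udf = udf(analysis).asNondeterministic()

df2 = df1.withColumn("col2", analysis_udf("col1"))
df2.cache()    # lazy: only marks df2 for caching (MEMORY_AND_DISK by default)
df2.count()    # full action: runs the UDF once on every row and populates the cache

# as long as nothing is evicted, these reuse the cached col2 instead of re-running the UDF
df2.filter(df2["col2"].isNull()).count()
df2.show(n=5)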

I have read that Python UDFs should be avoided because they are slow (which seems correct), so what alternatives do I have when I need to use an API call to generate the new column?

Unless you want to switch to Scala or the RDD API, the only alternative is pandas_udf, which is somewhat more efficient but supports only a limited subset of types.
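
For illustration, a minimal sketch of the pandas_udf variant, assuming Spark 2.3+ with PyArrow available on the executors; analysis_pudf is a hypothetical name and myapi.analyze is the external call from the question:

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StringType

@pandas_udf(StringType(), PandasUDFType.SCALAR)
def analysis_pudf(texts):
    # Each invocation receives a pandas Series covering a batch of rows,
    # which cuts per-row serialization overhead; the external API is
    # still called once per row here.
    return texts.apply(lambda t: myapi.analyze(text=t))

df2 = df1.withColumn("col2", analysis_pudf("col1"))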

  • Thanks. When you mentioned "Any evaluation, if data is no longer cached", can you clarify? This is why I explicitly included the df2.cache() call, but it didn't seem to prevent a subsequent access of the column from re-executing the UDF. – Chris Ratcliffe Apr 05 '18 at 15:31
  • https://stackoverflow.com/q/42660385/8371915, but in your case it is just never fully evaluated. – Alper t. Turker Apr 05 '18 at 15:33
  • I see. So when I ran the cache call, only 5 of the rows had actually been evaluated at that point, and so that was all that was cached. If instead I run `df2.filter(col('col2').isNull()).count()` I would assume that would force the evaluation of that column for every row, and if I followed that up with `df2.cache()`, would that result in subsequent use of the df2 column to skip the UDF call? – Chris Ratcliffe Apr 05 '18 at 15:45
  • No. Please read the first paragraph again. It will evaluate as many __partitions__ as required to collect 5 rows, so the exact number depends on the data distribution. – Alper t. Turker Apr 05 '18 at 15:56
  • My apologies, I should have read that more carefully. But the key is that only a subset of the data (on the partitions containing those 5 rows) would have been evaluated, causing the UDF to execute, while the data on the remaining partitions would still have been pending, so attempts to access those would have triggered the UDF. But the null check I mentioned previously would result in the access of all of the rows (and therefore all partitions), so at that point, using the cache should ensure the UDF would not need to be called again for subsequent access attempts on that column's data. Yes? – Chris Ratcliffe Apr 05 '18 at 16:41
  • _cache should ensure_ is too strong an assumption. The data might still not be fully cached (although the default cache here is MEMORY_AND_DISK), or access might require disk reads and not be faster at all. In general, check the execution plan and the storage info to see what is going on in practice. – Alper t. Turker Apr 05 '18 at 17:06
  • Sorry, to clarify: speed is helpful, but the key is that I want to avoid the UDF re-executing, because each execution has a monetary cost associated with the API call. So as long as I have a way to force the API-based UDF to run once for each row, such that subsequent attempts to access that newly generated column won't result in the UDF running again, I am fine with the subsequent accesses taking a bit of time. Thanks for your patience and detailed responses. – Chris Ratcliffe Apr 05 '18 at 17:18
  • `cache` and a `UserDefinedFunction` marked as `asNondeterministic()` should be enough for the majority of applications, but if you want to be sure, put an independent cache between the external API and Spark (an IMDG, memcached, or a caching proxy). – Alper t. Turker Apr 05 '18 at 17:43
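
For illustration only (this is not from the thread): a rough, Spark-only approximation of the independent-cache idea from the last comment, which merely memoizes duplicate inputs within a partition; a shared cache such as memcached or a caching proxy is still needed for guarantees across executors and re-runs:

from pyspark.sql.functions import udf

# Hypothetical sketch: memoize API results in a dict that ships with the
# UDF closure. The memo is per task/partition, not shared across executors,
# so it only avoids repeat calls for duplicate texts within a partition.
_memo = {}

def analysis_memoized(text):
    if text is None:
        return None
    if text not in _memo:
        _memo[text] = myapi.analyze(text=text)
    return _memo[text]

analysis_udf = udf(analysis_memoized).asNondeterministic()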