I have a working lambda function in Python that computes the highest similarity between each string in dataset1 and the strings in dataset2. During each iteration, it writes the string, the best match, and the similarity, together with some other information, to BigQuery. There is no return value, as the purpose of the function is to insert a row into a BigQuery dataset. This process takes rather long, which is why I wanted to use PySpark and Dataproc to speed it up.

Converting the pandas DataFrames to Spark was easy. I am having trouble registering my udf, because it has no return value and PySpark requires one. In addition, I don't understand how to map pandas' 'apply' function to the PySpark equivalent. So basically my question is how to transform the Python code below to work on a Spark DataFrame.

The following code works in a regular Python environment:

def embargomatch(name, code, embargo_names):
    # find best match
    # insert best match and additional information into BigQuery
    ...

customer_names.apply(lambda x: embargomatch(x['name'], x['customer_code'], embargo_names), axis=1)
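
For concreteness, here is a fleshed-out sketch of embargomatch (simplified; the difflib-based similarity metric and the table reference are placeholders, not my exact code):

import difflib
from google.cloud import bigquery

bq_client = bigquery.Client()

def embargomatch(name, code, embargo_names):
    # Score every candidate name and keep the highest-scoring one
    similarity, best = max(
        (difflib.SequenceMatcher(None, name, cand).ratio(), cand)
        for cand in embargo_names
    )
    # Insert a single row; there is nothing to return
    bq_client.insert_rows_json('project.dataset.embargo_matches', [{
        'name': name,
        'customer_code': code,
        'best_match': best,
        'similarity': similarity,
    }])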

Because PySpark requires a return type, I added 'return 1' to the udf and tried the following:

customer_names = spark.createDataFrame(customer_names)

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

embargo_match_udf = udf(lambda x: embargomatch(x['name'], x['customer_code'], embargo_names), IntegerType())

Now I'm stuck trying to apply the select function, as I don't know what parameters to pass.

Charles Van Damme

1 Answer

I suspect you're stuck on how to pass multiple columns to the udf -- here's a good answer to that question: Pyspark: Pass multiple columns in UDF.

Rather than creating a udf based on a lambda that wraps your function, consider simplifying by creating a udf based on embargomatch directly.

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

embargo_names = ...

# The parameters here are the columns passed into the udf
def embargomatch(name, customer_code):
    pass

embargo_match_udf = udf(embargomatch, IntegerType())

# Pass the columns directly -- one argument per parameter -- rather than
# wrapping them in an array, since embargomatch takes two parameters
customer_names.select(embargo_match_udf('name', 'customer_code').alias('column_name'))
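
Keep in mind that select is a lazy transformation -- if you rely on the udf for its side effects, nothing actually runs until an action forces evaluation:

# Without an action like collect() (or a write), the udf never executes
customer_names.select(
    embargo_match_udf('name', 'customer_code').alias('column_name')
).collect()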

That being said, it's suspect that your udf doesn't return anything -- I generally see udfs as a way to add columns to the dataframe, but not to have side effects. If you want to insert records into bigquery, consider doing something like this:

import os

customer_names.select('column_name').write.parquet('gs://some/path')
# Spark writes a directory of part files, so load them with a wildcard
os.system("bq load --source_format=PARQUET [DATASET].[TABLE] 'gs://some/path/*.parquet'")
Karthik Palaniappan
  • I want to add columns to my dataframe. The problem is that I want to add 3 columns (1 integer and 2 strings). I can't do it in separate lambda functions because the function iterates over a few thousand records to find a best match. I need both the name of the best match AND the similarity score to be added. Is that possible? – Charles Van Damme Jul 22 '19 at 08:58
  • Yup, you can: https://stackoverflow.com/questions/48979440/how-to-use-udf-to-return-multiple-columns. tl;dr return a complex type, then use another select statement to break it up into separate columns. – Karthik Palaniappan Jul 22 '19 at 17:24
  • Thank you! Using pyspark's MapType solved it for me! – Charles Van Damme Jul 23 '19 at 07:32
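
For reference, a minimal sketch of the complex-type approach described in the comments above: return a struct from the udf, then split it into separate columns with a second select. The schema, field names, and placeholder result here are illustrative, and a MapType return works along the same lines:

from pyspark.sql.functions import udf, col
from pyspark.sql.types import StructType, StructField, StringType, FloatType

# Declare the udf's return type as a struct holding all result fields
match_schema = StructType([
    StructField('best_match', StringType()),
    StructField('similarity', FloatType()),
])

def embargomatch(name, customer_code):
    # ... find best match against embargo_names ...
    return ('some match', 0.9)  # placeholder result

embargo_match_udf = udf(embargomatch, match_schema)

# Add the struct column first, then break it into separate columns
result = customer_names.withColumn('match', embargo_match_udf('name', 'customer_code'))
result = result.select('name', 'customer_code',
                       col('match.best_match'), col('match.similarity'))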