
I am unable to invoke a UDF that uses a window function internally.

from pyspark.sql.window import Window
from pyspark.sql import functions as F
mst = spark.createDataFrame([(1, "v1"), (2, "v1"), (3, "v1"), (21, "v2"), (22, "v2"), (31, "v3")], ["mst_id", "mst_val"])
ref = spark.createDataFrame([(91, "v1"), (92, "v2"), (93, "v3")], ["ref_id", "ref_val"])

I defined a simple function:


def fnc1(val):
    # Rank the rows within each mst_val by ascending mst_id
    w = Window.partitionBy("mst_val").orderBy(F.col("mst_id").asc())
    # Keep only the first-ranked row for the requested value
    mtch = (mst.withColumn("rank", F.row_number().over(w))
               .filter((F.col("rank") == 1) & (F.col("mst_val") == F.lit(val)))
               .collect())
    return mtch[0]["mst_id"] if len(mtch) else -1

fnc1("v3") 

This returns 31.

Then I wrapped it in a UDF:

from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType
udf1 = udf(lambda r: fnc1(r), IntegerType())

Invoking the UDF fails:

ref.withColumn("abc",udf1(col("ref_val")))

It raises this error:

py4j.Py4JException: Method __getnewargs__([]) does not exist

Can anyone help me? Thanks!

Laurens Koppenol
sms
Maybe have a look at this post: https://stackoverflow.com/questions/55688664/calling-another-custom-python-function-from-pyspark-udf – Liky May 09 '20 at 23:28
