
I would like to run a UDF on a pandas-on-Spark DataFrame. I thought it would be easy, but I am having a tough time figuring it out.

For example, consider my psdf (pandas-on-Spark DataFrame):

     name     p1     p2
0     AAA    1.0    1.0
1     BBB    1.0    1.0

I have a simple function,

import math

def f(a: float, b: float) -> float:
    return math.sqrt(a**2 + b**2)

I expect the following psdf:

     name     p1     p2  R
0     AAA    1.0    1.0  1.4
1     BBB    1.0    1.0  1.4

The actual functions vary, and I have shown only a sample here. For example, there is another function that takes 3 arguments.

I tried the code below, but I get an error saying that the compute.ops_on_diff_frames option is not set, and the documentation says that enabling it is expensive. Hence, I want to avoid it.

psdf["R"] = psdf[["p1","p2"]].apply(lambda x: f(*x), axis=1)

Note: I saw that one can convert to a regular Spark DataFrame and use withColumn, but I am not sure whether that carries a performance penalty.

Any suggestions?

Selva

2 Answers

You can convert to a Spark DataFrame and apply a pandas_udf. Converting to and from Koalas (pandas-on-Spark) has minor overhead compared with the cost of applying a Python UDF. A pandas_udf is also more efficient than a row-based UDF, because it processes the data in batches rather than one row at a time.
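
A minimal sketch of that route, assuming the column names `p1` and `p2` from the question. The names `hypot_series` and `add_r_column` are made up for illustration, and the Spark part assumes Spark 3.2+ with pyarrow installed:

```python
import numpy as np
import pandas as pd


def hypot_series(a: pd.Series, b: pd.Series) -> pd.Series:
    """Vectorised f(a, b) = sqrt(a**2 + b**2), applied to whole column batches."""
    return np.sqrt(a**2 + b**2)


def add_r_column(psdf):
    """Return a pandas-on-Spark frame with R = hypot_series(p1, p2) appended.

    Hypothetical helper: assumes psdf has numeric columns p1 and p2.
    """
    # Imported here so hypot_series can be tried with plain pandas alone.
    from pyspark.sql.functions import pandas_udf

    # The Series type hints mark this as a Series-to-Series pandas UDF.
    r_udf = pandas_udf(hypot_series, returnType="double")

    sdf = psdf.to_spark()  # the index is dropped unless index_col is passed
    sdf = sdf.withColumn("R", r_udf(sdf["p1"], sdf["p2"]))
    return sdf.pandas_api()  # back to pandas-on-Spark


# The vectorised function can be checked without a Spark session:
print(hypot_series(pd.Series([1.0, 3.0]), pd.Series([1.0, 4.0])))
```

Calling `add_r_column(psdf)` needs an active Spark session. The 3-argument function mentioned in the question would follow the same pattern with a third Series parameter.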

Paul Bendevis

As a developer, I have learned to go for the simplest available way to solve a problem; otherwise you create problems for yourself now and in the future. Pandas UDFs are useful for getting at pandas functionality that is missing in PySpark. In this case, everything you need is available in PySpark. Remember that apply with a lambda is an anti-pattern in pandas. I suggest you use higher-order functions. Logic and code below:

from pyspark.sql.functions import array, expr

new = (df.withColumn('x', array(*[c for c in df.columns if c != 'name']))  # df: Spark DataFrame; array of every column except name
       .withColumn('y', expr("sqrt(aggregate(x, cast(0 as double), (acc, v) -> acc + v*v))"))  # fold: sum the squares, then take the square root
      )
new.show()
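
The intended computation is the square root of the sum of squares over any number of columns. It can be checked in plain Python before running it on Spark. This is only a sketch of the fold logic, and `root_sum_squares` is a made-up name:

```python
import math
from functools import reduce


def root_sum_squares(values):
    """sqrt(v1**2 + v2**2 + ...) for any number of inputs."""
    # acc starts at 0.0 and grows by v*v at each step;
    # the accumulator itself is never squared.
    return math.sqrt(reduce(lambda acc, v: acc + v * v, values, 0.0))


print(root_sum_squares([1.0, 1.0]))       # about 1.414, as in the expected R column
print(root_sum_squares([3.0, 4.0]))       # 5.0
print(root_sum_squares([2.0, 3.0, 6.0]))  # 7.0, the 3-argument case
```

The fold has to add each squared value to the accumulator. Squaring the accumulator as well would give wrong results for most inputs, even though it happens to agree for the 1.0, 1.0 sample rows.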
wwnde