
I am having trouble creating a Pandas UDF that performs a calculation on a pandas Series based on a value in the same row of the underlying Spark DataFrame.

However, the most straightforward solution doesn't seem to be supported by the Pandas UDF API:

A very simple example like below

from pyspark.sql.types import IntegerType

import pyspark.sql.functions as F
import pandas as pd

@F.pandas_udf(IntegerType())
def addition(arr: pd.Series, addition: int) -> pd.Series:
  return arr.add(addition)

df = spark.createDataFrame([([1,2,3],10),([4,5,6],20)],["array","addition"])
df.show()

df.withColumn("added", addition(F.col("array"),F.col("addition")))

throws the following exception on the UDF definition line:

NotImplementedError: Unsupported signature: (arr: pandas.core.series.Series, addition: int) -> pandas.core.series.Series.

Am I tackling this problem the wrong way? I could reimplement the whole "addition" function in native PySpark, but the real function I am talking about is terribly complex, and reimplementing it would mean an enormous amount of rework.

  • You indicate you are not planning to use the `add` pandas function; which pandas function are you planning to use? – smurphy Jan 13 '23 at 18:44
  • The dataframe I am trying to build is a transformation from an array of transmittances plus a lot of metadata into a number of arrays (wavenumbers, transmittances, absorbances, baseline-fitted absorbances), a validity check, and a list of anomalies in the spectrum. The processor for performing these calculations uses tens of pandas functions. – Wim Schmitz Jan 17 '23 at 09:24

1 Answer


Loading the example, and adding `import pyspark.sql.types as T` and `from array import array`:

import pyspark.sql.types as T
import pyspark.sql.functions as F
import pandas as pd
from array import array

df = spark.createDataFrame([([1,2,3],10),([4,5,6],20)],["array","addition"])
df.show(truncate=False)
print(df.schema.fields)

The output is:

+---------+--------+
|    array|addition|
+---------+--------+
|[1, 2, 3]|      10|
|[4, 5, 6]|      20|
+---------+--------+

[StructField('array', ArrayType(LongType(), True), True), StructField('addition', LongType(), True)]

If you must use a pandas function to complete your task, here is one option: call the pandas function from within a regular PySpark UDF.

  • The Spark DataFrame's `array` column (the UDF's `arr` argument) is ArrayType; convert it into a pandas Series
  • Apply the pandas function
  • Then convert the pandas Series back to an array

@F.udf(T.ArrayType(T.LongType()))
def addition_pd(arr, addition):
    # arr arrives as a plain Python list; wrap it in a pandas Series
    pd_arr = pd.Series(arr)
    # apply the pandas function
    added = pd_arr.add(addition)
    # convert the pandas Series back to an array of longs for the ArrayType return
    return array("l", added)

df = df.withColumn("added", addition_pd(F.col("array"),F.col("addition")))
df.show(truncate=False)
print(df.schema.fields)

Returns

+---------+--------+------------+
|array    |addition|added       |
+---------+--------+------------+
|[1, 2, 3]|10      |[11, 12, 13]|
|[4, 5, 6]|20      |[24, 25, 26]|
+---------+--------+------------+

[StructField('array', ArrayType(LongType(), True), True), StructField('addition', LongType(), True), StructField('added', ArrayType(LongType(), True), True)]
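
As an aside, the original pandas_udf approach can also be made to work: pandas UDF type hints must all be pandas Series (or DataFrames / iterators of them) rather than plain Python scalars, so passing the addition column as a second Series avoids the NotImplementedError. A minimal sketch (the name addition_series and the row-wise list comprehension are only illustrative, not part of the answer above):

from pyspark.sql.types import ArrayType, LongType

# every argument is type-hinted as pd.Series, which pandas_udf supports
@F.pandas_udf(ArrayType(LongType()))
def addition_series(arr: pd.Series, add: pd.Series) -> pd.Series:
    # each element of `arr` is an array; add the matching scalar from `add`
    return pd.Series([[x + a for x in row] for row, a in zip(arr, add)])

df.withColumn("added", addition_series(F.col("array"), F.col("addition"))).show(truncate=False)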

However, it is worth stating that, when possible, it is recommended to use built-in PySpark functions over PySpark UDFs (see here).
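
For the toy addition example, the same result can be obtained without any UDF by using the built-in higher-order transform function (a sketch using F.expr, assuming Spark 2.4+ and the column names from the example above):

# transform applies the lambda to each element of the `array` column;
# the lambda can reference the row's `addition` column directly
df = df.withColumn("added", F.expr("transform(`array`, x -> x + addition)"))
df.show(truncate=False)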

smurphy
  • Thanks, this indeed fixed the problem. Yes, I could in theory rewrite everything in PySpark, but I would lose compatibility with the other tools already built on the Python package. If you have an answer to my [underlying problem](https://stackoverflow.com/questions/74996188/transforming-python-classes-to-spark-delta-rows), please let me know! :) – Wim Schmitz Jan 17 '23 at 09:11