I have written the following pandas_udf to calculate the haversine distance in PySpark:

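import math

import numpy as np
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

def haversine(witness_lat : pd.Series, witness_lon: pd.Series, beacon_lat: pd.Series, beacon_lon: pd.Series) -> pd.Series: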
    if None in [witness_lat, witness_lon, beacon_lat, beacon_lon]:
        return None
    else:
        lon1 = witness_lon
        lat1 = witness_lat
        lon2 = beacon_lon
        lat2 = beacon_lat

        lon1, lat1, lon2, lat2 = map(math.radians, [lon1, lat1, lon2, lat2])
        dlon = lon2 - lon1 
        dlat = lat2 - lat1 
        a = np.sin(dlat/2)**2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon/2)**2
        c = 2 * np.arcsin(np.sqrt(a)) 
        m = 6367000 * c
        return m

@pandas_udf("float", PandasUDFType.SCALAR)
def udf_calc_distance(st_y_witness, st_x_witness, st_y_transmitter, st_x_transmitter):
    distance_df = pd.DataFrame({'st_y_witness' : st_y_witness, 'st_x_witness' : st_x_witness, 'st_y_transmitter' : st_y_transmitter, 'st_x_transmitter' : st_x_transmitter})
    distance_df['distance'] = distance_df.apply(lambda x : haversine(x['st_y_witness'], x['st_x_witness'], x['st_y_transmitter'], x['st_x_transmitter']), axis = 1)
    return distance_df['distance']

This code runs properly and gives me the answer I expect; however, I get the deprecation warning shown below.

UserWarning: In Python 3.6+ and Spark 3.0+, it is preferred to specify type hints for pandas UDF instead of specifying pandas UDF type which will be deprecated in the future releases. See SPARK-28264 for more details.
  warnings.warn(

I've looked at the latest pandas_udf documentation on Databricks here: https://docs.databricks.com/spark/latest/spark-sql/udf-python-pandas.html, but I'm not sure how to use the type hints with the apply formatting. I set up my code based on other examples I've seen on Stack Overflow, like this one: "Passing multiple columns in Pandas UDF PySpark", which follow the format that will be deprecated.

Thank you for the help!

zorrrba

2 Answers

Just add type hints to the UDF, as you did for your haversine function, and drop the PandasUDFType.SCALAR argument:

@pandas_udf("float")
def udf_calc_distance(st_y_witness: pd.Series, st_x_witness: pd.Series, st_y_transmitter: pd.Series, st_x_transmitter: pd.Series) -> pd.Series:
    distance_df = pd.DataFrame({'st_y_witness' : st_y_witness, 'st_x_witness' : st_x_witness, 'st_y_transmitter' : st_y_transmitter, 'st_x_transmitter' : st_x_transmitter})
    distance_df['distance'] = distance_df.apply(lambda x : haversine(x['st_y_witness'], x['st_x_witness'], x['st_y_transmitter'], x['st_x_transmitter']), axis = 1)
    return distance_df['distance']
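
As a possible follow-up, the row-wise apply could be replaced by a function that works on whole columns, so the pd.Series type hints describe what the function actually receives. This is only a sketch: the name haversine_series and the constant EARTH_RADIUS_M are made up for illustration, and the Spark registration is left as a comment because it assumes pyspark and pyarrow are installed.

```python
import numpy as np
import pandas as pd

EARTH_RADIUS_M = 6367000  # same Earth radius (metres) the question uses


def haversine_series(witness_lat: pd.Series, witness_lon: pd.Series,
                     beacon_lat: pd.Series, beacon_lon: pd.Series) -> pd.Series:
    """Haversine distance in metres, computed on whole columns at once."""
    # np.radians works element-wise on a Series, unlike math.radians.
    lat1, lon1, lat2, lon2 = (
        np.radians(pd.to_numeric(s))
        for s in (witness_lat, witness_lon, beacon_lat, beacon_lon)
    )
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = np.sin(dlat / 2) ** 2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon / 2) ** 2
    # Clipping guards against rounding pushing `a` marginally outside [0, 1].
    c = 2 * np.arcsin(np.sqrt(a.clip(0, 1)))
    return EARTH_RADIUS_M * c


# Assumed Spark 3.0+ registration; the type hints above select the scalar UDF type:
# from pyspark.sql.functions import pandas_udf
# udf_calc_distance = pandas_udf(haversine_series, "float")
```

Missing coordinates arrive as NaN in pandas and simply propagate through the arithmetic, so no explicit None check is needed in this version.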
mck

If you're looking for a description or documentation, see the "Examples" section of the PySpark pandas_udf docs, quoted below.

From Spark 3.0 with Python 3.6+, Python type hints detect the function types as below:

>>> @pandas_udf(IntegerType())
... def slen(s: pd.Series) -> pd.Series:
...     return s.str.len()

Prior to Spark 3.0, the pandas UDF used functionType to decide the execution type as below:

>>> from pyspark.sql.functions import PandasUDFType
>>> from pyspark.sql.types import IntegerType
>>> @pandas_udf(IntegerType(), PandasUDFType.SCALAR)
... def slen(s):
...     return s.str.len()

It is preferred to specify type hints for the pandas UDF instead of specifying pandas UDF type via functionType which will be deprecated in the future releases.

Note that the type hint should use pandas.Series in all cases but there is one variant that pandas.DataFrame should be used for its input or output type hint instead when the input or output column is of pyspark.sql.types.StructType.
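
To illustrate the StructType variant, here is a rough sketch of a function whose input is a struct column, so its hint is pd.DataFrame rather than pd.Series. The function name is_valid_position and the struct fields lat and lon are hypothetical, and the Spark calls are shown only as comments because they assume pyspark and pyarrow are installed.

```python
import pandas as pd


def is_valid_position(position: pd.DataFrame) -> pd.Series:
    """Flag rows whose coordinates are present and within range.

    `position` stands for a struct column with hypothetical fields
    `lat` and `lon`; a struct input is hinted as pd.DataFrame, with
    one DataFrame column per struct field.
    """
    lat_ok = position["lat"].between(-90, 90)
    lon_ok = position["lon"].between(-180, 180)
    # Comparisons with NaN are False, so missing values are flagged invalid.
    return lat_ok & lon_ok


# Assumed Spark 3.0+ usage, with df being a DataFrame that has lat/lon columns:
# from pyspark.sql.functions import pandas_udf, struct
# valid_udf = pandas_udf(is_valid_position, "boolean")
# df.select(valid_udf(struct("lat", "lon")))
```

The return value is a plain boolean column, so its hint stays pd.Series; only the struct-typed input changes to pd.DataFrame.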

Kashyap