I have defined some simple functions like:

def median_func(xs):
    # Middle element of the sorted values, or the mean of the two
    # middle elements when the list has an even length.
    xs_sorted = sorted(xs)
    mid = len(xs_sorted) // 2
    if len(xs_sorted) % 2 == 0:
        return (xs_sorted[mid - 1] + xs_sorted[mid]) / 2.0
    return xs_sorted[mid]

## --------------------- ##
def max_func(xs):
    # Largest element: last value of the ascending sort.
    return sorted(xs)[-1]

## --------------------- ##
def min_func(xs):
    # Smallest element: first value of the ascending sort.
    return sorted(xs)[0]
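
These behave as expected when called directly in plain Python, for example:

>>> median_func([3.0, 1.0, 2.0])
2.0
>>> median_func([4.0, 1.0, 3.0, 2.0])
2.5
>>> max_func([4, 1, 3]), min_func([4, 1, 3])
(4, 1)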

And I register them as UDFs via lambdas:

import pyspark.sql.functions as sf
from pyspark.sql.types import DoubleType, IntegerType

median_udf = sf.udf(lambda xs: median_func(xs), DoubleType())
max_udf = sf.udf(lambda xs: max_func(xs), IntegerType())
min_udf = sf.udf(lambda xs: min_func(xs), DoubleType())

In PySpark, I am using these as:

data_frame = data_frame.withColumn("Rolling_median_lat", median_udf(column_latitude))\
                .withColumn("Rolling_median_lon", median_udf(column_longitude))\
                .withColumn("Rolling_max_deltatime", max_udf(column_deltatime))

When I run the above with Python 2.7 and PySpark 2.2.0, everything works fine. But when I run the same code with Python 3.6, I get the following error:

Py4JError: An error occurred while calling None.org.apache.spark.sql.execution.python.UserDefinedPythonFunction. Trace:
py4j.Py4JException: Constructor org.apache.spark.sql.execution.python.UserDefinedPythonFunction([class java.lang.String, class org.apache.spark.api.python.PythonFunction, class org.apache.spark.sql.types.DoubleType$, class java.lang.Integer, class java.lang.Boolean]) does not exist
    at py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:179)
    at py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:196)
    at py4j.Gateway.invoke(Gateway.java:235)
    at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
    at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)

I am not sure what the issue is. I have tried a few things by trial and error (not sure they are worth mentioning here), but nothing works. What am I doing wrong?
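
For reference, this is how I check the versions in each environment (a quick sanity check; the executors' Python, e.g. via PYSPARK_PYTHON, could still differ from the driver's, which I haven't fully ruled out):

import sys
import pyspark

# Versions reported in the failing environment
print(sys.version)           # 3.6.x
print(pyspark.__version__)   # 2.2.0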

EDIT:

The variables I've defined:

column_latitude

This gives:

Column<b'array(latitude, Lag_latitude_1, Lead_latitude_1, Lag_latitude_2, Lead_latitude_2, Lag_latitude_3, Lead_latitude_3, Lag_latitude_4, Lead_latitude_4)'>

So column_latitude is simply an array column built from the nine latitude columns (the current value plus four lags and four leads).
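
For completeness, this is roughly how I build that column (a sketch; the Lag_*/Lead_* columns already exist on data_frame):

import pyspark.sql.functions as sf

# Bundle the current latitude with its four lags and four leads
# into a single array column.
column_latitude = sf.array(
    "latitude",
    "Lag_latitude_1", "Lead_latitude_1",
    "Lag_latitude_2", "Lead_latitude_2",
    "Lag_latitude_3", "Lead_latitude_3",
    "Lag_latitude_4", "Lead_latitude_4",
)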

EDIT 2: Here is the original dataframe I have:

data_frame.head(2)

This gives me:

[Row(id=1234, movementdatetime=datetime.datetime(2017, 9, 4, 13, 57, 16), latitude=38.477, longitude=13.256, deltaTime_sec=3459, Lag_latitude_1=38.4593, Lead_latitude_1=38.4872, Lag_longitude_1=13.4902, Lead_longitude_1=13.1767, Lag_deltatime_1=25531, Lead_deltatime_1=1212, Lag_latitude_2=38.3432, Lead_latitude_2=39.5649, Lag_longitude_2=15.1879, Lead_longitude_2=2.6392, Lag_deltatime_2=3280, Lead_deltatime_2=20623078, Lag_latitude_3=38.331, Lead_latitude_3=39.5649, Lag_longitude_3=15.3842, Lead_longitude_3=2.6392, Lag_deltatime_3=3588, Lead_deltatime_3=14580, Lag_latitude_4=38.324, Lead_latitude_4=39.5649, Lag_longitude_4=15.6001, Lead_longitude_4=2.6391, Lag_deltatime_4=0, Lead_deltatime_4=7199),
Row(id=2345, movementdatetime=datetime.datetime(2017, 9, 4, 14, 17, 28), latitude=38.4872, longitude=13.1767, deltaTime_sec=1212, Lag_latitude_1=38.477, Lead_latitude_1=39.5649, Lag_longitude_1=13.256, Lead_longitude_1=2.6392, Lag_deltatime_1=3459, Lead_deltatime_1=20623078, Lag_latitude_2=38.4593, Lead_latitude_2=39.5649, Lag_longitude_2=13.4902, Lead_longitude_2=2.6392, Lag_deltatime_2=25531, Lead_deltatime_2=14580, Lag_latitude_3=38.3432, Lead_latitude_3=39.5649, Lag_longitude_3=15.1879, Lead_longitude_3=2.6391, Lag_deltatime_3=3280, Lead_deltatime_3=7199, Lag_latitude_4=38.331, Lead_latitude_4=39.5649, Lag_longitude_4=15.3842, Lead_longitude_4=2.6391, Lag_deltatime_4=3588, Lead_deltatime_4=10803)]

Basically, I have nine such columns for each measure (the current value plus four lags and four leads), and I need to compute the rolling median across them for every row.
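
For instance, for the first row above, the nine latitude values give:

>>> median_func([38.477, 38.4593, 38.4872, 38.3432, 39.5649,
...              38.331, 39.5649, 38.324, 39.5649])
38.477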

  • Update the PySpark UDFs like this: median_udf = sf.udf(median_func, DoubleType()). The other code looks fine. – Ranga Vure Apr 11 '19 at 14:18
  • @RangaVure I face the same error after updating the PySpark UDFs. – Shubham A. Apr 12 '19 at 07:39
  • Your code works fine for me; I have PySpark 2.3 and Python 3.7. The issue might be with your environment, as you might be switching between Python 2.7/3.x and PySpark versions. – Ranga Vure Apr 12 '19 at 09:51
  • @ShubhamA. You're going to have to provide a small [reproducible example](https://stackoverflow.com/questions/48427185/how-to-make-good-reproducible-apache-spark-examples) with the desired output. – pault Apr 12 '19 at 13:50
  • @RangaVure I am using a fresh environment with no trace of Python 2.7 (as far as I know). – Shubham A. Apr 15 '19 at 08:07
