I need to find the neighbors of a specific data point in a PySpark DataFrame:
```python
a = spark.createDataFrame(
    [("A", [0, 1]), ("B", [5, 9]), ("D", [13, 5])],
    ["Letter", "distances"],
)
```
I wrote the following function. It takes the DataFrame (`DB`), computes the Euclidean distance from every row to a fixed point (`Q`), keeps only the rows whose distance is at most some epsilon (`eps`), and returns that subset:
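```python
from pyspark.sql import functions as F
from pyspark.sql.types import FloatType
from scipy.spatial import distance

def rangequery(DB, Q, eps):
    # UDF computing the Euclidean distance from each row's point to Q
    distance_udf = F.udf(lambda x: float(distance.euclidean(x, Q)), FloatType())
    df_neigh = DB.withColumn('euclid_distances', distance_udf(F.col('distances')))
    # keep only the rows within eps of Q
    return df_neigh.filter(df_neigh['euclid_distances'] <= eps)
```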
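For reference, the filtering that `rangequery` performs can be sketched in plain Python without Spark, assuming each point is a list of coordinates of equal length. `range_query` and `points` are hypothetical names used only for illustration, and `math.dist` (Python 3.8+) stands in for `scipy.spatial.distance.euclidean`. One thing the sketch makes visible: if `Q` is itself a row of `DB`, it is always returned, at distance 0.

```python
import math

# Hypothetical in-memory copy of the DataFrame `a`: (letter, point) pairs.
points = [("A", [0, 1]), ("B", [5, 9]), ("D", [13, 5])]

def range_query(db, q, eps):
    """Return (letter, point, distance) for every point within eps of q.

    Mirrors rangequery(): compute the Euclidean distance to q and keep
    the rows with distance <= eps. If q is one of the rows, it is kept
    too, because its distance to itself is 0.
    """
    result = []
    for letter, p in db:
        d = math.dist(p, q)
        if d <= eps:
            result.append((letter, p, d))
    return result

print(range_query(points, [5, 9], 9))
```

With `eps = 9` and `Q = [5, 9]`, row A is dropped (distance about 9.43), while B (distance 0) and D (distance about 8.94) are kept.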
Now I need to run `rangequery` for every point in the DataFrame, so I do the following:
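```python
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def check_neighbours(point):
    # run the range query around this row's point against the whole DataFrame `a`
    df = rangequery(a, point, 9)
    if df.count() >= 1:
        return "Has Neighbours"
    else:
        return "No Neighbours"

udf_neigh = udf(check_neighbours, StringType())
a.withColumn("label", udf_neigh(a["distances"])).show()
```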
I get the following error when I run this code:
```
PicklingError: Could not serialize object: Py4JError: An error occurred while calling o380.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
	at py4j.Gateway.invoke(Gateway.java:272)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)
```
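The `PicklingError` most likely arises because `check_neighbours` refers to the DataFrame `a` (through `rangequery`). A UDF is pickled and shipped to the executors, but a DataFrame is only a driver-side handle to a JVM object reached through Py4J, so it cannot be serialized, and DataFrame operations cannot be run inside a UDF. One common alternative is to pair every point with every other point through a self-join (for example `a.alias("p").crossJoin(a.alias("q"))`), filter the pairs by distance, and aggregate per letter. The sketch below shows that pairwise logic in plain Python so it runs without a cluster. `label_neighbours` is a hypothetical helper, and it assumes letters are unique and that a point should not count as its own neighbor. The original `rangequery` does count it, so `df.count() >= 1` would always be true.

```python
import math
from itertools import product

# Hypothetical in-memory copy of the DataFrame `a`.
points = [("A", [0, 1]), ("B", [5, 9]), ("D", [13, 5])]

def label_neighbours(db, eps):
    """Label each point by whether another point lies within eps of it.

    Same shape as a Spark self cross join: pair every row with every row,
    drop self-pairs (assumes unique letters), keep pairs with
    distance <= eps, then check per letter whether any pair survived.
    """
    has_neighbour = {letter: False for letter, _ in db}
    for (l1, p1), (l2, p2) in product(db, db):
        if l1 != l2 and math.dist(p1, p2) <= eps:
            has_neighbour[l1] = True
    return {
        letter: "Has Neighbours" if found else "No Neighbours"
        for letter, found in has_neighbour.items()
    }

print(label_neighbours(points, 9))
# {'A': 'No Neighbours', 'B': 'Has Neighbours', 'D': 'Has Neighbours'}
```

In PySpark the same steps would map onto `crossJoin`, `filter`, and `groupBy`/`agg` on the joined DataFrame, with the distance computed from column expressions or a row-level UDF that never touches another DataFrame. A cross join grows quadratically with the number of rows, so this approach is only practical for modest data sizes.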