new is an RDD that looks like this:
[('hotel stinsen', '59.066', '15.1115'),
('ankaret hotel and restaurant', '58.6725', '17.0975'),
('quality hotel ekoxen', '58.40574', '15.62391'),
('hotel nordica', '63.8532', '15.5652'),
('hotel gastis', '57.1101', '12.2669'),
('berling hotel', '59.3793', '13.4994'),]
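In case it helps to reproduce, an equivalent RDD can be built from a few of the sample rows above; this is just a minimal sketch assuming the usual SparkContext sc:
# minimal sketch: rebuild an equivalent RDD from some of the rows shown above
sample = [
    ('hotel stinsen', '59.066', '15.1115'),
    ('quality hotel ekoxen', '58.40574', '15.62391'),
    ('berling hotel', '59.3793', '13.4994'),
]
new = sc.parallelize(sample)  # assumes an existing SparkContext `sc`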
This is the function I want to register as a UDF (is_out_10km):
# https://stackoverflow.com/questions/4913349/haversine-formula-in-python-bearing-and-distance-between-two-gps-points
# note: returns kilometers, not meters
from math import radians, cos, sin, asin, sqrt

def haversine(lon1, lat1, lon2, lat2):
    """
    Calculate the great-circle distance in kilometers between two points
    on the earth (specified in decimal degrees).
    """
    # convert decimal degrees to radians
    lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])
    # haversine formula
    dlon = lon2 - lon1
    dlat = lat2 - lat1
    a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
    c = 2 * asin(sqrt(a))
    r = 6371  # radius of the earth in kilometers; use 3956 for miles (determines the unit of the return value)
    return c * r

def is_out_10km(lon1, lat1, lon2, lat2):
    distance = haversine(lon1, lat1, lon2, lat2)
    if distance > 10:
        return False
    return True

# haversine(42.55129, 1.64243, 42.53619, 1.61625)  # just for testing
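For what it's worth, the plain Python functions behave as expected when called locally, outside Spark:
# local sanity check, no Spark involved
print(haversine(42.55129, 1.64243, 42.53619, 1.61625))    # roughly 3.4 km
print(is_out_10km(42.55129, 1.64243, 42.53619, 1.61625))  # True, since the distance is under 10 km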
Below is how I register the function as a UDF:
from pyspark.sql.functions import udf
udf_is_out_10km = udf(lambda lon1, lat1, lon2, lat2: is_out_10km(lon1, lat1, lon2, lat2))
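My understanding is that such a UDF is normally applied to DataFrame columns rather than used inside RDD lambdas; should I be doing something like the sketch below instead? (The toDF column names and the explicit BooleanType are my own guesses, not part of my current code.)
# hypothetical DataFrame usage (assumes an active SparkSession; column names are guesses)
from pyspark.sql.functions import udf, lit, col
from pyspark.sql.types import BooleanType

udf_is_out_10km = udf(is_out_10km, BooleanType())  # explicit return type; plain udf() defaults to StringType
hotels_df = new.toDF(["name", "lat", "lon"])
hotels_df.withColumn(
    "within_10km",  # as written, is_out_10km returns True when the distance is <= 10 km
    udf_is_out_10km(lit(18.063240), lit(59.334591),
                    col("lon").cast("double"), col("lat").cast("double"))
).show()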
When I tried to use the UDF, something strange happened.
lat=59.334591;long=18.063240
new.filter(lambda x:udf_is_out_10km(lat,long,float(x[1]),float(x[2]))).count()
The key part of the error is "RuntimeError: SparkContext should only be created and accessed on the driver."
The complete traceback is:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
Input In [21], in <module>
----> 1 new.filter(lambda x:udf_is_out_10km(lat,long,float(x[1]),float(x[2]))).count()
File /usr/local/spark/python/pyspark/rdd.py:1237, in RDD.count(self)
1228 def count(self):
1229 """
1230 Return the number of elements in this RDD.
1231
(...)
1235 3
1236 """
-> 1237 return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
File /usr/local/spark/python/pyspark/rdd.py:1226, in RDD.sum(self)
1217 def sum(self):
1218 """
1219 Add up the elements in this RDD.
1220
(...)
1224 6.0
1225 """
-> 1226 return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
File /usr/local/spark/python/pyspark/rdd.py:1080, in RDD.fold(self, zeroValue, op)
1076 yield acc
1077 # collecting result of mapPartitions here ensures that the copy of
1078 # zeroValue provided to each partition is unique from the one provided
1079 # to the final reduce call
-> 1080 vals = self.mapPartitions(func).collect()
1081 return reduce(op, vals, zeroValue)
File /usr/local/spark/python/pyspark/rdd.py:950, in RDD.collect(self)
941 """
942 Return a list that contains all of the elements in this RDD.
943
(...)
947 to be small, as all the data is loaded into the driver's memory.
948 """
949 with SCCallSiteSync(self.context) as css:
--> 950 sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
951 return list(_load_from_socket(sock_info, self._jrdd_deserializer))
File /usr/local/spark/python/lib/py4j-0.10.9.2-src.zip/py4j/java_gateway.py:1309, in JavaMember.__call__(self, *args)
1303 command = proto.CALL_COMMAND_NAME +\
1304 self.command_header +\
1305 args_command +\
1306 proto.END_COMMAND_PART
1308 answer = self.gateway_client.send_command(command)
-> 1309 return_value = get_return_value(
1310 answer, self.gateway_client, self.target_id, self.name)
1312 for temp_arg in temp_args:
1313 temp_arg._detach()
File /usr/local/spark/python/pyspark/sql/utils.py:111, in capture_sql_exception.<locals>.deco(*a, **kw)
109 def deco(*a, **kw):
110 try:
--> 111 return f(*a, **kw)
112 except py4j.protocol.Py4JJavaError as e:
113 converted = convert_exception(e.java_exception)
File /usr/local/spark/python/lib/py4j-0.10.9.2-src.zip/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
331 "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
332 format(target_id, ".", name, value))
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 5.0 failed 1 times, most recent failure: Lost task 0.0 in stage 5.0 (TID 4) (73b33c0aa869 executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main
process()
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 609, in process
out_iter = func(split_index, iterator)
File "/usr/local/spark/python/pyspark/rdd.py", line 2918, in pipeline_func
return func(split, prev_func(split, iterator))
File "/usr/local/spark/python/pyspark/rdd.py", line 2918, in pipeline_func
return func(split, prev_func(split, iterator))
File "/usr/local/spark/python/pyspark/rdd.py", line 2918, in pipeline_func
return func(split, prev_func(split, iterator))
File "/usr/local/spark/python/pyspark/rdd.py", line 417, in func
return f(iterator)
File "/usr/local/spark/python/pyspark/rdd.py", line 1237, in <lambda>
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
File "/usr/local/spark/python/pyspark/rdd.py", line 1237, in <genexpr>
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/util.py", line 74, in wrapper
return f(*args, **kwargs)
File "/tmp/ipykernel_7267/456204876.py", line 1, in <lambda>
File "/usr/local/spark/python/pyspark/sql/udf.py", line 199, in wrapper
return self(*args)
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/udf.py", line 177, in __call__
judf = self._judf
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/udf.py", line 161, in _judf
self._judf_placeholder = self._create_judf()
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/udf.py", line 167, in _create_judf
spark = SparkSession.builder.getOrCreate()
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 228, in getOrCreate
sc = SparkContext.getOrCreate(sparkConf)
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/context.py", line 392, in getOrCreate
SparkContext(conf=conf or SparkConf())
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/context.py", line 136, in __init__
SparkContext._assert_on_driver()
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/context.py", line 1271, in _assert_on_driver
raise RuntimeError("SparkContext should only be created and accessed on the driver.")
RuntimeError: SparkContext should only be created and accessed on the driver.
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:545)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:703)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:685)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:498)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1030)
at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2254)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2403)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2352)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2351)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2351)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1109)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1109)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1109)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2591)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2533)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2522)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:898)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2279)
at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:180)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main
process()
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 609, in process
out_iter = func(split_index, iterator)
File "/usr/local/spark/python/pyspark/rdd.py", line 2918, in pipeline_func
return func(split, prev_func(split, iterator))
File "/usr/local/spark/python/pyspark/rdd.py", line 2918, in pipeline_func
return func(split, prev_func(split, iterator))
File "/usr/local/spark/python/pyspark/rdd.py", line 2918, in pipeline_func
return func(split, prev_func(split, iterator))
File "/usr/local/spark/python/pyspark/rdd.py", line 417, in func
return f(iterator)
File "/usr/local/spark/python/pyspark/rdd.py", line 1237, in <lambda>
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
File "/usr/local/spark/python/pyspark/rdd.py", line 1237, in <genexpr>
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/util.py", line 74, in wrapper
return f(*args, **kwargs)
File "/tmp/ipykernel_7267/456204876.py", line 1, in <lambda>
File "/usr/local/spark/python/pyspark/sql/udf.py", line 199, in wrapper
return self(*args)
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/udf.py", line 177, in __call__
judf = self._judf
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/udf.py", line 161, in _judf
self._judf_placeholder = self._create_judf()
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/udf.py", line 167, in _create_judf
spark = SparkSession.builder.getOrCreate()
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 228, in getOrCreate
sc = SparkContext.getOrCreate(sparkConf)
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/context.py", line 392, in getOrCreate
SparkContext(conf=conf or SparkConf())
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/context.py", line 136, in __init__
SparkContext._assert_on_driver()
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/context.py", line 1271, in _assert_on_driver
raise RuntimeError("SparkContext should only be created and accessed on the driver.")
RuntimeError: SparkContext should only be created and accessed on the driver.
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:545)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:703)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:685)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:498)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1030)
at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2254)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
... 1 more
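For comparison, calling the plain Python function directly inside the lambda (without the udf wrapper) is what I would normally expect to work; a minimal sketch of that is below. (I also reordered the arguments here to match the function's (lon, lat, lon, lat) signature, since x[1] holds the latitude and x[2] the longitude, but that ordering issue is separate from the error.)
# sketch: the same filter using the plain Python function instead of the Spark SQL UDF
lat = 59.334591; long = 18.063240
new.filter(lambda x: is_out_10km(long, lat, float(x[2]), float(x[1]))).count()
So my question is: why does wrapping the function with udf() trigger "SparkContext should only be created and accessed on the driver" when the lambda runs on the executors, and is a Spark SQL UDF simply not meant to be used inside RDD operations like filter?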