
new is an RDD like below:

[('hotel stinsen', '59.066', '15.1115'),
 ('ankaret hotel and restaurant', '58.6725', '17.0975'),
 ('quality hotel ekoxen', '58.40574', '15.62391'),
 ('hotel nordica', '63.8532', '15.5652'),
 ('hotel gastis', '57.1101', '12.2669'),
 ('berling hotel', '59.3793', '13.4994'),]

Then this is the function I want to register as a UDF (is_out_10km):

# https://stackoverflow.com/questions/4913349/haversine-formula-in-python-bearing-and-distance-between-two-gps-points
# also return xx km not meter
from math import radians, cos, sin, asin, sqrt
def haversine(lon1, lat1, lon2, lat2):
    """
    Calculate the great circle distance in kilometers between two points 
    on the earth (specified in decimal degrees)
    """
    # convert decimal degrees to radians 
    lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])

    # haversine formula 
    dlon = lon2 - lon1 
    dlat = lat2 - lat1 
    a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
    c = 2 * asin(sqrt(a)) 
    r = 6371 # Radius of earth in kilometers. Use 3956 for miles. Determines return value units.
    return c * r

def is_out_10km(lon1, lat1, lon2, lat2):
    distance = haversine(lon1, lat1, lon2, lat2)
    if distance > 10:
        return False
    return True
    
# haversine(42.55129,1.64243,42.53619,1.61625) # just for test

Below is the registration process

from pyspark.sql.functions import udf
udf_is_out_10km = udf(lambda lon1, lat1, lon2, lat2:is_out_10km(lon1, lat1, lon2, lat2))

When I tried to use that UDF, something strange happened.

lat=59.334591;long=18.063240
new.filter(lambda x:udf_is_out_10km(lat,long,float(x[1]),float(x[2]))).count()

The main error message is "RuntimeError: SparkContext should only be created and accessed on the driver."

The complete traceback is:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
Input In [21], in <module>
----> 1 new.filter(lambda x:udf_is_out_10km(lat,long,float(x[1]),float(x[2]))).count()

File /usr/local/spark/python/pyspark/rdd.py:1237, in RDD.count(self)
   1228 def count(self):
   1229     """
   1230     Return the number of elements in this RDD.
   1231 
   (...)
   1235     3
   1236     """
-> 1237     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()

File /usr/local/spark/python/pyspark/rdd.py:1226, in RDD.sum(self)
   1217 def sum(self):
   1218     """
   1219     Add up the elements in this RDD.
   1220 
   (...)
   1224     6.0
   1225     """
-> 1226     return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)

File /usr/local/spark/python/pyspark/rdd.py:1080, in RDD.fold(self, zeroValue, op)
   1076     yield acc
   1077 # collecting result of mapPartitions here ensures that the copy of
   1078 # zeroValue provided to each partition is unique from the one provided
   1079 # to the final reduce call
-> 1080 vals = self.mapPartitions(func).collect()
   1081 return reduce(op, vals, zeroValue)

File /usr/local/spark/python/pyspark/rdd.py:950, in RDD.collect(self)
    941 """
    942 Return a list that contains all of the elements in this RDD.
    943 
   (...)
    947 to be small, as all the data is loaded into the driver's memory.
    948 """
    949 with SCCallSiteSync(self.context) as css:
--> 950     sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
    951 return list(_load_from_socket(sock_info, self._jrdd_deserializer))

File /usr/local/spark/python/lib/py4j-0.10.9.2-src.zip/py4j/java_gateway.py:1309, in JavaMember.__call__(self, *args)
   1303 command = proto.CALL_COMMAND_NAME +\
   1304     self.command_header +\
   1305     args_command +\
   1306     proto.END_COMMAND_PART
   1308 answer = self.gateway_client.send_command(command)
-> 1309 return_value = get_return_value(
   1310     answer, self.gateway_client, self.target_id, self.name)
   1312 for temp_arg in temp_args:
   1313     temp_arg._detach()

File /usr/local/spark/python/pyspark/sql/utils.py:111, in capture_sql_exception.<locals>.deco(*a, **kw)
    109 def deco(*a, **kw):
    110     try:
--> 111         return f(*a, **kw)
    112     except py4j.protocol.Py4JJavaError as e:
    113         converted = convert_exception(e.java_exception)

File /usr/local/spark/python/lib/py4j-0.10.9.2-src.zip/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 5.0 failed 1 times, most recent failure: Lost task 0.0 in stage 5.0 (TID 4) (73b33c0aa869 executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 609, in process
    out_iter = func(split_index, iterator)
  File "/usr/local/spark/python/pyspark/rdd.py", line 2918, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/local/spark/python/pyspark/rdd.py", line 2918, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/local/spark/python/pyspark/rdd.py", line 2918, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/local/spark/python/pyspark/rdd.py", line 417, in func
    return f(iterator)
  File "/usr/local/spark/python/pyspark/rdd.py", line 1237, in <lambda>
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/usr/local/spark/python/pyspark/rdd.py", line 1237, in <genexpr>
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/util.py", line 74, in wrapper
    return f(*args, **kwargs)
  File "/tmp/ipykernel_7267/456204876.py", line 1, in <lambda>
  File "/usr/local/spark/python/pyspark/sql/udf.py", line 199, in wrapper
    return self(*args)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/udf.py", line 177, in __call__
    judf = self._judf
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/udf.py", line 161, in _judf
    self._judf_placeholder = self._create_judf()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/udf.py", line 167, in _create_judf
    spark = SparkSession.builder.getOrCreate()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 228, in getOrCreate
    sc = SparkContext.getOrCreate(sparkConf)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/context.py", line 392, in getOrCreate
    SparkContext(conf=conf or SparkConf())
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/context.py", line 136, in __init__
    SparkContext._assert_on_driver()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/context.py", line 1271, in _assert_on_driver
    raise RuntimeError("SparkContext should only be created and accessed on the driver.")
RuntimeError: SparkContext should only be created and accessed on the driver.

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:545)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:703)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:685)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:498)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
    at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
    at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
    at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
    at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
    at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
    at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
    at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
    at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
    at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1030)
    at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2254)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2403)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2352)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2351)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2351)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1109)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1109)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1109)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2591)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2533)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2522)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:898)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2279)
    at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
    at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:180)
    at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 609, in process
    out_iter = func(split_index, iterator)
  File "/usr/local/spark/python/pyspark/rdd.py", line 2918, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/local/spark/python/pyspark/rdd.py", line 2918, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/local/spark/python/pyspark/rdd.py", line 2918, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/local/spark/python/pyspark/rdd.py", line 417, in func
    return f(iterator)
  File "/usr/local/spark/python/pyspark/rdd.py", line 1237, in <lambda>
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/usr/local/spark/python/pyspark/rdd.py", line 1237, in <genexpr>
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/util.py", line 74, in wrapper
    return f(*args, **kwargs)
  File "/tmp/ipykernel_7267/456204876.py", line 1, in <lambda>
  File "/usr/local/spark/python/pyspark/sql/udf.py", line 199, in wrapper
    return self(*args)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/udf.py", line 177, in __call__
    judf = self._judf
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/udf.py", line 161, in _judf
    self._judf_placeholder = self._create_judf()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/udf.py", line 167, in _create_judf
    spark = SparkSession.builder.getOrCreate()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 228, in getOrCreate
    sc = SparkContext.getOrCreate(sparkConf)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/context.py", line 392, in getOrCreate
    SparkContext(conf=conf or SparkConf())
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/context.py", line 136, in __init__
    SparkContext._assert_on_driver()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/context.py", line 1271, in _assert_on_driver
    raise RuntimeError("SparkContext should only be created and accessed on the driver.")
RuntimeError: SparkContext should only be created and accessed on the driver.

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:545)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:703)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:685)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:498)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
    at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
    at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
    at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
    at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
    at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
    at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
    at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
    at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
    at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1030)
    at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2254)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    ... 1 more
HorusLiang
  • There maybe simpler approaches. Give us your input and expected output. – wwnde Oct 09 '22 at 06:16
  • given one pair of address ( lat=59.334591;long=18.063240 ), and a rdd containing so many (lat long and name), calculate the address in rdd which is 10km close to the given address and extract the name. – HorusLiang Oct 09 '22 at 10:02

1 Answer


Registering and applying a function as a UDF is part of the PySpark DataFrame API. When working with the RDD API, you should call the plain Python function directly inside map/filter/reduce/etc. That mismatch is what produces the SparkContext error: calling the UDF inside an RDD lambda runs on a worker, where the UDF tries to create a SparkSession (and therefore a SparkContext), which is only allowed on the driver, as the traceback shows.
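
If you do want to keep a UDF, it has to be applied to DataFrame columns rather than called inside an RDD lambda. Below is a minimal sketch of that route, assuming the is_out_10km function from the question is already defined; the column names and the two sample rows are chosen here only for illustration:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import udf, lit
    from pyspark.sql.types import BooleanType

    spark = SparkSession.builder.getOrCreate()

    # two of the rows from the question, with assumed column names
    df = spark.createDataFrame(
        [('hotel stinsen', '59.066', '15.1115'),
         ('berling hotel', '59.3793', '13.4994')],
        ['name', 'lat', 'lon'])

    # give the UDF an explicit Boolean return type (the default is string)
    udf_is_out_10km = udf(is_out_10km, BooleanType())

    lat = 59.334591
    long = 18.063240
    df.filter(udf_is_out_10km(lit(lat), lit(long),
                              df['lat'].cast('double'),
                              df['lon'].cast('double'))).count()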

Staying with the RDD API instead, the example provided (with the UDF lines omitted) can be simplified to:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
    sc = spark.sparkContext
    data = [('hotel stinsen', '59.066', '15.1115'),
            ('ankaret hotel and restaurant', '58.6725', '17.0975'),
            ('quality hotel ekoxen', '58.40574', '15.62391'),
            ('hotel nordica', '63.8532', '15.5652'),
            ('hotel gastis', '57.1101', '12.2669'),
            ('berling hotel', '59.3793', '13.4994')]
    rdd = sc.parallelize(data)
    lat = 59.334591
    long = 18.063240
    # call the plain Python function directly inside the RDD filter
    rdd.filter(lambda x: is_out_10km(lat, long, float(x[1]), float(x[2]))).count()

which returns 0 for the data provided (none of the listed hotels lie within 10 km of the given point).
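
If, as clarified in the comments under the question, the goal is to extract the names of the hotels within 10 km rather than just count them, one possible follow-up on the same filtered RDD would be:

    # collect the hotel names within 10 km (an empty list for this sample data)
    names = (rdd.filter(lambda x: is_out_10km(lat, long, float(x[1]), float(x[2])))
                .map(lambda x: x[0])
                .collect())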

danielcahall