I have long-running tasks (UDFs) that I need to run on PySpark. Some of them can run for hours, so I'd like to add some kind of timeout wrapper and just return None if a task runs for too long.
I've done something with signal, but I'm pretty sure that's not the safest way of doing it.
import pyspark
import signal
import time
from pyspark import SQLContext
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.functions import udf

conf = pyspark.SparkConf()
sc = pyspark.SparkContext.getOrCreate(conf=conf)
spark = SQLContext(sc)

schema = StructType([
    StructField("sleep", IntegerType(), True),
    StructField("value", StringType(), True),
])

data = [[1, "a"], [2, "b"], [3, "c"], [4, "d"], [1, "e"], [2, "f"]]
df = spark.createDataFrame(data, schema=schema)

def handler(signum, frame):
    raise TimeoutError()

def squared_typed(s):
    def run_timeout():
        # arm a 3-second alarm, then do the (potentially slow) work
        signal.signal(signal.SIGALRM, handler)
        signal.alarm(3)
        time.sleep(s)
        return s * s
    try:
        return run_timeout()
    except TimeoutError as e:
        return None

squared_udf = udf(squared_typed, IntegerType())

df.withColumn('sq', squared_udf('sleep')).show()
It works and gives me the expected output, but is there a more PySpark-idiomatic way of doing it?
+-----+-----+----+
|sleep|value|  sq|
+-----+-----+----+
|    1|    a|   1|
|    2|    b|   4|
|    3|    c|null|
|    4|    d|null|
|    1|    e|   1|
|    2|    f|   4|
+-----+-----+----+
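In case it helps frame the question, here's a minimal sketch of the same signal-based wrapper tightened up a bit (assumptions: the UDF runs in the main thread of the Python worker process, which signal requires, and run_with_timeout is just a helper name I made up, not part of any API). It cancels the alarm in a finally block so a late SIGALRM can't hit a later row, restores the previous handler, and takes the timeout as an argument instead of hard-coding 3 seconds.

import signal
import time
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

def run_with_timeout(fn, seconds):
    """Run fn(); return None if it takes longer than `seconds`."""
    def _handler(signum, frame):
        raise TimeoutError()

    old_handler = signal.signal(signal.SIGALRM, _handler)
    signal.alarm(seconds)
    try:
        return fn()
    except TimeoutError:
        return None
    finally:
        signal.alarm(0)                              # cancel any pending alarm
        signal.signal(signal.SIGALRM, old_handler)   # restore the previous handler

def squared_typed(s):
    def work():
        time.sleep(s)   # stand-in for the real long-running computation
        return s * s
    return run_with_timeout(work, 3)

squared_udf = udf(squared_typed, IntegerType())
df.withColumn('sq', squared_udf('sleep')).show()

This is still the same SIGALRM trick, though, so I'd be happy to hear about an approach that doesn't rely on signals at all.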
Thanks