I am trying to execute the code below: I need to look values up in a second table and add them to the main DataFrame as a new column. I went with a UDF because a plain join didn't work out for me.
With that, I get the following error:
RuntimeError: SparkContext should only be created and accessed on the driver.
To avoid this error I added config('spark.executor.allowSparkContext', 'true') to the session builder inside the UDF.
But now I get pyspark.sql.utils.AnalysisException: Table or view not found: ser_definition; line 3 pos 5, presumably because the temp view is not visible on the executors.
How can I overcome this error, or is there a better approach? (A sketch of the join direction I mean is at the end of the post.)
Below is the code.
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StringType

spark = SparkSession.builder.getOrCreate()

df_subsbill_label = spark.read.format("csv").option("inferSchema", True).option("header", True).option("multiLine", True)\
    .load("file:///C://Users//test_data.csv")

df_service_def = spark.read.format("csv").option("inferSchema", True).option("header", True).option("multiLine", True)\
    .load("file:///C://Users//test_data2.csv")
df_service_def.createGlobalTempView("ser_definition")
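# NOTE: createGlobalTempView registers the view under the global_temp
# database, so on the driver it resolves as global_temp.ser_definition.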
query = '''
SELECT mnthlyfass
FROM ser_definition
WHERE uid = {0}
AND u_soc = '{1}'
AND ser_type = 'SOC'
AND t_type = '{2}'
AND c_type = '{3}'
ORDER BY d_fass DESC, mnthlyfass DESC
LIMIT 1
'''
def lookup_fas(uid, u_soc, t_type, c_type, query):
    # Get (or create) a session on the executor just to run the lookup query
    spark = SparkSession.builder.config('spark.executor.allowSparkContext', 'true').getOrCreate()
    query = query.format(uid, u_soc, t_type, c_type)
    df = spark.sql(query)
    # Flatten the single-column result into a plain Python list
    return df.rdd.flatMap(lambda x: x).collect()
udf_lookup = F.udf(lookup_fas, StringType())
df_subsbill_label = df_subsbill_label.withColumn("mnthlyfass", udf_lookup(F.col("uid"), F.col("u_soc"), F.col("t_type"), F.col("c_type"), F.lit(query)))
df_subsbill_label.show(20, False)
Error:
pyspark.sql.utils.AnalysisException: Table or view not found: ser_definition; line 3 pos 5;
'GlobalLimit 1
+- 'LocalLimit 1
+- 'Sort ['d_fass DESC NULLS LAST, 'mnthlyfass DESC NULLS LAST], true
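In case it helps to show what I mean by a better approach: is something along these lines, a window to emulate the ORDER BY ... LIMIT 1 plus a left join, the right direction instead of the UDF? This is only a sketch, assuming both DataFrames share the uid, u_soc, t_type and c_type key columns:

from pyspark.sql import Window
import pyspark.sql.functions as F

# Keep only the best (d_fass DESC, mnthlyfass DESC) row per key,
# emulating the ORDER BY ... LIMIT 1 of the lookup query.
w = Window.partitionBy("uid", "u_soc", "t_type", "c_type") \
          .orderBy(F.col("d_fass").desc(), F.col("mnthlyfass").desc())

best_fass = (df_service_def
             .filter(F.col("ser_type") == "SOC")
             .withColumn("rn", F.row_number().over(w))
             .filter(F.col("rn") == 1)
             .select("uid", "u_soc", "t_type", "c_type", "mnthlyfass"))

# Left join so billing rows without a match keep a null mnthlyfass.
df_subsbill_label = df_subsbill_label.join(
    best_fass, ["uid", "u_soc", "t_type", "c_type"], "left")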