
I am trying to execute the code below because I need to look up a table and create a new column from it. I went with a UDF since joining didn't work out.

While doing so, I get the error RuntimeError: SparkContext should only be created and accessed on the driver.

To avoid this error I added config('spark.executor.allowSparkContext', 'true') inside the UDF function.

But now I get pyspark.sql.utils.AnalysisException: Table or view not found: ser_definition; line 3 pos 5; because the temp view is not available on the executors.

How can I overcome this error, or is there a better approach?

Below is the code.

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F
    from pyspark.sql.types import StringType

    df_subsbill_label = spark.read.format("csv").option("inferSchema", True).option("header", True).option("multiLine", True)\
                            .load("file:///C://Users//test_data.csv")

    df_service_def = spark.read.format("csv").option("inferSchema", True).option("header", True).option("multiLine", True)\
                            .load("file:///C://Users//test_data2.csv")

    df_service_def.createGlobalTempView("ser_definition")

    query = '''
    SELECT mnthlyfass
    FROM ser_definition
    WHERE uid = {0}
    AND u_soc = '{1}'
    AND ser_type = 'SOC'
    AND t_type = '{2}'
    AND c_type = '{3}'
    ORDER BY d_fass DESC, mnthlyfass DESC
    LIMIT 1
    '''

    def lookup_fas(uid, u_soc, t_type, c_type, query):
        # getOrCreate() inside a UDF runs on the executor, not the driver
        spark = SparkSession.builder.config('spark.executor.allowSparkContext', 'true').getOrCreate()
        query = query.format(uid, u_soc, t_type, c_type)
        df = spark.sql(query)
        return df.rdd.flatMap(lambda x: x).collect()

    udf_lookup = F.udf(lookup_fas, StringType())

    df_subsbill_label = df_subsbill_label.withColumn("mnthlyfass", udf_lookup(F.col("uid"), F.col("u_soc"), F.col("t_type"), F.col("c_type"), F.lit(query)))
    df_subsbill_label.show(20, False)

Error:

    pyspark.sql.utils.AnalysisException: Table or view not found: ser_definition; line 3 pos 5;
    'GlobalLimit 1
    +- 'LocalLimit 1
       +- 'Sort ['d_fass DESC NULLS LAST, 'mnthlyfass DESC NULLS LAST], true
Dinesh Kumar
  • From the error it looks like the Table you are using in the query is not found. However reading your question, I think you could use a Broadcast variable that has the lookup information. – Amit Jan 31 '23 at 14:37
  • @Amit, I have created the table using df_service_def.createGlobalTempView("ser_definition"), but the table still does not spread across the executors and it throws the table-not-found error. How do we broadcast these kinds of tables? – Dinesh Kumar Jan 31 '23 at 14:59
  • Check this for broadcast information. You create and initialize the broadcast variable on the driver and then broadcast it to the executors. https://spark.apache.org/docs/latest/api/scala/org/apache/spark/SparkContext.html#broadcast[T](value:T)(implicitevidence$9:scala.reflect.ClassTag[T]):org.apache.spark.broadcast.Broadcast[T] – Amit Jan 31 '23 at 16:08

2 Answers


Please add "global_temp", the database that holds global temporary views, before the table name in the SQL:

FROM global_temp.ser_definition

This should work.
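For reference, a minimal sketch of what the qualified query would look like, using the table and column names from the question; global temporary views are registered under the global_temp database, and this still only works on the driver, not inside a UDF:

    df_service_def.createGlobalTempView("ser_definition")

    # Global temp views live in the global_temp database, so qualify the name.
    spark.sql("""
        SELECT mnthlyfass
        FROM global_temp.ser_definition
        WHERE ser_type = 'SOC'
        ORDER BY d_fass DESC, mnthlyfass DESC
        LIMIT 1
    """).show()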

  • still no luck. Getting the same error. pyspark.sql.utils.AnalysisException: Table or view not found: global_temp.ser_definition; line 3 pos 5; 'GlobalLimit 1 +- 'LocalLimit 1 – Dinesh Kumar Jan 31 '23 at 15:28
  • Please check out the below link, this is what @Amit had explained. https://stackoverflow.com/questions/55084996/why-dataframe-cannot-be-accessed-inside-udf-apache-spark-scala#:~:text=actually%20it%27s%20possible%20to%20use%20dataframes%20in%20UDF%2C,Mar%2010%2C%202019%20at%2021%3A00%20Add%20a%20comment – Meena Arumugam Jan 31 '23 at 18:06

First, you should not get the Spark session on an executor when running Spark in cluster mode: a SparkSession object cannot be serialised, so it cannot be sent to the executors. It is also against Spark's design principles to do so.

What you can do here is broadcast your dataframe instead; this creates a copy of the dataframe inside each executor, where you can then read it:

df_service_def = spark.read.format("csv").option("inferSchema", True).option("header", True).option("multiLine", True)\
                        .load("file:///C://Users//test_data2.csv")
# generic example of broadcasting a value from the driver
broadcastVar = spark.sparkContext.broadcast([0, 1, 2, 3])
# broadcast the lookup dataframe the same way
broadcasted_df_service_def = spark.sparkContext.broadcast(df_service_def)

Then, inside your UDF:

def lookup_fas(uid, u_soc, t_type, c_type, query):
    df = broadcasted_df_service_def.value
    # here apply your query on the dataframe ...

PS: Even though this should work, I think it may impact performance, since a UDF is called for each row, so maybe you should change the design of your solution.
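As a rough sketch of one such redesign (my assumption, not code from the question): collect the small lookup table to the driver, broadcast the plain Python rows rather than the DataFrame itself (a DataFrame carries a reference to the SparkContext and cannot be pickled, which is what SPARK-5063 complains about), and do the filtering inside the UDF. The column names are taken from the question's query:

    from pyspark.sql import functions as F
    from pyspark.sql.types import StringType

    # Collect the (small) lookup table on the driver and broadcast plain rows,
    # not the DataFrame object itself.
    lookup_rows = [row.asDict() for row in df_service_def.collect()]
    bc_lookup = spark.sparkContext.broadcast(lookup_rows)

    def lookup_fas(uid, u_soc, t_type, c_type):
        # Same filter as the original SQL, applied to the broadcast rows.
        matches = [r for r in bc_lookup.value
                   if r["uid"] == uid and r["u_soc"] == u_soc
                   and r["ser_type"] == "SOC"
                   and r["t_type"] == t_type and r["c_type"] == c_type]
        if not matches:
            return None
        # ORDER BY d_fass DESC, mnthlyfass DESC LIMIT 1
        best = max(matches, key=lambda r: (r["d_fass"], r["mnthlyfass"]))
        return str(best["mnthlyfass"])

    udf_lookup = F.udf(lookup_fas, StringType())

    df_subsbill_label = df_subsbill_label.withColumn(
        "mnthlyfass",
        udf_lookup(F.col("uid"), F.col("u_soc"), F.col("t_type"), F.col("c_type")),
    )

If the lookup table is too large to collect, a broadcast join on the same key columns would be the usual alternative and avoids the per-row Python call entirely.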

Abdennacer Lachiheb
  • I am getting the below error while trying to broadcast. raise RuntimeError( RuntimeError: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063. During handling of the above exception, another exception occurred: raise pickle.PicklingError(msg) _pickle.PicklingError: Could not serialize broadcast: RuntimeError: It appears that you are attempting to reference SparkContext from a broadcast variable, .. etc – Dinesh Kumar Feb 02 '23 at 13:34