
I am trying to find a way to execute a SQL expression stored in a column using PySpark. Here is a sample below.

col1    col2    rules
rule1   test    when col1 is 'rule1' then 'hello' when col2 is 'yyy' then 'world'
rule2   test1   when col2 is 'abc' then 'foo' when col1 is null then 'bar'

After executing the rules column, I want the output to look like below.

col1    col2    rules                                                               result
rule1   test    when col1 is 'rule1' then 'hello' when col2 is 'yyy' then 'world'   hello
rule2   test1   when col2 is 'abc' then 'foo' when col1 is null then 'bar'          null

I tried selectExpr but it's not working. Any help will be appreciated.


1 Answer


You can't. Not easily at least.

You'll run into RuntimeError: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063. (https://issues.apache.org/jira/browse/SPARK-5063)

TL;DR is that code running on executors can not use spark or sc; only code running on the driver is allowed to do so. It's like running spark.read(...) inside a UDF: it's not supported.

See this also.
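
For instance, here is a minimal sketch of the unsupported pattern (eval_rule and the CASE query are made up just to show the shape):

from pyspark.sql.functions import udf

@udf
def eval_rule(rule):
    # `spark` lives on the driver; referencing it here forces PySpark to
    # pickle the SparkContext so it can ship this function to executors,
    # and that is what triggers the SPARK-5063 error
    return spark.sql(f"SELECT CASE {rule} END AS r").first()["r"]

# df.select(eval_rule('rules'))  # fails with the error quoted above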


What you can do is:

from pyspark.sql.functions import udf

@udf
def process_sql_expression(rule, c1, c2):
    # plain-Python logic to parse `rule` and evaluate it against c1/c2;
    # must not touch spark or sc
    return ...

df_orig = spark.read(...)  # has col1, col2 and rules
new_df = df_orig.select(
    'col1',
    'col2',
    'rules',
    process_sql_expression('rules', 'col1', 'col2').alias('result'),
)
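
Note that @udf with no explicit returnType defaults to StringType, which happens to match the string results ('hello', 'world', 'foo', 'bar') in your sample; pass a returnType if you need something else.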

The gotcha is that the "# logic to parse rule and return c1 or c2" part can not use spark or sc. So if you can find, e.g., a Java/Python lib that parses these expressions for you, you can make it available to all executors by installing it everywhere (or shipping it with e.g. --py-files) and then use it inside the UDF implementation.
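
If your rules always follow the simple grammar in the sample, you may not even need a library. Here is a minimal sketch, assuming every clause looks exactly like when <col> is <value> then '<result>', where <value> is either a quoted literal or the keyword null (anything fancier needs a real SQL parser):

import re

from pyspark.sql.functions import udf

# one clause: when <col> is <'literal'|null> then '<result>'
CLAUSE = re.compile(r"when\s+(\w+)\s+is\s+(null|'[^']*')\s+then\s+'([^']*)'",
                    re.IGNORECASE)

@udf  # StringType by default, matching the string results here
def process_sql_expression(rule, c1, c2):
    row = {'col1': c1, 'col2': c2}
    for col, value, result in CLAUSE.findall(rule or ''):
        if value.lower() == 'null':
            # "is null" clause: matches when the column is None
            if row.get(col) is None:
                return result
        elif row.get(col) == value.strip("'"):
            # quoted-literal clause: plain equality check
            return result
    return None  # no clause matched -> null, as in your rule2 row

With your sample data this should yield 'hello' for the rule1 row (col1 matches 'rule1') and null for the rule2 row (neither clause matches), which lines up with your expected output.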
