
I am trying to perform a UNION ALL by calling the statements which are predefined below.

`new_records="""select 
sor.EMP_ID,
sor.EMP_NAME,
sor.EMP_STATE,
sor.EMP_PH,
'I' as CDC_IND,
TO_DATE(from_unixtime(unix_timestamp())) as EFCT_DT,
cast('9999-12-31'  as date) as  EXPR_DT
from scd.EMP_SOR sor 
left join scd.EMP_HIST_ACTIVE  
active_hist on   
where active_hist.EMP_ID is NULL"""`

`unchanged_records="""select 
sor.EMP_ID,
sor.EMP_NAME,
sor.EMP_STATE,
sor.EMP_PH,
'N' as CDC_IND,
emp_hist.expr_dt,
emp_hist.efct_dt 
from scd.EMP_SOR sor  
inner join scd.EMP_HIST_ACTIVE emp_hist
on sor.EMP_ID = emp_hist.EMP_ID
where sor.EMP_ID = emp_hist.EMP_ID
and sor.EMP_NAME = emp_hist.EMP_NAME
and sor.EMP_STATE = emp_hist.EMP_NAME
and sor.EMP_PH = emp_hist.EMP_PH"""`

`changed_records="""select
sor.EMP_ID,
sor.EMP_NAME,
sor.EMP_STATE,
sor.EMP_PH,
'U' as CDC_IND,
TO_DATE(from_unixtime(unix_timestamp())) as EFCT_DT,
cast('9999-12-31'  as date) as EXPR_DT 
from scd.EMP_SOR sor inner join scd.EMP_HIST_ACTIVE emp_shit
on sor.EMP_ID = emp_hist.EMP_ID
where sor.EMP_ID <> emp_hist.EMP_ID
or sor.EMP_NAME <> emp_hist.EMP_NAME
or sor.EMP_STATE <> emp_hist.EMP_NAME
or sor.EMP_PH <> emp_hist.EMP_PH"""`

`sqlContext.sql("new_records union all unchanged_records
 union all   changed_records")`

I am calling the above SQLs in a UNION ALL, which should return the combined result, but for some reason the query fails with the below error in Spark:

ERROR: cannot recognize input near 'new_records' 'union' 'all'; line 1 pos 0

I am not sure what I am missing here. Can someone help me?


1 Answer


Simple syntax errors: `sqlContext.sql(...)` is handed the literal text `new_records union all unchanged_records ...`, not the queries those variables hold, so the parser fails right at `new_records` (which is exactly what the error says). Interpolate the query strings into one SQL statement instead:


import org.apache.spark.sql.SparkSession

val  new_records="""select
    sor.EMP_ID,
    sor.EMP_NAME,
    sor.EMP_STATE,
    sor.EMP_PH,
    'I' as CDC_IND,
    TO_DATE(from_unixtime(unix_timestamp())) as EFCT_DT,
    cast('9999-12-31'  as date) as  EXPR_DT
    from scd.EMP_SOR sor
    left join scd.EMP_HIST_ACTIVE active_hist
    on sor.EMP_ID = active_hist.EMP_ID -- the join condition was missing in the question; this is the assumed intent
    where active_hist.EMP_ID is NULL"""

val unchanged_records="""select
    sor.EMP_ID,
    sor.EMP_NAME,
    sor.EMP_STATE,
    sor.EMP_PH,
    'N' as CDC_IND,
    emp_hist.efct_dt, -- reordered so EFCT_DT/EXPR_DT line up with the other queries (UNION ALL matches columns by position)
    emp_hist.expr_dt
    from scd.EMP_SOR sor
    inner join scd.EMP_HIST_ACTIVE emp_hist
    on sor.EMP_ID = emp_hist.EMP_ID
    where sor.EMP_ID = emp_hist.EMP_ID
    and sor.EMP_NAME = emp_hist.EMP_NAME
    and sor.EMP_STATE = emp_hist.EMP_STATE -- assuming EMP_STATE (not EMP_NAME) was intended
    and sor.EMP_PH = emp_hist.EMP_PH"""


val changed_records="""select
    sor.EMP_ID,
    sor.EMP_NAME,
    sor.EMP_STATE,
    sor.EMP_PH,
    'U' as CDC_IND,
    TO_DATE(from_unixtime(unix_timestamp())) as EFCT_DT,
    cast('9999-12-31'  as date) as EXPR_DT
    from scd.EMP_SOR sor inner join scd.EMP_HIST_ACTIVE emp_hist -- alias renamed to emp_hist to match the references below
    on sor.EMP_ID = emp_hist.EMP_ID
    where sor.EMP_ID <> emp_hist.EMP_ID
    or sor.EMP_NAME <> emp_hist.EMP_NAME
    or sor.EMP_STATE <> emp_hist.EMP_STATE -- assuming EMP_STATE (not EMP_NAME) was intended
    or sor.EMP_PH <> emp_hist.EMP_PH"""

val spark: SparkSession = SparkSession.builder
  .config("spark.master", "local") //.config("spark.eventLog.enabled", "true")
  .appName("uniontest")
  .getOrCreate()

// Interpolate the query strings into a single statement before handing it to the SQL parser
val result = spark.sql(s"$new_records " +
  s" union all " +
  s"$unchanged_records " +
  s"  union all   $changed_records")
result.show(false)

This will work. Also have a look at Why would I want .union over .unionAll in Spark for SchemaRDDs?
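
For comparison, here is a minimal DataFrame-level sketch (the `newDf`/`cdcDf` names are only for illustration, and it assumes the three query strings and the `spark` session above are in scope). In Spark 2.x, `union` matches columns by position and keeps duplicates, just like `UNION ALL`:

// Minimal sketch: run each query on its own and combine the resulting DataFrames.
// Dataset.union behaves like UNION ALL (positional columns, no de-duplication).
val newDf       = spark.sql(new_records)
val unchangedDf = spark.sql(unchanged_records)
val changedDf   = spark.sql(changed_records)

val cdcDf = newDf.union(unchangedDf).union(changedDf)
cdcDf.show(false)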

  • I am using Python as the programming language to express the union all but it's not working – bhaskar reddy Aug 11 '19 at 04:19
  • @Ram Ghadiyaram, could you please also have a look at the below question? I have answered it but am not sure if it works with a large file or not: https://stackoverflow.com/questions/57411831/how-to-efficiently-upload-a-large-tsv-file-to-a-hive-table-with-split-columns-i# – vikrant rana Aug 11 '19 at 04:38
  • Use `current_date` instead of `TO_DATE(from_unixtime(unix_timestamp()))`, see https://stackoverflow.com/a/41140298/2700344 – leftjoin Aug 11 '19 at 05:48