I am trying to run the following SQL query
get_ns_inv_data = """
select invoice_number, invoice_line_id, invoice_date,invoice_type, maintenance_start_date, maintenance_end_date, substring(sf_invoice,0,8) sf_invoice, sf_opportunity_no,
sf_order_no, currency, account_id, department_id, billtocust_id, reseller_id, enduser_id, product_hierarchy_id, sku, product_internal_id, unit_price, quantity_invoiced,
value_invoiced, value_invoiced_tx, cogs_amount, region_id, license_type,
reg.territory as ns_territory, reg.region as ns_region,
bill_entity.full_name as bill_to_full_name, reseller_entity.full_name as reseller_full_name, replace(end_user_entity.full_name, 'z','t') as end_user_full_name
from netsuite.invoices inv
left join netsuite.region reg on inv.region_id=reg.region_hierarchy_id
left join netsuite.entity bill_entity on inv.billtocust_id = bill_entity.entity_id --or inv.reseller_id = entity.entity_id --or inv.enduser_id = entity.entity_id
left join netsuite.entity reseller_entity on inv.reseller_id = reseller_entity.entity_id
left join netsuite.entity end_user_entity on inv.enduser_id = end_user_entity.entity_id;"""
in an Airflow DAG, via RedshiftToS3Operator, as follows:
netsuite_inv_to_s3 = RedshiftToS3Operator(
    task_id="ns_inv_to_s3",
    s3_bucket=S3_BUCKET_NAME,
    s3_key=S3_NS_KEY,
    redshift_conn_id="rs_ops_prod",
    aws_conn_id="S3_Dev_Connection",
    select_query=get_ns_inv_data,
    unload_options=["allowoverwrite"],
)
However, when I run the DAG, I get the following error:
[2022-07-22, 15:55:38 UTC] {dbapi.py:213} INFO - Running statement:
UNLOAD ('
select invoice_number, invoice_line_id, invoice_date,invoice_type, maintenance_start_date, maintenance_end_date, substring(sf_invoice,0,8) sf_invoice, sf_opportunity_no,
sf_order_no, currency, account_id, department_id, billtocust_id, reseller_id, enduser_id, product_hierarchy_id, sku, product_internal_id, unit_price, quantity_invoiced,
value_invoiced, value_invoiced_tx, cogs_amount, region_id, license_type,
reg.territory as ns_territory, reg.region as ns_region,
bill_entity.full_name as bill_to_full_name, reseller_entity.full_name as reseller_full_name, replace(end_user_entity.full_name, 'z','t') as end_user_full_name
from netsuite.invoices inv
left join netsuite.region reg on inv.region_id=reg.region_hierarchy_id
left join netsuite.entity bill_entity on inv.billtocust_id = bill_entity.entity_id --or inv.reseller_id = entity.entity_id --or inv.enduser_id = entity.entity_id
left join netsuite.entity reseller_entity on inv.reseller_id = reseller_entity.entity_id
left join netsuite.entity end_user_entity on inv.enduser_id = end_user_entity.entity_id;')
TO 's3://data-warehousing-data-engineering/arr_poc/netsuite/transaction_file_raw.csv'
credentials
'aws_access_key_id=AKIA56JTTF6DBUI6N5NG;aws_secret_access_key=***'
allowoverwrite;
, parameters: None
[2022-07-22, 15:55:38 UTC] {taskinstance.py:1889} ERROR - Task failed with exception
Traceback (most recent call last):
File "/Users/tanmay/PythonProjects/apacheairflow_workspace/airflow_env/lib/python3.9/site-packages/redshift_connector/core.py", line 1648, in execute
ps = cache["ps"][key]
KeyError: ("\n UNLOAD ('\n select invoice_number, invoice_line_id, invoice_date,invoice_type, maintenance_start_date, maintenance_end_date, substring(sf_invoice,0,8) sf_invoice, sf_opportunity_no,\n sf_order_no, currency, account_id, department_id, billtocust_id, reseller_id, enduser_id, product_hierarchy_id, sku, product_internal_id, unit_price, quantity_invoiced,\n value_invoiced, value_invoiced_tx, cogs_amount, region_id, license_type,\n reg.territory as ns_territory, reg.region as ns_region,\n bill_entity.full_name as bill_to_full_name, reseller_entity.full_name as reseller_full_name, replace(end_user_entity.full_name, 'z','t') as end_user_full_name\n from netsuite.invoices inv\n left join netsuite.region reg on inv.region_id=reg.region_hierarchy_id\n left join netsuite.entity bill_entity on inv.billtocust_id = bill_entity.entity_id --or inv.reseller_id = entity.entity_id --or inv.enduser_id = entity.entity_id\n left join netsuite.entity reseller_entity on inv.reseller_id = reseller_entity.entity_id\n left join netsuite.entity end_user_entity on inv.enduser_id = end_user_entity.entity_id;')\n TO 's3://data-warehousing-data-engineering/arr_poc/netsuite/transaction_file_raw.csv'\n credentials\n 'aws_access_key_id=AKIA56JTTF6DBUI6N5NG;aws_secret_access_key=***'\n allowoverwrite;\n ", ())
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/tanmay/PythonProjects/apacheairflow_workspace/airflow_env/lib/python3.9/site-packages/airflow/providers/amazon/aws/transfers/redshift_to_s3.py", line 156, in execute
redshift_hook.run(unload_query, self.autocommit, parameters=self.parameters)
File "/Users/tanmay/PythonProjects/apacheairflow_workspace/airflow_env/lib/python3.9/site-packages/airflow/hooks/dbapi.py", line 193, in run
self._run_command(cur, sql_statement, parameters)
File "/Users/tanmay/PythonProjects/apacheairflow_workspace/airflow_env/lib/python3.9/site-packages/airflow/hooks/dbapi.py", line 217, in _run_command
cur.execute(sql_statement)
File "/Users/tanmay/PythonProjects/apacheairflow_workspace/airflow_env/lib/python3.9/site-packages/redshift_connector/cursor.py", line 231, in execute
self._c.execute(self, operation, args)
File "/Users/tanmay/PythonProjects/apacheairflow_workspace/airflow_env/lib/python3.9/site-packages/redshift_connector/core.py", line 1718, in execute
self.handle_messages(cursor)
File "/Users/tanmay/PythonProjects/apacheairflow_workspace/airflow_env/lib/python3.9/site-packages/redshift_connector/core.py", line 1986, in handle_messages
raise self.error
redshift_connector.error.ProgrammingError: {'S': 'ERROR', 'C': '42601', 'M': 'syntax error at or near "z"', 'P': '649', 'F': '/home/ec2-user/padb/src/pg/src/backend/parser/parser_scan.l', 'L': '714', 'R': 'yyerror'}
[2022-07-22, 15:55:38 UTC] {taskinstance.py:1395} INFO - Marking task as FAILED. dag_id=arr_poc, task_id=prod_rs_data_pull.ns_inv_to_s3, execution_date=20220722T195529, start_date=20220722T195536, end_date=20220722T195538
[2022-07-22, 15:55:38 UTC] {standard_task_runner.py:92} ERROR - Failed to execute job 753 for task prod_rs_data_pull.ns_inv_to_s3 ({'S': 'ERROR', 'C': '42601', 'M': 'syntax error at or near "z"', 'P': '649', 'F': '/home/ec2-user/padb/src/pg/src/backend/parser/parser_scan.l', 'L': '714', 'R': 'yyerror'}; 89443)
[2022-07-22, 15:55:38 UTC] {local_task_job.py:156} INFO - Task exited with return code 1
[2022-07-22, 15:55:38 UTC] {local_task_job.py:273} INFO - 0 downstream tasks scheduled from follow-on schedule check
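From the rendered statement in the log, my reading (a paraphrase based on the output above, not the provider's actual code) is that the operator interpolates select_query directly between the single quotes of an UNLOAD wrapper, roughly like this:

```python
def build_unload_query(select_query: str, s3_path: str,
                       credentials_block: str, unload_options: str) -> str:
    """Rough shape of the statement the operator runs, inferred from the log.

    select_query is dropped straight between the single quotes, so any single
    quotes inside it (like the ones around 'z' and 't') end the string early.
    """
    return (
        f"UNLOAD ('{select_query}')\n"
        f"TO '{s3_path}'\n"
        f"credentials '{credentials_block}'\n"
        f"{unload_options};"
    )
```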
It seems that the single quotes around 'z' and 't' in the replace() call break the generated UNLOAD statement, since the whole SELECT gets wrapped in single quotes. Has anyone run into this, and if so, how did you solve it?
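One workaround I am considering (untested, and my assumption based on the Redshift UNLOAD docs, which show embedded literals written with doubled single quotes) is to escape the quotes and drop the trailing semicolon before handing the query to the operator. Would something like this be the right approach?

```python
from airflow.providers.amazon.aws.transfers.redshift_to_s3 import RedshiftToS3Operator

# Untested sketch: double the embedded single quotes and strip the trailing
# semicolon, since the whole SELECT ends up inside UNLOAD('...').
escaped_ns_inv_query = get_ns_inv_data.strip().rstrip(";").replace("'", "''")

netsuite_inv_to_s3 = RedshiftToS3Operator(
    task_id="ns_inv_to_s3",
    s3_bucket=S3_BUCKET_NAME,
    s3_key=S3_NS_KEY,
    redshift_conn_id="rs_ops_prod",
    aws_conn_id="S3_Dev_Connection",
    select_query=escaped_ns_inv_query,
    unload_options=["allowoverwrite"],
)
```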
Ultimately I am trying to replace double quotes with an empty string so that there are no special characters in the data when it is written to S3.
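For completeness, the 'z' → 't' replace above is just a stand-in; the expression I actually want looks something like the one below (assuming Redshift's CHR() returns the double-quote character for 34, which avoids one quoted literal but still leaves the empty replacement string to escape):

```python
# The replace I actually want: strip double quotes from the name.
# chr(34) is the double-quote character, so that argument needs no quoted literal;
# the empty replacement string '' still does, hence the escaping question above.
end_user_expr = "replace(end_user_entity.full_name, chr(34), '') as end_user_full_name"
```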
Thanks