
I am trying to run the following SQL query

get_ns_inv_data = """
    select invoice_number, invoice_line_id, invoice_date,invoice_type, maintenance_start_date, maintenance_end_date, substring(sf_invoice,0,8) sf_invoice, sf_opportunity_no,
    sf_order_no, currency, account_id, department_id, billtocust_id, reseller_id, enduser_id, product_hierarchy_id, sku, product_internal_id, unit_price, quantity_invoiced,
    value_invoiced, value_invoiced_tx, cogs_amount, region_id, license_type,
    reg.territory as ns_territory, reg.region as ns_region,
    bill_entity.full_name as bill_to_full_name, reseller_entity.full_name as reseller_full_name, replace(end_user_entity.full_name, 'z','t') as end_user_full_name
    from netsuite.invoices inv
    left join netsuite.region reg on inv.region_id=reg.region_hierarchy_id
    left join netsuite.entity bill_entity on inv.billtocust_id = bill_entity.entity_id --or inv.reseller_id = entity.entity_id --or inv.enduser_id = entity.entity_id
    left join netsuite.entity reseller_entity on inv.reseller_id = reseller_entity.entity_id
    left join netsuite.entity end_user_entity on inv.enduser_id = end_user_entity.entity_id;"""

in an Airflow DAG, as follows:

netsuite_inv_to_s3 = RedshiftToS3Operator(
        task_id="ns_inv_to_s3",
        s3_bucket=S3_BUCKET_NAME,
        s3_key=S3_NS_KEY,
        redshift_conn_id="rs_ops_prod",
        aws_conn_id="S3_Dev_Connection",
        select_query=get_ns_inv_data,
        unload_options=['allowoverwrite'],
    )

However, when I run the DAG, I get the following error:

   [2022-07-22, 15:55:38 UTC] {dbapi.py:213} INFO - Running statement: 
                    UNLOAD ('
    select invoice_number, invoice_line_id, invoice_date,invoice_type, maintenance_start_date, maintenance_end_date, substring(sf_invoice,0,8) sf_invoice, sf_opportunity_no,
    sf_order_no, currency, account_id, department_id, billtocust_id, reseller_id, enduser_id, product_hierarchy_id, sku, product_internal_id, unit_price, quantity_invoiced,
    value_invoiced, value_invoiced_tx, cogs_amount, region_id, license_type,
    reg.territory as ns_territory, reg.region as ns_region,
    bill_entity.full_name as bill_to_full_name, reseller_entity.full_name as reseller_full_name, replace(end_user_entity.full_name, 'z','t') as end_user_full_name
    from netsuite.invoices inv
    left join netsuite.region reg on inv.region_id=reg.region_hierarchy_id
    left join netsuite.entity bill_entity on inv.billtocust_id = bill_entity.entity_id --or inv.reseller_id = entity.entity_id --or inv.enduser_id = entity.entity_id
    left join netsuite.entity reseller_entity on inv.reseller_id = reseller_entity.entity_id
    left join netsuite.entity end_user_entity on inv.enduser_id = end_user_entity.entity_id;')
                    TO 's3://data-warehousing-data-engineering/arr_poc/netsuite/transaction_file_raw.csv'
                    credentials
                    'aws_access_key_id=AKIA56JTTF6DBUI6N5NG;aws_secret_access_key=***'
                    allowoverwrite;
        , parameters: None
[2022-07-22, 15:55:38 UTC] {taskinstance.py:1889} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/Users/tanmay/PythonProjects/apacheairflow_workspace/airflow_env/lib/python3.9/site-packages/redshift_connector/core.py", line 1648, in execute
    ps = cache["ps"][key]
KeyError: ("\n                    UNLOAD ('\n    select invoice_number, invoice_line_id, invoice_date,invoice_type, maintenance_start_date, maintenance_end_date, substring(sf_invoice,0,8) sf_invoice, sf_opportunity_no,\n    sf_order_no, currency, account_id, department_id, billtocust_id, reseller_id, enduser_id, product_hierarchy_id, sku, product_internal_id, unit_price, quantity_invoiced,\n    value_invoiced, value_invoiced_tx, cogs_amount, region_id, license_type,\n    reg.territory as ns_territory, reg.region as ns_region,\n    bill_entity.full_name as bill_to_full_name, reseller_entity.full_name as reseller_full_name, replace(end_user_entity.full_name, 'z','t') as end_user_full_name\n    from netsuite.invoices inv\n    left join netsuite.region reg on inv.region_id=reg.region_hierarchy_id\n    left join netsuite.entity bill_entity on inv.billtocust_id = bill_entity.entity_id --or inv.reseller_id = entity.entity_id --or inv.enduser_id = entity.entity_id\n    left join netsuite.entity reseller_entity on inv.reseller_id = reseller_entity.entity_id\n    left join netsuite.entity end_user_entity on inv.enduser_id = end_user_entity.entity_id;')\n                    TO 's3://data-warehousing-data-engineering/arr_poc/netsuite/transaction_file_raw.csv'\n                    credentials\n                    'aws_access_key_id=AKIA56JTTF6DBUI6N5NG;aws_secret_access_key=***'\n                    allowoverwrite;\n        ", ())

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/tanmay/PythonProjects/apacheairflow_workspace/airflow_env/lib/python3.9/site-packages/airflow/providers/amazon/aws/transfers/redshift_to_s3.py", line 156, in execute
    redshift_hook.run(unload_query, self.autocommit, parameters=self.parameters)
  File "/Users/tanmay/PythonProjects/apacheairflow_workspace/airflow_env/lib/python3.9/site-packages/airflow/hooks/dbapi.py", line 193, in run
    self._run_command(cur, sql_statement, parameters)
  File "/Users/tanmay/PythonProjects/apacheairflow_workspace/airflow_env/lib/python3.9/site-packages/airflow/hooks/dbapi.py", line 217, in _run_command
    cur.execute(sql_statement)
  File "/Users/tanmay/PythonProjects/apacheairflow_workspace/airflow_env/lib/python3.9/site-packages/redshift_connector/cursor.py", line 231, in execute
    self._c.execute(self, operation, args)
  File "/Users/tanmay/PythonProjects/apacheairflow_workspace/airflow_env/lib/python3.9/site-packages/redshift_connector/core.py", line 1718, in execute
    self.handle_messages(cursor)
  File "/Users/tanmay/PythonProjects/apacheairflow_workspace/airflow_env/lib/python3.9/site-packages/redshift_connector/core.py", line 1986, in handle_messages
    raise self.error
redshift_connector.error.ProgrammingError: {'S': 'ERROR', 'C': '42601', 'M': 'syntax error at or near "z"', 'P': '649', 'F': '/home/ec2-user/padb/src/pg/src/backend/parser/parser_scan.l', 'L': '714', 'R': 'yyerror'}
[2022-07-22, 15:55:38 UTC] {taskinstance.py:1395} INFO - Marking task as FAILED. dag_id=arr_poc, task_id=prod_rs_data_pull.ns_inv_to_s3, execution_date=20220722T195529, start_date=20220722T195536, end_date=20220722T195538
[2022-07-22, 15:55:38 UTC] {standard_task_runner.py:92} ERROR - Failed to execute job 753 for task prod_rs_data_pull.ns_inv_to_s3 ({'S': 'ERROR', 'C': '42601', 'M': 'syntax error at or near "z"', 'P': '649', 'F': '/home/ec2-user/padb/src/pg/src/backend/parser/parser_scan.l', 'L': '714', 'R': 'yyerror'}; 89443)
[2022-07-22, 15:55:38 UTC] {local_task_job.py:156} INFO - Task exited with return code 1
[2022-07-22, 15:55:38 UTC] {local_task_job.py:273} INFO - 0 downstream tasks scheduled from follow-on schedule check

It seems that Airflow does not like the single quotes in the replace statement. Has anyone run into this, and if so, how was it solved?

I am trying to replace double quotes with a blank so that when the data is written to S3, there are no special characters in it.
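For reference, here is a minimal sketch of the workaround I am considering (untested). My assumption is that RedshiftToS3Operator wraps select_query inside UNLOAD('...'), so a literal single quote such as the one in 'z' ends the UNLOAD string early. The Redshift UNLOAD documentation says embedded single quotes should be doubled, and I suspect the trailing semicolon inside the quoted query has to be dropped as well:

    # Untested sketch: double every literal single quote so it survives being
    # wrapped in UNLOAD('...'), and drop the trailing semicolon from the query.
    get_ns_inv_data = """
        select invoice_number, invoice_line_id,
        replace(end_user_entity.full_name, ''z'', ''t'') as end_user_full_name
        from netsuite.invoices inv
        left join netsuite.entity end_user_entity on inv.enduser_id = end_user_entity.entity_id
    """

If that is right, the same doubling would presumably apply once I switch to stripping double quotes, e.g. replace(end_user_entity.full_name, ''"'', '''').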

Thanks

