
I'm new to Airflow. I want to insert data into a table with PostgresOperator, pulling the values with xcom_pull. Here's how I do it:

    load_data = PostgresOperator(task_id="load_data",
                                 postgres_conn_id="database_my",
                                 sql=[f"""INSERT INTO test VALUES
                                     ('{{{{ti.xcom_pull(key='transform1', task_ids=['transform_data'])[0][{i}]['id']}}}}',
                                     '{{{{ti.xcom_pull(key='transform1', task_ids=['transform_data'])[0][{i}]['id_district']}}}}',
                                     '{{{{ti.xcom_pull(key='transform1', task_ids=['transform_data'])[0][{i}]['coord']}}}}',
                                     '{{{{ti.xcom_pull(key='transform1', task_ids=['transform_data'])[0][{i}]['address']}}}}',
                                     '{{{{ti.xcom_pull(key='transform1', task_ids=['transform_data'])[0][{i}]['properties_json']}}}}'
                                     )
                                 """ for i in range(0, 5)])

But I get the following error:

[2023-02-28, 20:03:48 UTC] {sql.py:375} INFO - Running statement: INSERT INTO test VALUES
                                    (1,
                                    3,
                                    '[20, 50]',
                                    'город ГОРОД',
                                    '{'number': {'title': 'TITLE1', 'value': 'VALUE1'}, 'name': {'title': 'TITLE2', 'value': 'VALUE2 "VALUE2"'}, 'activity': {'title': 'TITLE3', 'value': 'VALUE3'}}'
                                    )
                                , parameters: None
[2023-02-28, 20:03:48 UTC] {taskinstance.py:1851} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/airflow/lib/python3.8/site-packages/airflow/providers/common/sql/operators/sql.py", line 260, in execute
    output = hook.run(
  File "/opt/airflow/lib/python3.8/site-packages/airflow/providers/common/sql/hooks/sql.py", line 349, in run
    self._run_command(cur, sql_statement, parameters)
  File "/opt/airflow/lib/python3.8/site-packages/airflow/providers/common/sql/hooks/sql.py", line 380, in _run_command
    cur.execute(sql_statement)
psycopg2.errors.SyntaxError: syntax error at or near "number"
LINE 5:                                     '{'number': {'title': 'T...
                                               ^

[2023-02-28, 20:03:49 UTC] {taskinstance.py:1401} INFO - Marking task as FAILED. dag_id=data_579, task_id=load_data, execution_date=20230228T190000, start_date=20230228T200348, end_date=20230228T200349
[2023-02-28, 20:03:49 UTC] {standard_task_runner.py:100} ERROR - Failed to execute job 10632 for task load_data (syntax error at or near "number"
LINE 5:                                     '{'number': {'title': 'T...
                                               ^
; 2491897)
[2023-02-28, 20:03:49 UTC] {local_task_job.py:164} INFO - Task exited with return code 1

I am also interested in how to find out the length of the data returned by xcom_pull, so that the loop runs as many times as there are rows.

I understand the problem is the single quotes, but how do I get double quotes instead?

  • Where and how you are generating data that is pushed to the xcom? – Emma Mar 01 '23 at 15:48
  • You can convert it to JSON where you are pushing or you can try `tojson` filter in Jinja. `'{{{{ti.xcom_pull(...['properties_json'] | tojson }}}}'` I haven't tried though. https://jinja.palletsprojects.com/en/2.11.x/templates/#tojson – Emma Mar 01 '23 at 16:03
  • @Emma thank u. tojson worked! – Olga Fokina Mar 05 '23 at 23:19
  • @Emma A new problem has arisen. '{{ti.xcom_pull(key="transform1", task_ids=["transform_data"][0]) | length}}' returns a string. But I need a number. The typecast int() doesn't work. – Olga Fokina Mar 05 '23 at 23:20
  • where you are using that? for `range`? – Emma Mar 06 '23 at 15:25
  • @Emma yes, for range – Olga Fokina Mar 06 '23 at 21:09
  • I see. If you need dynamic `range`, take a look at dynamic task mapping feature which will simplify what you are trying to do. https://airflow.apache.org/docs/apache-airflow/2.3.0/concepts/dynamic-task-mapping.html#task-generated-mapping In order to use this, you might need to have intermediate `PythonOperator` to construct query string then pass the constructed string to `PostgresOperator` through xcom. – Emma Mar 06 '23 at 21:21
  • @Emma I will deal with it. Thank u so much! – Olga Fokina Mar 09 '23 at 18:32
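
Following the intermediate-task suggestion in the comments above, a rough sketch of that idea: a PythonOperator pulls the transformed rows from XCom, builds one INSERT statement covering all of them (so the number of rows no longer has to be hard-coded), and the PostgresOperator just renders the prebuilt SQL. The task name build_insert_sql and the structure of the transform1 XCom are assumptions taken from the question, not a confirmed implementation.

    import json
    from airflow.operators.python import PythonOperator
    from airflow.providers.postgres.operators.postgres import PostgresOperator

    def build_insert_sql(ti):
        # Pulling with a plain string task_ids returns the value itself,
        # so no [0] indexing is needed.
        rows = ti.xcom_pull(key="transform1", task_ids="transform_data")
        values = []
        for row in rows:  # one tuple per row, however many rows there are
            values.append(
                "('{id}', '{district}', '{coord}', '{address}', '{props}')".format(
                    id=row["id"],
                    district=row["id_district"],
                    coord=row["coord"],
                    address=row["address"],
                    # json.dumps gives double-quoted JSON, like tojson in the template
                    props=json.dumps(row["properties_json"], ensure_ascii=False),
                )
            )
        return "INSERT INTO test VALUES " + ", ".join(values)

    build_sql = PythonOperator(task_id="build_insert_sql",
                               python_callable=build_insert_sql)

    load_data = PostgresOperator(task_id="load_data",
                                 postgres_conn_id="database_my",
                                 # the returned string is available as the build task's XCom
                                 sql="{{ ti.xcom_pull(task_ids='build_insert_sql') }}")

    build_sql >> load_data

Note that the values are still interpolated as plain text here, so any embedded single quotes in the data would need escaping; the sketch only illustrates the flow.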

1 Answer


It is solved like this:

    load_data = PostgresOperator(task_id="load_data",
                                 postgres_conn_id="database_my",
                                 sql=[f"""INSERT INTO test VALUES
                                     ('{{{{ti.xcom_pull(key='transform1', task_ids=['transform_data'])[0][{i}]['id']}}}}',
                                     '{{{{ti.xcom_pull(key='transform1', task_ids=['transform_data'])[0][{i}]['id_district']}}}}',
                                     '{{{{ti.xcom_pull(key='transform1', task_ids=['transform_data'])[0][{i}]['coord']}}}}',
                                     '{{{{ti.xcom_pull(key='transform1', task_ids=['transform_data'])[0][{i}]['address']}}}}',
                                     '{{{{ti.xcom_pull(key='transform1', task_ids=['transform_data'])[0][{i}]['properties_json'] | tojson}}}}'
                                     )
                                 """ for i in range(0, 5)])