Questions tagged [airflow-xcom]

21 questions
1
vote
1 answer

Using Airflow for 2 tasks, and I have a problem running them

I would like to use Airflow for 2 tasks: the first sends a request to OpenWeatherMap and creates a JSON file every minute, and the second uses the last 20 files to create a new file with temperature, city, and pressure key-value pairs. I…
0
votes
0 answers

Parse xcom in local function in airflow

I'm running an Airflow DAG with tasks executed via DockerOperator. I need to fetch the result from the first task in the second. This can be done with XComs in the following way: set the first task's 'xcom_all=True'; print the output from the first task; fetch xcom…
kakk11
  • 898
  • 8
  • 21
0
votes
1 answer

Passing values between tasks in Airflow (in operators other than PythonOperator)

I am trying to see how to pass values between tasks in Airflow. There are multiple blogs and questions on SO, but most of them illustrate this using PythonOperator, where ti.xcom_push and ti.xcom_pull are used within the Python callable. How can we…
Chittu
  • 3
  • 2
0
votes
0 answers

Is there a way to pass a variable from SparkSubmitOperator to other operators?

As far as I have figured out, an XCom push has to run inside a function that takes context parameters. However, with SparkSubmitOperator, I can only execute a Python script file, to which I cannot pass context parameters. How can I pass a variable from…
sko1324
  • 3
  • 2
0
votes
1 answer

AWS SageMaker pipeline xcom alternative

I'm currently working with SageMaker Pipelines and have a question about inter-step communication. In systems like Apache Airflow, there's a concept of XCom, which allows tasks to exchange small data artifacts or messages. I'm wondering if there's…
Shlomi Schwartz
  • 8,693
  • 29
  • 109
  • 186
0
votes
1 answer

How to get an arbitrary file SHA into Airflow's GithubOperator?

It seems like this should be a fairly standard use case for GithubOperator, so I must be missing something obvious. Surely this can't be difficult to do! High level task: Using Airflow, orchestrate the update of contents of a certain arbitrary file…
Philip
  • 323
  • 3
  • 13
0
votes
0 answers

Why can't I pull an XComs value to be used as an argument in the Airflow GCSToGCSOperator?

I'm trying to pass a list of filenames from the first task in my DAG 'task_get_file_list_click' (task ID 'get_file_list_click') into the 'GCSToGCSOperator' of the second task 'task_copy_file_list_click' using 'xcom_pull' and the previous task ID. I…
rjk90
  • 3
  • 2
0
votes
0 answers

Airflow. Pass the result of the previous Task Flow to the next Task Flow

I want to log to the POSTGRES-logging-tables and to load data from the MSSQL-table to the POSTGRES-table. After inserting a row into the POSTGRES-logging-tables, I get the ID, log.t_date.i_dt_log_key. I need to add log.t_date.i_dt_log_key as an…
yagrus2
  • 1
  • 2
0
votes
1 answer

Airflow Kubernetes Pod Operator XCom return value is "None" when accessed in PythonOperator

I am running a Kubernetes pod using Airflow's KubernetesPodOperator, then executing a jar file in the pod and writing the output to /airflow/xcom/return.json. When checking the task's XCom value, it shows the return_value as the content of…
Sudiv
  • 17
  • 1
  • 5
0
votes
2 answers

json.decoder.JSONDecodeError when getting a string from XCom

In a task, I serialise a dict (converting it to a string) and push it to XCom: result[data] = json.dumps({"agents": ["john.doe@example.com"], "houses": ["jane.doe@example.com"]}). In Airflow's UI it looks good as a string, and at the DAG level, I…
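
A minimal sketch of one common way to hit this error, which is not necessarily the asker's cause: a value produced by json.dumps decodes cleanly with json.loads, but the same dict converted with str() yields single-quoted text that is not valid JSON. The variable names here are hypothetical; the data is the dict from the excerpt, and no Airflow API is involved.

```python
import json

# The dict from the excerpt above.
payload = {"agents": ["john.doe@example.com"], "houses": ["jane.doe@example.com"]}

# Serialising with json.dumps gives a double-quoted JSON string
# that json.loads can turn back into the original dict.
serialised = json.dumps(payload)
round_tripped = json.loads(serialised)

# str() gives the Python repr, which uses single quotes.
# json.loads rejects it with json.JSONDecodeError.
python_repr = str(payload)
try:
    json.loads(python_repr)
    repr_decodes = True
except json.JSONDecodeError:
    repr_decodes = False
```

If the string reaching json.loads looks like {'agents': [...]} rather than {"agents": [...]}, the value was probably stringified somewhere between the push and the pull instead of being kept as JSON.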
0
votes
1 answer

Airflow XCom raises an error when querying a SQL datetime field

I need some help. I'm trying to query data from an Oracle database, and I'm also new to Airflow. My code looks like this: from datetime import datetime, timedelta from airflow import DAG from airflow.providers.oracle.operators.oracle import…
0
votes
1 answer

Airflow render_template_as_native_obj still returns a string for dynamic tasks, expecting a dictionary

I am using Airflow version 2.4.3, Composer version 2.1.11. # this function creates a dictionary of sql file names and file paths from google cloud def _file_dict(*op_args): bucket=op_args[0] path=op_args[1] from google.cloud import…
0
votes
0 answers

Unable to push records from Task 1 to Task 2 in Airflow

I am trying to create a DAG in Apache Airflow that performs two tasks: Task 1 pulls data from a Google Sheet, cleans the data, and returns a pandas DataFrame. Task 2 takes this DataFrame and pushes the data to a BigQuery table. I have set up the…
0
votes
2 answers

How to get an Xcom value from a previous task

I am trying to get the XCom value from a Sensor task. This is the result of the Sensor task: Now, I want to retrieve the string 'df-00...' so I can use it in the InputFilePattern parameter, as you can see in the following task: …
0
votes
1 answer

How to add rows to a table using xcom_pull in PostgresOperator AIRFLOW

I'm new to Airflow. I want to add data from PostgresOperator to the table using xcom_pull. Here's how I do it: load_data = PostgresOperator(task_id="load_data", postgres_conn_id="database_my", …