
Code:

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.providers.google.cloud.hooks.gcs import GCSHook


class GCSUploadOperator(BaseOperator):
    @apply_defaults
    def __init__(
        self,
        bucket_name,
        target_file_name,
        data_as_str,
        gcp_conn_id="google_cloud_default",
        *args,
        **kwargs,
    ):
        super(GCSUploadOperator, self).__init__(*args, **kwargs)
        self.bucket_name = bucket_name
        self.data_as_str = data_as_str
        self.gcp_conn_id = gcp_conn_id
        self.target_file_name = target_file_name

    def execute(self, context):
        hook = GCSHook(self.gcp_conn_id)
        hook.upload(
            bucket_name=self.bucket_name,
            object_name=context["execution_date"].strftime(
                f"year=2022/month=%m/day=%d/{self.target_file_name}"
            ),
            data=self.data_as_str,
        )

numbers = PythonOperator(task_id="numbers", python_callable=lambda: "abcde")
gcs = GCSUploadOperator(
    task_id="upload_content_to_GCS",
    bucket_name=BUCKET_NAME,
    target_file_name=f"{STORE_KEY_CONTENT}.json",
    data_as_str=?????????,   # I need to pass a string result of previous task
)

What I've tried for data_as_str:

    gcs = GCSUploadOperator(
        task_id="upload_content_to_GCS",
        bucket_name=BUCKET_NAME,
        target_file_name=f"{STORE_KEY_CONTENT}.json",
        data_as_str=numbers
    )
    --> TypeError: <Task(PythonOperator): numbers> could not be converted to bytes

    gcs = GCSUploadOperator(
        task_id="upload_content_to_GCS",
        bucket_name=BUCKET_NAME,
        target_file_name=f"{STORE_KEY_CONTENT}.json",
        data_as_str=numbers.output
    )
    --> TypeError: <airflow.models.xcom_arg.XComArg object at 0x7f6e8ed76760> could not be converted to bytes

Any idea?

user3595632

1 Answer


To make this work, you have to declare the field you are expecting in your operator's `template_fields`, so that Airflow renders it (and resolves the `XComArg`) before `execute()` runs. I made this working example:


from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class CustomDummyOperator(BaseOperator):
    template_fields = ('msg_from_previous_task',)

    @apply_defaults
    def __init__(self,
                 msg_from_previous_task,
                 *args, **kwargs) -> None:
        super(CustomDummyOperator, self).__init__(*args, **kwargs)
        self.msg_from_previous_task = msg_from_previous_task

    def execute(self, context):
        print(f"Message: {self.msg_from_previous_task}")

DAG:

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

dag = DAG(
    'xcom_arg_custom_op',
    schedule_interval="@once",
    start_date=days_ago(2),
    default_args={'owner': 'airflow'},
    tags=['example'],
    catchup=False,
)

def return_a_str():
    return "string_value_from_op1"

task_1 = PythonOperator(
    task_id='task_1',
    dag=dag,
    python_callable=return_a_str,
)

task_2 = CustomDummyOperator(
    task_id='task_2',
    dag=dag,
    msg_from_previous_task=task_1.output
)

task_1 >> task_2

Output log:

[2021-05-25 13:51:50,848] {taskinstance.py:1255} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=xcom_arg_custom_op
AIRFLOW_CTX_TASK_ID=task_2
AIRFLOW_CTX_EXECUTION_DATE=2021-05-23T00:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=backfill__2021-05-23T00:00:00+00:00
Message: string_value_from_op1

Under the hood this relies on the `str()` method of `XComArg`, which renders to a Jinja `xcom_pull` template and provides backward compatibility for regular (non-TaskFlow) operators.
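The mechanism can be sketched without Airflow at all: a template field is just a string attribute on the operator, and right before `execute()` the framework renders it, substituting the upstream task's pushed value for the placeholder. All names below (`FakeXCom`, `render_template_fields`, `MiniOperator`) are invented for illustration, and the placeholder string is only an approximation of what `str(task.output)` actually produces:

```python
# Stdlib-only sketch of template-field rendering; no Airflow dependency.
import re

class FakeXCom:
    """Stand-in for the XCom store: task_id -> value pushed by that task."""
    store = {"task_1": "string_value_from_op1"}

def render_template_fields(operator, xcom):
    """Replace xcom_pull-style placeholders in each declared template field,
    mimicking what happens right before execute() is called."""
    pattern = re.compile(r"\{\{\s*task_instance\.xcom_pull\(task_ids='(\w+)'\)\s*\}\}")
    for field in operator.template_fields:
        raw = getattr(operator, field)
        rendered = pattern.sub(lambda m: xcom.store[m.group(1)], raw)
        setattr(operator, field, rendered)

class MiniOperator:
    template_fields = ("msg_from_previous_task",)

    def __init__(self, msg_from_previous_task):
        self.msg_from_previous_task = msg_from_previous_task

    def execute(self):
        return f"Message: {self.msg_from_previous_task}"

# str(task_1.output) stringifies to a Jinja snippet along these lines:
op = MiniOperator("{{ task_instance.xcom_pull(task_ids='task_1') }}")
render_template_fields(op, FakeXCom)
print(op.execute())  # Message: string_value_from_op1
```

The key point for the original question: if `data_as_str` is not listed in `template_fields`, the raw `XComArg` object reaches `execute()` unrendered, which is exactly why the upload failed with "could not be converted to bytes".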

Let me know if that worked for you!

NicoE
  • I had just figured this out and was about to write up a solution, but you beat me to it! Thanks! – user3595632 May 26 '21 at 03:51
  • So you just need to declare it in `template_fields`, with no need for the usual `msg_from_previous_task = "{}".format(self.msg_from_previous_task)`? – Thomas J May 25 '22 at 13:43