Code:
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.providers.google.cloud.hooks.gcs import GCSHook


class GCSUploadOperator(BaseOperator):
    @apply_defaults
    def __init__(
        self,
        bucket_name,
        target_file_name,
        data_as_str,
        gcp_conn_id="google_cloud_default",
        *args,
        **kwargs,
    ):
        super(GCSUploadOperator, self).__init__(*args, **kwargs)
        self.bucket_name = bucket_name
        self.data_as_str = data_as_str
        self.gcp_conn_id = gcp_conn_id
        self.target_file_name = target_file_name

    def execute(self, context):
        # Upload the string to a date-partitioned object path in the bucket.
        hook = GCSHook(self.gcp_conn_id)
        hook.upload(
            bucket_name=self.bucket_name,
            object_name=context["execution_date"].strftime(
                f"year=2022/month=%m/day=%d/{self.target_file_name}"
            ),
            data=self.data_as_str,
        )
numbers = PythonOperator(task_id="numbers", python_callable=lambda: "abcde")

gcs = GCSUploadOperator(
    task_id="upload_content_to_GCS",
    bucket_name=BUCKET_NAME,
    target_file_name=f"{STORE_KEY_CONTENT}.json",
    data_as_str=?????????,  # I need to pass the string result of the previous task here
)
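
For context: as far as I understand, the string returned by the numbers lambda is pushed to XCom under the default return_value key, so I know I could pull it manually inside execute, roughly like the sketch below (with the upstream task_id hard-coded, which is exactly what I'd like to avoid):

def execute(self, context):
    # Workaround sketch: pull the upstream task's return value from XCom
    # directly instead of receiving it through the constructor.
    data_as_str = context["ti"].xcom_pull(task_ids="numbers")
    hook = GCSHook(self.gcp_conn_id)
    hook.upload(
        bucket_name=self.bucket_name,
        object_name=context["execution_date"].strftime(
            f"year=2022/month=%m/day=%d/{self.target_file_name}"
        ),
        data=data_as_str,
    )

But I'd rather keep the operator generic and pass the value in as a regular argument, which is where I'm stuck.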
What I've tried for data_as_str:
gcs = GCSUploadOperator(
    task_id="upload_content_to_GCS",
    bucket_name=BUCKET_NAME,
    target_file_name=f"{STORE_KEY_CONTENT}.json",
    data_as_str=numbers,
)
--> TypeError: <Task(PythonOperator): numbers> could not be converted to bytes
gcs = GCSUploadOperator(
    task_id="upload_content_to_GCS",
    bucket_name=BUCKET_NAME,
    target_file_name=f"{STORE_KEY_CONTENT}.json",
    data_as_str=numbers.output,
)
--> TypeError: <airflow.models.xcom_arg.XComArg object at 0x7f6e8ed76760> could not be converted to bytes
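
My guess is that this has something to do with templated fields, i.e. that data_as_str would need to be listed in template_fields so that the XComArg (or a Jinja expression like "{{ ti.xcom_pull(task_ids='numbers') }}") is rendered into the actual string before execute runs. Something like the sketch below, but I haven't verified it:

class GCSUploadOperator(BaseOperator):
    # Guess: with data_as_str declared as a templated field, numbers.output
    # (an XComArg) should be resolved to the returned string at runtime
    # instead of being handed to GCSHook.upload as-is.
    template_fields = ("data_as_str",)

    ...  # __init__ and execute unchanged

gcs = GCSUploadOperator(
    task_id="upload_content_to_GCS",
    bucket_name=BUCKET_NAME,
    target_file_name=f"{STORE_KEY_CONTENT}.json",
    data_as_str=numbers.output,
)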
Any idea how to correctly pass the string returned by the numbers task into data_as_str?