I need some help. I'm trying to query data from an Oracle database, and I'm also new to Airflow.
My code looks like this:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.oracle.operators.oracle import OracleOperator

default_args = {
    'owner': 'owner_name',
    'retries': 5,
    'retry_delay': timedelta(minutes=10)
}

with DAG(
    'dag_name',
    default_args=default_args,
    schedule_interval='0 1 * * *',
    start_date=datetime(2023, 4, 10),
) as dag:
    task = OracleOperator(
        task_id='task_name',
        oracle_conn_id='connection_name',
        sql="""
            select
                some_datetime_field,
                some_data_field
            from 
                some_table
        """
    )
    task

But I get this error:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 72, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 2305, in xcom_push
    session=session,
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 72, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/xcom.py", line 240, in set
    map_index=map_index,
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/xcom.py", line 627, in serialize_value
    return json.dumps(value, cls=XComEncoder).encode("UTF-8")
  File "/usr/local/lib/python3.7/json/__init__.py", line 238, in dumps
    **kw).encode(obj)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/json.py", line 176, in encode
    return super().encode(o)
  File "/usr/local/lib/python3.7/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/usr/local/lib/python3.7/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/json.py", line 153, in default
    CLASSNAME: o.__module__ + "." + o.__class__.__qualname__,
AttributeError: 'datetime.datetime' object has no attribute '__module__'

After I delete the some_datetime_field line from the query, it works fine, so I'm not sure what's wrong. How can I pass a SQL datetime via XCom to another task?

1 Answer

Can you try converting the date field value to a string in your query, for example with TO_CHAR? https://www.oracletutorial.com/oracle-date-functions/oracle-to_char/

The query currently returns a datetime.datetime object, and the traceback shows it failing inside json.dumps while the result is pushed to XCom. Values must be JSON serialisable to be pushed to XCom.

Additional reference: https://github.com/apache/airflow/discussions/24881
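
A minimal sketch of the string round trip, in plain Python so it can be run without Airflow or a database. The format mask, the `PY_FORMAT` constant, the `parse_rows` helper and the sample row are assumptions made for illustration; the query text would go into the operator's `sql` argument, and the parsing would happen in whichever downstream task pulls the value.

```python
import json
from datetime import datetime

# Query text for the operator's `sql` argument. TO_CHAR makes Oracle return
# the column as text, so the result contains only JSON-friendly values.
SQL = """
    select
        to_char(some_datetime_field, 'YYYY-MM-DD"T"HH24:MI:SS') as some_datetime_field,
        some_data_field
    from
        some_table
"""

# Python-side format that matches the Oracle mask above (assumed mask).
PY_FORMAT = "%Y-%m-%dT%H:%M:%S"


def parse_rows(rows):
    """Turn the string column back into datetime objects (hypothetical helper)."""
    return [(datetime.strptime(ts, PY_FORMAT), data) for ts, data in rows]


# Simulated query result, standing in for what the first task would return.
rows = [["2023-04-10T01:00:00", "abc"]]

# json.dumps succeeds because every value is a plain string.
payload = json.dumps(rows)

# A downstream task would load the payload and parse the timestamps.
restored = parse_rows(json.loads(payload))
```

Keeping the Oracle mask and the Python format side by side makes it easier to change both together if a different precision or a time zone is needed.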

Pankaj Koti
  • If I want to use the datetime field in downstream tasks, like "task1 >> task2 >> task3", I have to convert back and forth between char and datetime before returning, right? – Phudit Chalekarn Apr 19 '23 at 00:55
  • Yes, unfortunately that needs to be taken care of. Alternatively, you could extend BaseXCom and write a class with a custom serializer/deserializer for datetime values; for example, see how we serialize and deserialize datetime in the module https://github.com/astronomer/astronomer-providers/blob/main/astronomer/providers/amazon/aws/xcom_backends/s3.py. You can then use that class as your XCom backend by setting an environment variable for your deployment, like this: https://github.com/astronomer/astronomer-providers/blob/main/dev/docker-compose.yaml#L30 – Pankaj Koti Apr 19 '23 at 04:29
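
The serializer/deserializer idea from the last comment could look roughly like the sketch below. It is not the code from the linked module: the `_TAG` marker and the `serialize` / `deserialize` function names are hypothetical, and the sketch uses only the standard library. In a real custom backend, logic like this would presumably sit inside the `serialize_value` and `deserialize_value` methods of a `BaseXCom` subclass.

```python
import json
from datetime import datetime

# Hypothetical marker key used to recognise encoded datetimes when decoding.
_TAG = "__datetime__"


def _encode(obj):
    """json.dumps fallback: wrap datetimes in a tagged dict."""
    if isinstance(obj, datetime):
        return {_TAG: obj.isoformat()}
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")


def _decode(d):
    """json.loads object_hook: unwrap tagged dicts back into datetimes."""
    if set(d) == {_TAG}:
        return datetime.fromisoformat(d[_TAG])
    return d


def serialize(value) -> bytes:
    # Same shape as the failing call in the traceback, but with a datetime-aware fallback.
    return json.dumps(value, default=_encode).encode("utf-8")


def deserialize(data: bytes):
    return json.loads(data.decode("utf-8"), object_hook=_decode)


# Round trip of a row shaped like the query result in the question.
value = [[datetime(2023, 4, 10, 1, 0), "abc"]]
round_tripped = deserialize(serialize(value))
```

With this approach the conversion lives in one place, so individual tasks would not need to convert between strings and datetimes themselves. Note that tuples come back as lists after a JSON round trip.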