
I am currently creating an engine in my DAG and passing this SQLAlchemy engine as a parameter to a PythonOperator to execute some database work, e.g.:

PythonOperator(python_callable=my_callable,
               op_args=[engine],
               provide_context=True,
               task_id='my_task',
               dag=dag)

When I try to clear the status of tasks, I get an error:

File "/opt/conda/lib/python3.7/copy.py", line 169, in deepcopy
rv = reductor(4)
TypeError: can't pickle _thread._local objects

This is most likely because you can't pickle engine objects:

pickle.dumps(engine)
TypeError: can't pickle _thread._local objects

I'm wondering if there's a good way to get around this so I can use the Airflow webserver effectively. I need to pass the Python callable something that allows it to interact with the database. It could be the connection string, but it is easier to make the engine once in the DAG and pass it to all of the operators than to make the engine in each one.
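
For example, the alternative I have in mind would look something like this (the connection string is just a placeholder):

from sqlalchemy import create_engine

def my_callable(conn_string, **context):
    engine = create_engine(conn_string)  # built inside the task instead of in the DAG file
    # ... do the database work ...

PythonOperator(python_callable=my_callable,
               op_args=['postgresql://dbhost:5432/mydb'],  # a plain string pickles fine
               provide_context=True,
               task_id='my_task',
               dag=dag)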

nven
  • Your task should create the database connection; don't pass it as an argument – miraculixx Aug 06 '19 at 20:51
  • Thanks @miraculixx, is there a particular reason (other than the pickle concern) why this would be beneficial as opposed to passing the engine directly? – nven Aug 06 '19 at 23:07
  • Yes, a database connection by its very nature is ephemeral and only exists for the time used by a particular process. Airflow tasks are instantiated at the time of execution (which may be much later, repeatedly), in a different process, possibly on a different machine. Hence, even if you could pickle the connection, it would not be of use to the task when it is run, as it most likely would have ceased to exist anyway. In general and as a matter of principle, not only in Airflow, connections should always be created, managed and closed by the same process. – miraculixx Aug 07 '19 at 13:17

1 Answer


if there's a good way to get around this so I can use the Airflow webserver effectively. I need to pass the python callable something that would allow it to interact with the database

Passing the connection string is a possibility; however, it should not include the credentials (user id, password), as you don't want credentials stored in plain text. Airflow provides two concepts, Variables and Connections, for this purpose; see this answer for details.
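
For example, a rough sketch of that approach (this assumes a Connection with id my_db has been created in the Airflow UI, and uses the Airflow 1.10-style import path):

from airflow.hooks.base_hook import BaseHook
from sqlalchemy import create_engine

def my_callable(conn_id, **context):
    # the credentials stay in the Airflow Connection, not in the DAG file or op_args
    uri = BaseHook.get_connection(conn_id).get_uri()
    engine = create_engine(uri)
    # ... database work ...

The DAG then only passes the string 'my_db' via op_args, which pickles without any problem.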

it is easier to make the engine once in the DAG and pass it to all of the Operators than make the engine in each one

Actually, no. It may seem easier at first glance; however, it is a bad idea on closer examination.

A database connection by its very nature is ephemeral and only exists for the time used by a particular process. Airflow tasks are instantiated at the time of execution (which may be much later, repeatedly), in a different process, possibly on a different machine. Hence, even if you could pickle the connection, it would not be of use to the task when it is run, as it most likely would have ceased to exist anyway.

In general and as a matter of principle, not only in Airflow, connections should always be created, managed and closed by the same process.
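
A minimal sketch of that pattern, with the engine created, used and disposed entirely within the task (names are illustrative):

from sqlalchemy import create_engine

def my_callable(conn_string, **context):
    engine = create_engine(conn_string)   # created by the task process itself
    try:
        with engine.connect() as conn:
            pass  # the actual database work goes here
    finally:
        engine.dispose()                  # closed by the same process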

miraculixx