Basically I'm working with Airflow and developed a task that downloads a file from an external source.
```python
t1 = PythonOperator(
    task_id='download',
    python_callable=download,
    provide_context=True,
    dag=dag)
```
Airflow itself is running in a virtual environment (pipenv).
The download function is:
```python
def download(**kwargs):
    folder_id = 'xxxxxx-xxxx-xxxx-xxxxxx'
    file_name = download_file(folder_id)
    return file_name
```
So basically I'm using XComs to pass data from one task to another... and with this configuration it's impossible to manage all of the dependencies of each DAG.
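For context, the XCom hand-off looks roughly like this in a downstream task (a minimal sketch of an Airflow 1.x DAG fragment; the `process` task id, the DAG name, and the start date are assumptions for illustration, not from my actual code):

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG('example', start_date=datetime(2021, 1, 1))

def process(**kwargs):
    # Pull the file name returned by the 'download' task via XCom.
    file_name = kwargs['ti'].xcom_pull(task_ids='download')
    print(file_name)

# Hypothetical downstream task that consumes the XCom value.
t2 = PythonOperator(
    task_id='process',
    python_callable=process,
    provide_context=True,
    dag=dag)
```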
In the documentation I found a class called "PythonVirtualenvOperator", so to use it I wrote:
```python
t1 = PythonVirtualenvOperator(
    task_id='download',
    python_callable=download,
    requirements=['requests'],
    python_version='3.8',
    provide_context=True,
    dag=dag
)
```
and it's giving me the following error:

```
TypeError: can't pickle module objects
```
The download_file function is an API call that lives in another file.
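My understanding is that PythonVirtualenvOperator has to serialize the callable to run it in the separate virtualenv, and because `download` references `download_file` from another module, a module object ends up in the serialized payload. Module objects are simply not picklable, which is what the TypeError is saying; the usual workaround is to make the callable self-contained by doing its imports inside the function body. A minimal stdlib demonstration of the underlying limitation (the `json` module here just stands in for any imported module):

```python
import pickle
import json  # any module object triggers the same failure

# Module objects cannot be serialized, which is what the
# "can't pickle module objects" TypeError is complaining about.
try:
    pickle.dumps(json)
except TypeError as exc:
    print(f"TypeError: {exc}")

# Workaround pattern for PythonVirtualenvOperator: keep the callable
# self-contained, so no outer-scope module references get captured.
def download_self_contained(folder_id):
    import requests  # imported inside the function, not at module level
    # ... perform the API call here instead of calling download_file ...
```

The `download_self_contained` name is hypothetical; the point is only that every import the callable needs should happen inside it.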
Any suggestions on how I can manage the environment and still pass data between tasks?