
I have a scenario where I want to process a CSV file and load it into some other database:

Cases:

  1. Pick up a CSV file and load it into MySQL, into a table with the same name as the CSV.
  2. Then do some modifications on the loaded rows using a Python task file.
  3. After that, extract the data from MySQL and load it into some other database.

CSV files arrive from a remote server into a folder on the Airflow server.

We have to pick up these CSV files and process them through a Python script.

Suppose I pick one CSV file; then I need to pass this CSV file to the rest of the operators in a dependency chain, like:

filename: abc.csv

task1 >> task2 >> task3 >> task4

So abc.csv should be available to all the tasks.

Please tell me how to proceed.

I suggest changing the question's title to "How to share file between Airflow tasks" or something similar. – Ena Dec 06 '18 at 14:28

2 Answers

Answer 1:

Your scenario doesn't have anything to do with real time. This is ingesting on a schedule/interval. Or perhaps you could use a sensor operator to detect data availability.
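
For example, a FileSensor can hold up the rest of the pipeline until the file has landed. A minimal sketch, assuming Airflow 1.x-style imports (the class moved to airflow.sensors.filesystem in Airflow 2) and a made-up connection id and path:

from airflow.contrib.sensors.file_sensor import FileSensor

wait_for_csv = FileSensor(
    task_id='wait_for_csv',
    fs_conn_id='fs_default',            # filesystem connection pointing at the landing folder
    filepath='/data/incoming/abc.csv',  # hypothetical path where the remote server drops the file
    poke_interval=60,                   # re-check once a minute until the file appears
    dag=dag,                            # the DAG object created in the sketch below
)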

Implement each of your requirements as functions and call them from operator instances. Add the operators to a DAG with a schedule appropriate for your incoming feed.
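
A rough sketch of that structure, using Airflow 1.x-style PythonOperators to match the code further down this page; the DAG id, schedule, and function bodies are placeholders:

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

def load_csv_to_mysql(**context):
    pass  # pick up the CSV and load it into a MySQL table named after the file

def transform_rows(**context):
    pass  # modify the loaded rows

def export_to_other_db(**context):
    pass  # extract from MySQL and load into the target database

dag = DAG(
    dag_id='csv_pipeline',
    schedule_interval='@hourly',       # match this to how often files arrive
    start_date=datetime(2018, 12, 1),
    catchup=False,
)

task1 = PythonOperator(task_id='load_csv_to_mysql', python_callable=load_csv_to_mysql,
                       provide_context=True, dag=dag)
task2 = PythonOperator(task_id='transform_rows', python_callable=transform_rows,
                       provide_context=True, dag=dag)
task3 = PythonOperator(task_id='export_to_other_db', python_callable=export_to_other_db,
                       provide_context=True, dag=dag)

task1 >> task2 >> task3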

How you pass and access params:

  - kwargs to the python_callable when initializing an operator
  - context['param_key'] in the execute method when extending an operator
  - Jinja templates
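
A condensed sketch of all three (the task ids, the filename, and the custom operator are made up; it reuses the dag object from the sketch above):

# 1. kwargs to the python_callable when initializing a PythonOperator
def process(filename, **context):
    print(filename)

task_kwargs = PythonOperator(
    task_id='process_via_kwargs',
    python_callable=process,
    op_kwargs={'filename': 'abc.csv'},
    provide_context=True,
    dag=dag,
)

# 2. context['params'] inside execute() when extending an operator
from airflow.models import BaseOperator

class ProcessFileOperator(BaseOperator):
    def execute(self, context):
        filename = context['params']['filename']
        print(filename)

task_custom = ProcessFileOperator(
    task_id='process_via_params',
    params={'filename': 'abc.csv'},
    dag=dag,
)

# 3. Jinja templates in a templated field, e.g. a BashOperator command
from airflow.operators.bash_operator import BashOperator

task_jinja = BashOperator(
    task_id='process_via_template',
    bash_command='echo {{ params.filename }}',
    params={'filename': 'abc.csv'},
    dag=dag,
)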

Relevant:

  - airflow pass parameter from cli
  - execution_date in airflow: need to access as a variable

Answer 2:

Tasks in Airflow communicate using XCom, but it is meant for small values, not for file content.

If you want your tasks to work with the same CSV file, you should save it to some location and then pass the path to that location through XCom.

We are using the LocalExecutor, so the local file system is fine for us.

We decided to create a folder for each DAG, named after the DAG. Inside that folder we generate a folder for each execution date (we do this in the first task, which we always call start_task). Then we pass the path of this folder to the subsequent tasks via XCom.

Example code for the start_task:

import os
import shutil
from airflow.operators.python_operator import PythonOperator

DATE_FORMAT = '%Y-%m-%dT%H-%M-%S'  # folder-name pattern for the execution date; the exact pattern is up to you

# One way to implement the helper this example relies on:
def _create_folder_delete_if_exists(folder_path):
    # Recreate the execution folder so reruns start from a clean slate.
    if os.path.exists(folder_path):
        shutil.rmtree(folder_path)
    os.makedirs(folder_path)

def start(share_path, **context):
    execution_date_as_string = context['execution_date'].strftime(DATE_FORMAT)
    execution_folder_path = os.path.join(share_path, 'my_dag_name', execution_date_as_string)
    _create_folder_delete_if_exists(execution_folder_path)
    # Publish the folder path so downstream tasks can pull it from XCom.
    task_instance = context['task_instance']
    task_instance.xcom_push(key="execution_folder_path", value=execution_folder_path)

start_task = PythonOperator(
    task_id='start_task',
    provide_context=True,
    python_callable=start,
    op_args=[share_path],
    dag=dag
)

The share_path is the base directory for all DAGs; we keep it in the Airflow Variables.
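
For example (assuming the variable is literally named share_path):

from airflow.models import Variable

share_path = Variable.get('share_path')  # e.g. '/data/airflow_share'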

Subsequent tasks can get the execution folder with:

execution_folder_path = task_instance.xcom_pull(task_ids='start_task', key='execution_folder_path')
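
For example, a downstream task running as a python_callable with provide_context=True gets task_instance from its context and can then work inside that folder; the file name and loading logic here are placeholders:

def load_csv(**context):
    task_instance = context['task_instance']
    execution_folder_path = task_instance.xcom_pull(task_ids='start_task',
                                                    key='execution_folder_path')
    csv_path = os.path.join(execution_folder_path, 'abc.csv')  # hypothetical file name
    # ... load csv_path into MySQL here ...

load_task = PythonOperator(
    task_id='load_task',
    provide_context=True,
    python_callable=load_csv,
    dag=dag,
)

start_task >> load_task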