I have this Python file:

from datetime import datetime, timedelta

class Get:

    def __init__(self, i):
        self.i = self.get_date(i)
        self.df = self.get_file()

    def get_file(self):
        try:
            ...
            return df
        except Exception as e:
            return ...

    def get_date(self,i):
        dt = datetime.now() - timedelta(days=i)
        return dt.strftime("%Y-%m-%d")

    def put(self,df):
        ...


class Fix:
    def __init__(self,df):
        ...

if __name__ == '__main__':
    for i in range(4, 0, -1):
        get = Get(i)
        fix = Fix(get.df)
        get.put(fix.df)

Basically, this code generates the last 4 dates and runs the functions over these dates (updating statistics, etc.).
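To make the date logic concrete, here is a minimal sketch of what the loop computes. The helper name `date_for_offset` and its `now` parameter are assumptions added for illustration (the original `get_date` always uses the current time); the reference date is the 2018-Jun-12 run mentioned in the question.

```python
from datetime import datetime, timedelta


def date_for_offset(i, now=None):
    """Return the date i days before `now` as YYYY-MM-DD (mirrors Get.get_date)."""
    # `now` is a hypothetical parameter, added so the result is reproducible.
    now = now or datetime.now()
    return (now - timedelta(days=i)).strftime("%Y-%m-%d")


# Same iteration order as the loop under __main__: oldest date first.
reference = datetime(2018, 6, 12)
dates = [date_for_offset(i, reference) for i in range(4, 0, -1)]
print(dates)  # ['2018-06-08', '2018-06-09', '2018-06-10', '2018-06-11']
```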

At first I wanted to convert each function into a PythonOperator and then schedule it, but I don't think this will work. I don't know how to convert the classes and the parameters that are passed between them.

This is what the code does if I run it on 2018-Jun-12, and below it is what it should do with Airflow: (image not included)

Is there a template I can use, or any suggestions on how to do it?

jack
  • Did you try this? https://stackoverflow.com/questions/41730297/python-script-scheduling-in-airflow – Yash Kumar Atri Jun 11 '18 at 08:47
  • @YashKumarAtri That explains how to use the Python script and call the functions in it. This is not what I want. I want to delete the Python script and recreate it as Airflow DAGs, but I don't understand how to convert it. The script has a loop; in Airflow I shouldn't have a loop, and each run should use a different value of i. – jack Jun 11 '18 at 08:51
  • I recommend working through the official tutorial https://airflow.apache.org/tutorial.html since there are several specific parts here that would have to be explained. – tobi6 Jun 11 '18 at 12:40
  • Aside from that, you are right. Airflow will take over scheduling and you can put the code in the DAG, but you don't need to: you could call it the same way you are calling it from main. – tobi6 Jun 11 '18 at 12:41

1 Answer

You can either execute your script using BashOperator, without any changes to the script:

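from airflow import DAG
# Airflow 1.x path; in Airflow 2 import from airflow.operators.bash instead
from airflow.operators.bash_operator import BashOperator

# default_args is assumed to be defined earlier (e.g. owner, start_date)
dag = DAG('{NAME_OF_THE_DAG}', schedule_interval='@daily',
          default_args=default_args)

t1 = BashOperator(
    task_id='{NAME_OF_TASK}',
    dag=dag,
    bash_command='python {NAME_OF_THE_FILE_TO_EXECUTE}.py')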

or use PythonOperator:

  1. Update your code to create a main function in your script:

    def main():
        for i in range(4, 0, -1):
            get = Get(i)
            fix = Fix(get.df)
            get.put(fix.df)
    
  2. Define and execute the DAG:

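    from airflow import DAG
    # Airflow 1.x path; in Airflow 2 import from airflow.operators.python instead
    from airflow.operators.python_operator import PythonOperator

    # default_args is assumed to be defined earlier; main is the function from step 1
    dag = DAG('{NAME_OF_THE_DAG}', schedule_interval='@daily',
              default_args=default_args)

    t1 = PythonOperator(
        task_id='{NAME_OF_TASK}',
        dag=dag,
        python_callable=main)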
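
If each value of i should become its own task instead of one loop inside main, as the asker describes in the comments, one option is to give the callable a parameter and create one task per offset. This is only a sketch: `process_offset` and `build_task_specs` are hypothetical names, and the pipeline body is a stand-in because the bodies of Get and Fix are elided in the question. Each `op_kwargs` dict could then be handed to PythonOperator through its `op_kwargs` argument.

```python
def process_offset(i):
    """Run the Get -> Fix -> put pipeline for a single day offset."""
    # Stand-in body. With the question's classes available it would be:
    #   get = Get(i); fix = Fix(get.df); get.put(fix.df)
    return "processed offset {}".format(i)


def build_task_specs(offsets):
    """Return one (task_id, op_kwargs) pair per offset, in the given order."""
    return [("process_day_minus_{}".format(i), {"i": i}) for i in offsets]


# Same order as the original loop: oldest date first.
specs = build_task_specs(range(4, 0, -1))

# In the DAG file, each spec would become one task, for example:
#   PythonOperator(task_id=task_id, python_callable=process_offset,
#                  op_kwargs=op_kwargs, dag=dag)
for task_id, op_kwargs in specs:
    print(task_id, process_offset(**op_kwargs))
```

Keeping the offset as a keyword argument means the same function works both from a plain loop and from separate Airflow tasks.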
    
David Lexa