
I'm trying to subclass the Airflow BashOperator to build a custom copy operator, but it doesn't work. So far I have tried this:

my_operators.py

from airflow.utils.decorators import apply_defaults
from airflow.operators.bash_operator import BashOperator

class MyCopyOperator(BashOperator):

    template_fields = ('bash_command', 'source_file', 'source_dir', 'target_file', 'target_dir')


    @apply_defaults
    def __init__(
            self,
            bash_command,
            source_file, 
            source_dir, 
            target_file, 
            target_dir,
            *args, **kwargs):

        super(MyCopyOperator, self).__init__(*args, **kwargs)
        self.bash_command = bash_command
        self.source_file = source_file
        self.source_dir = source_dir
        self.target_file = target_file
        self.target_dir = target_dir     

    def execute(self, context):


        self.bash_command =  "cp " + " " + self.source_dir + "/" + self.source_file + " " + self.target_dir + "/" + self.target_file
        super().bash_command =  self.bash_command
        print(super.bash_command)
        print(F"inherited {self.bash_command}")
        super().execute(self,context)

operator_example.py

from datetime import datetime, timedelta
from airflow import DAG
from airflow.models import Variable
from airflow.exceptions import AirflowException
from my_operators import MyCopyOperator

dag_name= 'my_test_dag'

owner = Variable.get("owner_" + dag_name)


default_args = {
    "owner": owner,
    "depends_on_past": False,
    "start_date": datetime(2019, 10, 31),
    'email': ['airflow@example.com'],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

dag = DAG(dag_id=dag_name, default_args=default_args, schedule_interval=None)


copytest_task=MyCopyOperator(
     task_id='copytest_task',
     bash_command="cp",
     source_file="test_file.txt",
     source_dir='/usr/local/airflow',
     target_file="test_file.copied.txt",
     target_dir='/usr/local/airflow',
     dag=dag,
     provide_context=True,
)


copytest_task

The Airflow web UI shows this error message:

Broken DAG: [/usr/local/airflow/dags/operator_example.py] Argument ['bash_command'] is required

What's wrong with this attempt? I know that I could duplicate or imitate the implementation of BashOperator from https://github.com/apache/airflow/blob/master/airflow/operators/bash_operator.py, but that's not what I want.



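The error is raised by BashOperator's own constructor. Your subclass binds bash_command to its own parameter and then calls super().__init__(*args, **kwargs) without it, so when the @apply_defaults wrapper on BashOperator.__init__ checks its required arguments, bash_command is missing. Use the following operator instead. Notice how it builds bash_command and passes it to the class we inherit from.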

from airflow.utils.decorators import apply_defaults
from airflow.operators.bash_operator import BashOperator

class MyCopyOperator(BashOperator):

    template_fields = ('bash_command', 'source_file', 'source_dir', 'target_file', 'target_dir')


    @apply_defaults
    def __init__(
            self,
            source_file, 
            source_dir, 
            target_file, 
            target_dir,
            *args, **kwargs):

        # Build the cp command here and pass it on: BashOperator.__init__ requires bash_command
        bash_command = f"cp {source_dir}/{source_file} {target_dir}/{target_file}"
        super(MyCopyOperator, self).__init__(bash_command=bash_command, *args, **kwargs)
        self.source_file = source_file
        self.source_dir = source_dir
        self.target_file = target_file
        self.target_dir = target_dir     
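To make the failure concrete, here is a simplified, Airflow-free imitation of the required-argument check. The decorator check_required_kwargs, the exception RequiredArgError and the classes FakeBashOperator, BrokenCopy and FixedCopy are hypothetical stand-ins written only for this sketch; the decorator mimics the "Argument [...] is required" check that Airflow 1.x's apply_defaults performs, but it is not Airflow's code.

```python
import inspect
from functools import wraps


class RequiredArgError(Exception):
    """Hypothetical stand-in for AirflowException in this sketch."""


def check_required_kwargs(func):
    """Simplified imitation of apply_defaults' required-argument check:
    every parameter of func without a default must be passed as a keyword."""
    params = inspect.signature(func).parameters.values()
    required = {
        p.name for p in params
        if p.default is p.empty
        and p.name != "self"
        and p.kind not in (p.VAR_POSITIONAL, p.VAR_KEYWORD)
    }

    @wraps(func)
    def wrapper(self, *args, **kwargs):
        missing = sorted(required - set(kwargs))
        if missing:
            raise RequiredArgError("Argument {} is required".format(missing))
        return func(self, *args, **kwargs)

    return wrapper


class FakeBashOperator:
    """Hypothetical parent class that, like BashOperator, requires bash_command."""

    @check_required_kwargs
    def __init__(self, bash_command, task_id=None):
        self.bash_command = bash_command
        self.task_id = task_id


class BrokenCopy(FakeBashOperator):
    @check_required_kwargs
    def __init__(self, bash_command, source, target, **kwargs):
        # As in the question: bash_command is consumed here and NOT forwarded,
        # so the parent's check sees only task_id and raises.
        super().__init__(**kwargs)
        self.bash_command = bash_command


class FixedCopy(FakeBashOperator):
    @check_required_kwargs
    def __init__(self, source, target, **kwargs):
        # As in the answer: build the command and forward it to the parent.
        super().__init__(bash_command="cp {} {}".format(source, target), **kwargs)
```

BrokenCopy(task_id="t", bash_command="cp", source="a", target="b") passes its own check but raises RequiredArgError("Argument ['bash_command'] is required") from the parent, while FixedCopy(task_id="t", source="a", target="b") ends up with bash_command == "cp a b".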

Example Task:

MyCopyOperator(
    task_id='print_date12',
    source_file='test.txt',
    source_dir='/Users/kaxilnaik/Desktop',
    target_file="test1.txt",
    target_dir="/Users/kaxilnaik/Desktop/abc",
    dag=dag)
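Because bash_command is assembled by plain string formatting, a path containing spaces or shell metacharacters would be split or interpreted by bash. A hedged variant quotes each path with the standard library's shlex.quote before joining them; build_copy_command is a hypothetical helper whose return value could be passed as bash_command in the operator above.

```python
import shlex


def build_copy_command(source_dir, source_file, target_dir, target_file):
    """Hypothetical helper: build a cp command line in which each path is
    shell-quoted, so spaces or characters like ; and ' reach cp literally."""
    source = shlex.quote(source_dir + "/" + source_file)
    target = shlex.quote(target_dir + "/" + target_file)
    return "cp {} {}".format(source, target)
```

shlex.quote leaves paths made only of safe characters untouched, so for the example paths above the command is the same as before; only paths such as "test file copy.txt" get wrapped in single quotes.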
  • Thanks a lot, it works, but when running the DAG I get the error message _cp: missing file operand_ whereas a BashOperator with the same bash_command works like a charm `bash_test=BashOperator( task_id='bash_test', bash_command='cp /usr/local/airflow/test_file.txt /usr/local/airflow/test_file.copied.txt', dag=dag, )` – MissouriMystery Nov 21 '19 at 07:19
  • I have updated the answer, please use that and check. I have also added an example Task that worked for me. – kaxil Nov 21 '19 at 11:18
  • Now it worked. Thanks a lot. I used _MyCopyOperator( task_id='print_date12', bash_command="############", source_file='test.txt', source_dir='/usr/local/airflow', target_file="test1.txt", target_dir="/usr/local/airflow", dag=dag)_ – MissouriMystery Nov 21 '19 at 12:06
  • Np :). I have edited the answer once again. With the new code you don't need to enter `bash_command="############"` :) – kaxil Nov 21 '19 at 12:26
  • This will fail in obnoxious ways if any of the parameters contain quoting characters, wildcards, whitespace, or etc. A much better, but more involved, solution would avoid calling a shell entirely for this simple operation, and just call `cp` directly with `subprocess.run()`. Maybe see also https://stackoverflow.com/questions/4256107/running-bash-commands-in-python/51950538#51950538 – tripleee Nov 21 '19 at 12:40
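Following the suggestion in the last comment, a sketch that avoids the shell entirely calls cp through subprocess.run with an argument list, so each path reaches cp as a single argv entry whatever characters it contains. copy_file is a hypothetical helper, and the sketch assumes a POSIX system with cp on the PATH.

```python
import os
import subprocess


def copy_file(source_dir, source_file, target_dir, target_file):
    """Hypothetical helper: run cp directly, without a shell, so the paths
    are never word-split or glob-expanded by bash."""
    source = os.path.join(source_dir, source_file)
    target = os.path.join(target_dir, target_file)
    # check=True raises subprocess.CalledProcessError if cp exits non-zero
    subprocess.run(["cp", source, target], check=True)
```

Inside Airflow, a function like this could serve as the python_callable of a PythonOperator, with the four path arguments supplied through op_kwargs. If shelling out is not needed at all, the standard library's shutil.copyfile copies the file contents in pure Python.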