I am trying to set up an AWS EMR process in Airflow, and I need the job_flow_overrides in the EmrCreateJobFlowOperator and the steps in the EmrAddStepsOperator to be set from separate JSON files located elsewhere.
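For context, once parsed with json.loads the two files produce structures roughly shaped like the dictionaries those operators expect (the values below are placeholders, not my actual configuration):

# Placeholder illustration of the rough shape of the two JSON files after json.loads
cluster_setup_example = {
    "Name": "example-cluster",
    "ReleaseLabel": "emr-5.29.0",
    "Instances": {
        "InstanceGroups": [
            {
                "Name": "Master node",
                "InstanceRole": "MASTER",
                "InstanceType": "m5.xlarge",
                "InstanceCount": 1
            }
        ],
        "KeepJobFlowAliveWhenNoSteps": True
    }
}

run_steps_example = [
    {
        "Name": "example_step",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": ["spark-submit", "example_job.py"]
        }
    }
]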
I have tried numerous approaches, both linking the JSON files directly and setting and getting Airflow Variables that hold the JSON. If I use Airflow Variables, they also need to be dynamically named, which is what I am having trouble with. I can easily Variable.set a dynamically named Variable from a PythonOperator, but I cannot Variable.get a dynamic name inside job_flow_overrides or steps, because of Airflow's limitation on running Python code outside of a PythonOperator (operator arguments are evaluated when the DAG file is parsed, not when the task runs).
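For reference, the Variable.set side works without issue from a PythonOperator, roughly like this (a sketch; the set_emr_variables task_id and the 'ClusterSetup'/'Steps' keys inside the global JSON are simplified stand-ins for what I actually use):

import json

import requests
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator


def set_emr_variables():
    # Fetch the global JSON and use its 'Name' key to build dynamically named Variables
    global_json = requests.get("PATH/TO/JSON/FILE").json()
    name = global_json['Name']
    # 'ClusterSetup' and 'Steps' are stand-in keys for where the two JSON blobs live
    Variable.set("CLUSTER_SETUP-" + name, json.dumps(global_json['ClusterSetup']))
    Variable.set("RUN_STEPS-" + name, json.dumps(global_json['Steps']))


set_variables = PythonOperator(
    task_id='set_emr_variables',
    python_callable=set_emr_variables,
    dag=dag
)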
The Airflow Variables have already been set earlier in the code (roughly as sketched above). The following is my code trying to use the JSON data to set up the cluster:
import json

import requests
from airflow.models import Variable
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.operators.emr_terminate_job_flow_operator import EmrTerminateJobFlowOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor

# (the dag object used below is defined earlier in the file)


# Fetch the global JSON file and return its contents as a JSON string
def get_global_json_contents():
    return json.dumps(requests.get("PATH/TO/JSON/FILE").json())


# Use the 'Name' key in this JSON as a specific identifier for the Variables created by this job
def get_global_json_name():
    return json.loads(get_global_json_contents())['Name']


# Create the EMR cluster from the dynamically named CLUSTER_SETUP-* Variable
cluster_creator = EmrCreateJobFlowOperator(
    task_id='create_job_flow',
    aws_conn_id='aws_default',
    emr_conn_id='emr_default',
    job_flow_overrides=json.loads(Variable.get("CLUSTER_SETUP-" + get_global_json_name())),
    dag=dag
)

# Add the steps from the dynamically named RUN_STEPS-* Variable to the new cluster
add_steps = EmrAddStepsOperator(
    task_id='add_steps',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
    aws_conn_id='aws_default',
    steps=json.loads(Variable.get("RUN_STEPS-" + get_global_json_name())),
    dag=dag
)

# Wait for the first submitted step to complete
step_checker = EmrStepSensor(
    task_id='watch_step',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
    step_id="{{ task_instance.xcom_pull('add_steps', key='return_value')[0] }}",
    aws_conn_id='aws_default',
    dag=dag
)

# Terminate the cluster once the steps are done
cluster_remover = EmrTerminateJobFlowOperator(
    task_id='remove_cluster',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
    aws_conn_id='aws_default',
    dag=dag
)
Does anyone know how I can get this process to work?
Does anyone have an idea how to get around the limitation of not being able to run Python functions in Airflow outside of a PythonOperator and its python_callable?
Might it be possible to solve this by defining the functions in a separate Python file located elsewhere and importing them into the DAG file? And if so, how would I go about doing that?
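In other words, something along these lines, where emr_json_helpers is a hypothetical module sitting elsewhere on the scheduler's PYTHONPATH:

# Sketch of the idea: import the helper functions from a hypothetical external module
# instead of defining them in the DAG file itself.
import json

from airflow.models import Variable
from emr_json_helpers import get_global_json_name  # hypothetical module/location

cluster_creator = EmrCreateJobFlowOperator(
    task_id='create_job_flow',
    aws_conn_id='aws_default',
    emr_conn_id='emr_default',
    job_flow_overrides=json.loads(Variable.get("CLUSTER_SETUP-" + get_global_json_name())),
    dag=dag
)

Would that be enough to get Variable.get evaluated correctly, or does the same limitation apply?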