i am looking to develop a workflow wherein my dag kick-starts a process on remote server and monitor if the each task in process is succeeded or not, it should read the status from mongo-db and if the task is succeeded then the next task is to be triggered. is there any way i can achieve it? i think i should use a mongo_sensor but not sure how to use that.
i have successfully read the mongodb using this code.
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import pendulum
local_tz = pendulum.timezone("Europe/Amsterdam")
def function1():
print("hello")
import pymongo
from pymongo import MongoClient
client=MongoClient("mongodb://rpa_task:rpa_task123@ds141641.mlab.com:41641/rpa_task")
mydb = client['rpa_task']
collect2 = mydb['business_process_mgts']
cursor=collect2.find({"process.id":"ross1335_testingpurchase_1915"})
for i in cursor:
print(i['sequenceFlow'])
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2019, 07, 8, tzinfo=local_tz),
'email': ['shubhamkalyankari01@gmail.com'],
'email_on_failure': True,
'email_on_retry': True,
'retries': 3,
'schedule_interval': '@hourly',
'retry_delay': timedelta(seconds=5),
}
dag = DAG('mongo1.py', default_args=default_args)
t1=PythonOperator(dag=dag,
task_id='t1',
provide_context=False,
python_callable=function1,)
it is reading the mongo documents successfully.