
I need to

1. Run a SELECT query on a MySQL DB and fetch the records.
2. Process the records with a Python script.

I am unsure how I should proceed. Is XCom the way to go here? Also, MySqlOperator only executes the query; it doesn't fetch the records. Is there a built-in transfer operator I can use? How can I use a MySQL hook here?

You may want to use a PythonOperator that uses the hook to get the data, applies the transformation, and ships the (now scored) rows somewhere else.

Can someone explain how to proceed?

Reference: http://markmail.org/message/x6nfeo6zhjfeakfe

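from airflow.hooks.mysql_hook import MySqlHook
from airflow.operators.python_operator import PythonOperator

def do_work():
    # schema is a MySqlHook constructor argument, not a get_records() argument
    mysqlserver = MySqlHook(mysql_conn_id=connection_id, schema='testdb')
    sql = "SELECT * FROM table WHERE col > 100"
    records = mysqlserver.get_records(sql)  # list of row tuples
    print(records[0][0])  # first column of the first row

callMYSQLHook = PythonOperator(
    task_id='fetch_from_testdb',
    python_callable=do_work,  # pass the function, not the hook
    dag=dag
)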

Is this the correct way to proceed? Also, how do we use XComs to store the records returned by the following MySqlOperator?

t = MySqlOperator(
    mysql_conn_id='mysql_default',
    task_id='basic_mysql',
    sql="SELECT count(*) FROM table1 WHERE id > 10",
    dag=dag)
gpk27
  • Instead of using MySqlHook, since you want to get a lot of data, which doesn't seem to fit in XCom, you can create your own function with PythonOperator and then maybe use `sqlalchemy` to help you with the SQL connection and query? – Chengzhi Sep 22 '17 at 21:40
  • @Chengzhi that is what I am doing currently, but it defeats the purpose of using Airflow. Is there any other workaround? – gpk27 Oct 10 '17 at 09:37
  • I don't think this defeats the purpose of using Airflow. Operators operate on things (the MySQL operator operates on MySQL databases). If you want to operate on each record from a database with Python, it only makes sense that you'd need to use the `PythonOperator`. I wouldn't be afraid of crafting large Python scripts that use low-level packages like sqlalchemy. Sometimes that's just the best thing to do if that's the task you need to do. I think your problem is that you're trying to make two tasks out of something that's really one task. – Mike Oct 15 '17 at 03:23
  • What do you do with the records after processing them with Python? Do you write them somewhere else? There are a lot of community-contributed operators, and many of them combine multiple hooks with some processing in between using Pandas or similar, e.g. MySQL to GCS. There are lots in the core operators directory https://github.com/apache/incubator-airflow/tree/master/airflow/operators, and don't forget the ones in the contrib directory. Plagiarise and mash them together and add them to your plugins if what you need doesn't exist. – Davos Nov 11 '17 at 05:58
  • I would definitely start with the MySQL hook, because then you can use Airflow's ability to store and retrieve encrypted connection strings, amongst other things. Sure, you can write the database connection and handling directly in SQLAlchemy, but the abstraction layer already exists, so why not use it? You'll find methods in there that already read into in-memory DataFrames; the parent class has a `get_pandas_df` method: https://github.com/apache/incubator-airflow/blob/faa9a5266c0b2e68693dd106b5cb46d30770dadc/airflow/hooks/dbapi_hook.py#L76 – Davos Nov 11 '17 at 06:07
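The single-task approach from the mailing-list suggestion and the comments above (fetch with a hook, process in Python, write the results elsewhere) can be sketched as follows. This is a minimal sketch, assuming the callable receives two DB-API connections; in Airflow these would typically come from a hook's `get_conn()`. Here `sqlite3` in-memory databases stand in so it runs without Airflow, and the table names and the `score()` rule are hypothetical. Note that `sqlite3` uses `?` placeholders while the MySQLdb driver uses `%s`.

```python
import sqlite3
from contextlib import closing


def score(value):
    # Hypothetical transformation standing in for your real Python processing.
    return value * 2


def fetch_transform_ship(src_conn, dst_conn):
    """Read rows from src_conn, score them in Python, write them to dst_conn.

    Both arguments are DB-API connections; inside an Airflow PythonOperator
    they would typically come from hooks (an assumption of this sketch).
    """
    with closing(src_conn.cursor()) as cur:
        cur.execute("SELECT id, col FROM source_table WHERE col > 100")
        rows = cur.fetchall()

    scored = [(row_id, score(col)) for row_id, col in rows]

    with closing(dst_conn.cursor()) as cur:
        # sqlite3 placeholder style; MySQLdb would use %s instead of ?
        cur.executemany("INSERT INTO scored_table (id, score) VALUES (?, ?)", scored)
    dst_conn.commit()
    return len(scored)  # a small summary is safe to return (and push to XCom)


# Stand-in source and destination databases (hypothetical tables).
src = sqlite3.connect(":memory:")
src.execute("CREATE TABLE source_table (id INTEGER, col INTEGER)")
src.executemany("INSERT INTO source_table VALUES (?, ?)", [(1, 50), (2, 150), (3, 200)])
dst = sqlite3.connect(":memory:")
dst.execute("CREATE TABLE scored_table (id INTEGER, score INTEGER)")

n = fetch_transform_ship(src, dst)
print(n)  # 2
```

Keeping fetch, transform and write in one callable avoids pushing the full result set through XCom; only the row count is returned.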

2 Answers


I struggled with this for the past 90 minutes, so here is a more declarative approach for newcomers:

from airflow.hooks.mysql_hook import MySqlHook
from airflow.operators.python_operator import PythonOperator

def fetch_records():
    request = "SELECT * FROM your_table"
    mysql_hook = MySqlHook(mysql_conn_id='the_connection_name_sourced_from_the_ui', schema='specific_db')
    connection = mysql_hook.get_conn()
    cursor = connection.cursor()
    cursor.execute(request)
    sources = cursor.fetchall()
    print(sources)  # the rows show up in the task log
    cursor.close()
    connection.close()

# Define the task inside your "with DAG(...) as dag:" block so it is attached to the DAG
task = PythonOperator(
    task_id='fetch_records',
    python_callable=fetch_records
)

This writes the results of your DB query to the task logs.
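Printing everything from `fetchall()` is fine for small results. For larger tables, one option is to pull rows in batches with the standard DB-API `cursor.fetchmany(size)` and process each batch before fetching the next. This is a sketch, assuming a DB-API connection such as the one `mysql_hook.get_conn()` returns; `sqlite3` stands in here, and `process_batch`, the table, and the batch size are hypothetical. With the MySQLdb driver the default cursor still buffers the whole result client-side, so true streaming would need a server-side cursor such as `MySQLdb.cursors.SSCursor`.

```python
import sqlite3
from contextlib import closing


def iter_batches(conn, sql, batch_size=1000):
    """Yield lists of rows from a DB-API connection, batch_size rows at a time."""
    with closing(conn.cursor()) as cur:
        cur.execute(sql)
        while True:
            batch = cur.fetchmany(batch_size)
            if not batch:
                break
            yield batch


def process_batch(rows):
    # Hypothetical per-batch processing: sum the second column.
    return sum(value for _, value in rows)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE your_table (id INTEGER, value INTEGER)")
conn.executemany("INSERT INTO your_table VALUES (?, ?)", [(i, i * 10) for i in range(1, 6)])

totals = [
    process_batch(batch)
    for batch in iter_batches(conn, "SELECT id, value FROM your_table ORDER BY id", batch_size=2)
]
print(totals)  # [30, 70, 50]
```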

I hope this is of use to someone else.

dimButTries

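Sure, just create a hook (for example a MySqlHook inside a PythonOperator callable or your own operator) and call its get_records() method, which it inherits from the DB-API hook: https://airflow.apache.org/docs/apache-airflow/stable/_modules/airflow/hooks/dbapi.html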
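For the count query from the question, a sketch of that approach: the callable only needs an object with a DB-API-hook-style `get_records(sql)` method, which returns a list of row tuples. `SqliteHookStub` is a hypothetical stand-in backed by `sqlite3` so the sketch runs without Airflow; in a DAG the callable would use the real hook, e.g. `MySqlHook(mysql_conn_id='mysql_default')`. Because a PythonOperator pushes its callable's return value to XCom (key `return_value`), a downstream task could read the count with `ti.xcom_pull(task_ids=...)`. Only small values belong in XCom, since it is stored in Airflow's metadata database.

```python
import sqlite3


class SqliteHookStub:
    """Hypothetical stand-in for MySqlHook that exposes a get_records() method."""

    def __init__(self, conn):
        self.conn = conn

    def get_records(self, sql, parameters=None):
        # Like the real hook method: execute the query and return all rows.
        cur = self.conn.cursor()
        try:
            cur.execute(sql, parameters or ())
            return cur.fetchall()
        finally:
            cur.close()


def count_rows(hook):
    """Body of a PythonOperator callable; the returned int would go to XCom."""
    records = hook.get_records("SELECT count(*) FROM table1 WHERE id > 10")
    return records[0][0]  # keep XCom values small and JSON-serializable


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE table1 (id INTEGER)")
conn.executemany("INSERT INTO table1 VALUES (?)", [(5,), (11,), (42,)])

result = count_rows(SqliteHookStub(conn))
print(result)  # 2
```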

Thomas Dickson
Breathe