
I want to create a task that updates columns/rows and sends a mail for every line in a data table. At the moment I can create a task that downloads the data from the main table, but I cannot create tasks for every line in the temp data table. Could you tell me what I am doing wrong and how I can generate and run tasks in a loop?

from datetime import datetime, timedelta

import airflow
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

from airflow.contrib.operators.bigquery_get_data import BigQueryGetDataOperator
from airflow.contrib.operators.bigquery_check_operator import BigQueryValueCheckOperator
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'cmap',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(0),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}


with DAG('dq_bigquery_test',
         max_active_runs=1,
         schedule_interval='@once',
         catchup=False,
         default_args=default_args) as dag:

    query = "SELECT * from `dbce-bi-prod-e6fd.dev_dataquality.data_logging_inc` where MailRequired = false"
    insert = "INSERT into dbce-bi-prod-e6fd.dev_dataquality.data_logging_inc (DataTimeStamp, Robot, Status) Values (CURRENT_TIMESTAMP(), 'TestRobot', 'Test')"

    my_bq_task = BigQueryOperator(
                    task_id='query_exc_on_teste',
                    sql=query,
                    write_disposition='WRITE_TRUNCATE',
                    create_disposition='CREATE_IF_NEEDED',
                    bigquery_conn_id='google_cloud_dbce_bi_prod',
                    use_legacy_sql=False,
                    destination_dataset_table='dev_dataquality.testTable')



    get_data = BigQueryGetDataOperator(
        task_id='get_data_from_query',
        project_id='dbce-bi-prod-e6fd',
        dataset_id='dev_dataquality',
        table_id='testTable',
        max_results='100',
        selected_fields='Robot,Status,MailRequired',
        bigquery_conn_id='google_cloud_dbce_bi_prod'
        )

    def process_data_from_bq(**kwargs):


        ti = kwargs['ti']
        update_column = []
        bq_data = ti.xcom_pull(task_ids='get_data_from_query')
        print(bq_data)
        # Now bq_data here would have your data in Python list
        for index, i in enumerate(bq_data):


            update_query = "UPDATE `dbce-bi-prod-e6fd.dev_dataquality.data_logging_inc` SET MailSent = True WHERE Robot = '{}'".format(i[0])

            print(update_query)
            update_column.append(BigQueryOperator(
                    task_id='update_column_{}'.format(index),
                    sql=update_query,
                    write_disposition='WRITE_EMPTY',
                    create_disposition='CREATE_IF_NEEDED',
                    bigquery_conn_id='google_cloud_dbce_bi_prod',
                    use_legacy_sql=False,
                    dag=dag
                    ))
            if index not in [0]:
                update_column[index-1] >> update_column[index]                    


    process_data = PythonOperator(
        task_id='process_data_from_bq',
        python_callable=process_data_from_bq,
        provide_context=True
        )



    my_bq_task >> get_data >> process_data

Thank you for your help!

  • At least 2 things wrong **[1]** the `python_callable` of your `process_data` task is itself generating `task`s; it doesn't work that way (a task cannot generate tasks; tasks must be generated at top-level, alongside `DAG` object creation) **[2]** generating tasks based on `XCOM` is **not possible** (because `XCOM` is materialized at *DAG runtime* and not *DAG generation / parsing*) [link1](https://stackoverflow.com/a/44638526/3679900), [link2](https://stackoverflow.com/a/56556790/3679900) – y2k-shubham May 11 '20 at 03:27
  • Okay, so I should create the task outside of the process_data task, that is clear to me. How can I get the data from that task and use the list when creating tasks outside of the process_data task? – Kagemar May 11 '20 at 09:07
  • Are you running your code inside Cloud Composer? – aga May 11 '20 at 14:31
  • Yes, I run every dag via Cloud Composer. – Kagemar May 11 '20 at 15:31
  • **@Kagemar** do understand that "you can't generate downstream tasks based on the output of an upstream task" (there's a catch) Read these for clarification [link1](https://stackoverflow.com/a/48992229/3679900) [link2](https://stackoverflow.com/a/57465332/3679900) [link3](https://stackoverflow.com/a/54586536/3679900). But if you are feeling adventurous, have a look at [this](https://stackoverflow.com/a/55896121/3679900) *[I strongly suggest you refrain from using this unless you've developed a fair bit of understanding of airflow]* – y2k-shubham May 11 '20 at 16:52
  • @Kagemar are the links provided by y2k-shubham sufficient for your needs? – aga May 13 '20 at 17:54
  • @muscat to be honest - nope. Is there any way to create tasks dynamically in Airflow? – Kagemar May 14 '20 at 22:07
  • Hello, this question is best directed at Apache Airflow developers; I recommend asking it on the official Apache community channels: [1](https://gitter.im/apache/incubator-airflow?at=59e5bd23f7299e8f53f8d667), [2](https://apache-airflow-slack.herokuapp.com/) – aga May 18 '20 at 06:42
  • **@Kagemar** have a look at [this](https://stackoverflow.com/a/54746434/3679900). It will hopefully let you understand why the thing you are asking for is not possible (or more correctly should not be done). But if you are adamant, [this](https://stackoverflow.com/a/55896121/3679900) trick can be used. – y2k-shubham May 29 '20 at 16:59
  • Also go through [this](https://stackoverflow.com/a/55132959/3679900) and [this](https://stackoverflow.com/a/54746434/3679900) – y2k-shubham Jun 12 '20 at 18:29
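
As the comments above point out, the usual fix is to keep a single PythonOperator and do the per-row work inside its callable (run the UPDATE and send the mail there) instead of instantiating new operators at run time. Below is a minimal sketch of that pattern, assuming Airflow 1.10's contrib BigQueryHook (whose cursor exposes a run_query method in that release); the connection id and table names are taken from the question, and send_mail is a hypothetical placeholder for the mail step. The callable and operator are meant to replace process_data_from_bq and process_data inside the question's `with DAG(...)` block.

from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.operators.python_operator import PythonOperator

def process_data_from_bq(**kwargs):
    ti = kwargs['ti']
    # Rows pulled by the upstream BigQueryGetDataOperator via XCom.
    bq_data = ti.xcom_pull(task_ids='get_data_from_query')

    hook = BigQueryHook(bigquery_conn_id='google_cloud_dbce_bi_prod',
                        use_legacy_sql=False)
    cursor = hook.get_conn().cursor()

    for row in bq_data:
        robot = row[0]
        update_query = (
            "UPDATE `dbce-bi-prod-e6fd.dev_dataquality.data_logging_inc` "
            "SET MailSent = True WHERE Robot = '{}'".format(robot))
        # Run the DML statement here instead of creating a new
        # BigQueryOperator at run time (run_query as in Airflow 1.10).
        cursor.run_query(update_query)
        # send_mail(robot)  # hypothetical helper for the mail step

process_data = PythonOperator(
    task_id='process_data_from_bq',
    python_callable=process_data_from_bq,
    provide_context=True)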
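
If you really want separate tasks per row, they can only be generated from information that is already available when the DAG file is parsed (a static list, an Airflow Variable, a config file), not from XCom values produced at run time. Here is a minimal sketch of parse-time task generation, assuming a hypothetical ROBOTS list and reusing the connection id from the question; it would sit inside the `with DAG(...)` block, downstream of get_data:

    # Known at parse time (static list, Airflow Variable, config file, ...).
    ROBOTS = ['TestRobot', 'OtherRobot']  # hypothetical values

    previous = get_data
    for index, robot in enumerate(ROBOTS):
        update_column = BigQueryOperator(
            task_id='update_column_{}'.format(index),
            sql=("UPDATE `dbce-bi-prod-e6fd.dev_dataquality.data_logging_inc` "
                 "SET MailSent = True WHERE Robot = '{}'".format(robot)),
            use_legacy_sql=False,
            bigquery_conn_id='google_cloud_dbce_bi_prod')
        # Chain the generated tasks one after another.
        previous >> update_column
        previous = update_column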

0 Answers