
Pseudo Python code:

from airflow import models
from airflow.operators.python_operator import PythonOperator

def update_tags(product):
    # update the product's tags based on its attributes
    ...

def products(p, h, **kwargs):
    # fetch products using params p and headers h
    for product in products:  # iterate over the fetched products
        update_tags(product)

with models.DAG(
    "fetchProducts",
    default_args=default_args,
    schedule_interval=None,
) as dag:

    get_products = PythonOperator(
        task_id='get_products',
        python_callable=products,
        op_kwargs={'p': params, 'h': headers},
        provide_context=True,
    )

get_products >> ??

What I am trying to achieve:

When the products loop runs, I want it to spawn an Airflow task (`update_tags`) for each product and then proceed to the next iteration, i.e. not wait for the task to finish.

My airflow.cfg uses the LocalExecutor.
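One way to get this fire-and-forget behavior inside the single `get_products` task is to fan the updates out to a worker pool from within the operator's callable. A minimal sketch using the standard library's `ThreadPoolExecutor` (the product list and both function bodies are stand-ins; note that work submitted this way runs outside Airflow's per-task tracking and logging):

```python
from concurrent.futures import ThreadPoolExecutor

def update_tags(product):
    # stand-in for the real tag update
    return f"updated {product}"

def products(p, h, **kwargs):
    # stand-in for the real fetch done with params p and headers h
    fetched = ["prod-1", "prod-2", "prod-3"]
    with ThreadPoolExecutor(max_workers=4) as pool:
        # submit() returns immediately, so the loop never blocks on an update
        futures = [pool.submit(update_tags, prod) for prod in fetched]
    # leaving the with-block waits for all submitted updates to complete
    return [f.result() for f in futures]
```

The loop itself never waits on any single update; only the task as a whole waits for the pool to drain before finishing.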

  • see [this](https://stackoverflow.com/a/52558172/3679900) and then for further reading you can do a [quick search](https://www.google.com/search?q=airflow+generate+tasks+dynamically) – y2k-shubham Aug 25 '20 at 06:21
  • Thank you @y2k-shubham. I have stumbled upon this earlier. This logic did not work in my case. That solution is ideal when you know the number of tasks to spawn; the logic I expect relies on the number of products fetched (dynamic). – Keval Domadia Aug 25 '20 at 11:17
  • You can't have a dynamic number of tasks -- you _could_ have the `PythonOperator` function fire off processes via `multiprocessing` or something similar, but then you're outside of Airflow's control/logging. – joebeeson Aug 25 '20 at 13:17
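For later readers: the "can't have a dynamic number of tasks" limitation described in the comments was lifted in Airflow 2.3, which added dynamic task mapping (not available when this was asked). A hedged sketch, assuming a 2.4+ TaskFlow API, with both task bodies as stand-ins; `expand()` creates one mapped `update_tags` task instance per fetched product at run time, each tracked by Airflow:

```python
from datetime import datetime
from airflow.decorators import dag, task

@dag(schedule=None, start_date=datetime(2022, 1, 1), catchup=False)
def fetch_products():

    @task
    def get_products():
        # stand-in for the real fetch using params/headers
        return ["prod-1", "prod-2", "prod-3"]

    @task
    def update_tags(product):
        # stand-in for the real tag update
        ...

    # one mapped task instance per element of get_products()'s return value
    update_tags.expand(product=get_products())

fetch_products()
```

With the LocalExecutor, the mapped instances run in parallel up to the configured parallelism, so no update blocks the others.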

0 Answers