Hello, similar to this answer, I would like to set a Task Instance note. This is my attempt:

def ti_note():

    @task
    def set_my_note(**context):

        dag_id = context['dag'].dag_id
        task_id = context['task'].task_id
        execution_date = context['execution_date']

        message = f"Note for dag {dag_id}, task {task_id}, and execution date {execution_date}"

        session = settings.Session()
        task_instance = session.query(DagRun).filter(TaskInstance.dag_id == dag_id,
                                                     TaskInstance.task_id == task_id,
                                                     TaskInstance.execution_date == execution_date).one()

        task_instance.note = message
        session.add(task_instance)

        session.commit()
        session.close()

    set_my_note_task = set_my_note()

This is the error I get:

    sqlalchemy.exc.MultipleResultsFound: Multiple rows were found when exactly one was required

I understand I need to change .one(), but I don't understand why multiple rows are returned from the query. Thanks

Justin

1 Answer

Heyo, I just used your code as a starter and was able to figure this out, so thank you! You were getting multiple rows back because the filter has to cover all four fields of the composite key for a task instance (task_id, dag_id, run_id, and map_index). The version below also queries TaskInstance rather than DagRun, since the note is set on the task instance.

Hope this code helps someone else out

    from airflow.decorators import task
    from airflow.models import TaskInstance
    from airflow.settings import Session


    @task
    def set_my_note(**context):
        # Identify the currently running task instance from the task context.
        dag_id = context["dag"].dag_id
        task_id = context["task"].task_id
        run_id = context["run_id"]
        map_index = context["task_instance"].map_index

        message = f"Note for dag {dag_id}, task {task_id}, and map_index {map_index}"

        session = Session()
        try:
            # Filter on all four key fields so that exactly one row matches.
            task_instance = (
                session.query(TaskInstance)
                .filter(
                    TaskInstance.dag_id == dag_id,
                    TaskInstance.task_id == task_id,
                    TaskInstance.run_id == run_id,
                    TaskInstance.map_index == map_index,
                )
                .one()
            )

            task_instance.note = message
            session.add(task_instance)
            session.commit()
        finally:
            # Release the session even if the query or commit fails.
            session.close()
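
To see why filtering on only part of the key can match several rows, here is a small standalone sketch using Python's built-in sqlite3. The table is a hypothetical, simplified stand-in and not Airflow's real schema, and the dag, task, and run names as well as the `count_matches` helper are made up for illustration.

```python
import sqlite3

# Hypothetical, simplified stand-in for a task-instance table (not Airflow's real schema).
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE task_instance (
        dag_id TEXT, task_id TEXT, run_id TEXT, map_index INTEGER, note TEXT,
        PRIMARY KEY (dag_id, task_id, run_id, map_index)
    )
    """
)
conn.executemany(
    "INSERT INTO task_instance (dag_id, task_id, run_id, map_index) VALUES (?, ?, ?, ?)",
    [
        ("my_dag", "set_my_note", "run_1", -1),  # -1 marks an unmapped task here
        ("my_dag", "set_my_note", "run_2", -1),  # same task, second run
        ("my_dag", "fan_out", "run_1", 0),       # mapped task, first expansion
        ("my_dag", "fan_out", "run_1", 1),       # mapped task, second expansion
    ],
)


def count_matches(conn, **filters):
    """Count rows matching every column=value pair given in filters."""
    where = " AND ".join(f"{col} = ?" for col in filters)
    sql = f"SELECT COUNT(*) FROM task_instance WHERE {where}"
    return conn.execute(sql, tuple(filters.values())).fetchone()[0]


# Only part of the key: one row per run matches, so .one() would fail.
partial = count_matches(conn, dag_id="my_dag", task_id="set_my_note")

# Without map_index, a mapped task still matches one row per expansion.
no_map_index = count_matches(conn, dag_id="my_dag", task_id="fan_out", run_id="run_1")

# All four key fields: exactly one row.
full = count_matches(
    conn, dag_id="my_dag", task_id="fan_out", run_id="run_1", map_index=1
)

print(partial, no_map_index, full)  # 2 2 1
```

As soon as a task has run more than once, or is mapped, any filter that leaves out run_id or map_index can match more than one row.
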
dukarc