
I'm running an Airflow DAG with tasks executed via DockerOperator. I need to fetch the result of the first task in the second one. This can be done with xcoms in the following way (a minimal sketch follows the list):

  1. set the first task's `xcom_all=True`

  2. print the output in the first task

  3. fetch the xcom in the `environment` field of the second task, something like:

    environment={'xcom_from_previous':'{{ task_instance.xcom_pull("first_task") }}'}

  4. Get the value in the second task via `os.environ`.
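For reference, this is roughly what the setup looks like; the DAG id, image names and commands below are placeholders, not my actual code:

    from datetime import datetime

    from airflow import DAG
    from airflow.providers.docker.operators.docker import DockerOperator

    with DAG(
        dag_id="docker_xcom_example",       # placeholder DAG id
        start_date=datetime(2023, 1, 1),
        schedule_interval=None,
        catchup=False,
    ) as dag:
        # Steps 1-2: whatever the container prints is pushed as an xcom.
        first_task = DockerOperator(
            task_id="first_task",
            image="my-producer-image",      # placeholder image
            command="python produce.py",    # prints the result to stdout
            do_xcom_push=True,
            xcom_all=True,
        )

        # Step 3: pull the xcom into the second container through the
        # templated 'environment' field.
        second_task = DockerOperator(
            task_id="second_task",
            image="my-consumer-image",      # placeholder image
            command="python consume.py",    # step 4: reads os.environ['xcom_from_previous']
            environment={
                "xcom_from_previous": '{{ task_instance.xcom_pull("first_task") }}'
            },
        )

        first_task >> second_task

Inside the second container the value arrives as a plain string, so it has to be de-serialized there, e.g. with `json.loads(os.environ["xcom_from_previous"])` if the first task printed JSON.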

Now, all is fine, but my xcom output seems to be too large: what I read in the second task gets truncated and is, of course, not usable.

So, I could parse the '{{ task_instance.xcom_pull("first_task") }}' part before creating the environment, but how do I create a local function that can actually render the template? The `environment` field of DockerOperator is templated, but regular functions in the DAG are not.

Unfortunately, I'm currently forced to use Airflow v2.2.5.

Hope my problem is clear.


Update:

If one needs to parse an xcom or any other templated data, use `user_defined_filters`. In my case, though, it was not actually needed: the problem was simply in de-serializing the string and in how the data was displayed in the Airflow log.
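For completeness, a filter registered via `user_defined_filters` on the DAG can be applied inside any templated field, so the raw xcom string can be parsed (or shrunk) before it reaches the container. A rough sketch, assuming the first task prints JSON; the filter name, the `"summary"` key and the image name are made up:

    import json
    from datetime import datetime

    from airflow import DAG
    from airflow.providers.docker.operators.docker import DockerOperator

    def parse_and_shrink(raw):
        """Hypothetical filter: deserialize the xcom string and keep only
        the part the second task actually needs."""
        data = json.loads(raw)
        return json.dumps(data["summary"])  # made-up key

    with DAG(
        dag_id="docker_xcom_filter_example",  # placeholder DAG id
        start_date=datetime(2023, 1, 1),
        schedule_interval=None,
        catchup=False,
        user_defined_filters={"parse_and_shrink": parse_and_shrink},
    ) as dag:
        second_task = DockerOperator(
            task_id="second_task",
            image="my-consumer-image",        # placeholder image
            environment={
                "xcom_from_previous":
                    '{{ task_instance.xcom_pull("first_task") | parse_and_shrink }}'
            },
        )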

kakk11
  • How big is the xcom? Maybe it's better to save it to a file and pass the path in the xcom. If you can share the code and an xcom example, that would be great – ozs Aug 24 '23 at 12:07
  • Thanks @ozs, I'm actually testing `user_defined_filters` at the moment and it looks like it can do the job. I'll update when I'm finished. The xcom size is approximately 32K characters. – kakk11 Aug 24 '23 at 12:40
  • The limit depends on your db. SQLite: 2 GB, Postgres: 1 GB, MySQL: 64 KB – ozs Aug 24 '23 at 12:50
  • Thanks for thinking along, @ozs. It turned out that nothing is wrong with xcoms or env variables; those work fine. It was just a logical error when de-serializing strings back to data, and the truncation happened on the Airflow printing side. So, case closed. – kakk11 Aug 24 '23 at 15:29

0 Answers