I am trying to pass a list of strings from one task to another via XCom, but I cannot get the pushed list interpreted back as a list.

For example, when I do this in some function blah that is run in a ShortCircuitOperator:

paths = ['gs://{}/{}'.format(bucket, obj) for obj in my_list]
kwargs['ti'].xcom_push(key='return_value', value=paths)

and then I want to use that list as a parameter of an operator, for example:

run_task_after_blah = AfterBlahOperator(
    task_id='run-task-after-blah',
    ...,
    input_paths="{{ ti.xcom_pull(task_ids='find-paths') }}",
    ...,
)

I expect input_paths to be equal to paths, but it is not: the rendering happens first and the assignment afterwards, and somehow the template rendering converts the xcom_pull return value into a stringified list (which my AfterBlahOperator then assigns as the value of an element in a JSON document).

I tried concatenating the paths into one string joined by a separator, pushing that to XCom, and splitting it again when pulling from XCom. But because the XCom is rendered first, I get either that stringified list, when the split function is called inside the template, or the original concatenated string of paths, when the split function is applied to the parameter (as in "{{ ti.xcom_pull(task_ids='find-paths') }}".split(';')).

XCom seems to work great for single values as task parameters, or for multiple values when the extracted values can be processed further, but not for multiple values that have to be passed as a single list parameter of a task.

Is there a way to do this without having to write an extra function that returns exactly such a list of strings? Or maybe I am abusing XCom, but many operators in Airflow take a list of elements as a parameter (e.g., typically the full paths to multiple files that are the result of some previous task, and hence not known beforehand).

Guille

3 Answers

Jinja renders strings, so if you fetch an XCom via templates, it's always going to be a string. Instead, you will need to fetch the XCom where you have access to the TaskInstance object. Something like this:

class AfterBlahOperator(BaseOperator):

    def __init__(self, ..., input_task_id, *args, **kwargs):
        ...
        self.input_task_id = input_task_id
        super(AfterBlahOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        input_paths = context['ti'].xcom_pull(task_ids=self.input_task_id)
        for path in input_paths:
            ...

This is similar to how you would fetch it within a PythonOperator, which the XCom docs provide an example of.

Note that you can still support a separate input_paths parameter for when it can be hardcoded in a DAG; you'll just need an extra check to see which param to read the value from.
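
That extra check could look roughly like the sketch below. The helper name resolve_input_paths and the FakeTaskInstance stand-in are made up for illustration and are not part of Airflow; in a real operator the body of the helper would sit in execute, and ti would be context['ti'].

```python
class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's TaskInstance, for illustration only."""

    def __init__(self, xcoms):
        self._xcoms = xcoms  # maps task_id -> value pushed by that task

    def xcom_pull(self, task_ids):
        return self._xcoms.get(task_ids)


def resolve_input_paths(ti, input_paths=None, input_task_id=None):
    """Prefer hardcoded paths; otherwise pull the list from the upstream task."""
    if input_paths is not None:
        return list(input_paths)
    if input_task_id is None:
        raise ValueError("either input_paths or input_task_id is required")
    pulled = ti.xcom_pull(task_ids=input_task_id)
    if pulled is None:
        raise ValueError("no XCom found for task {!r}".format(input_task_id))
    return list(pulled)


ti = FakeTaskInstance({"find-paths": ["gs://bucket/a.csv", "gs://bucket/b.csv"]})
print(resolve_input_paths(ti, input_task_id="find-paths"))
print(resolve_input_paths(ti, input_paths=["gs://bucket/fixed.csv"]))
```

Because the value is pulled directly from the task instance instead of going through a template, it arrives as a real list and needs no parsing.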

Daniel Huang
  • Thanks! I was hoping for too much then! But yes, a custom operator with the 'operation' itself is the way to go. I just wanted to avoid the extra code but probably this is a trade off due to using Jinja. – Guille Nov 30 '17 at 09:49

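Call eval(self.input_paths) inside AfterBlahOperator's execute method. This way, the stringified list can be converted back into a list:

class AfterBlahOperator(BaseOperator):
    # Must be a tuple of attribute names so that Jinja renders self.input_paths
    template_fields = ('input_paths',)

    def __init__(self, ..., *args, **kwargs):
        ...

    def execute(self, context):
        # Assumes __init__ stored input_paths on self; by now it is a rendered string
        paths = eval(self.input_paths)
        for path in paths:
            ...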
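
Since eval executes whatever expression the string contains, a more cautious variant is ast.literal_eval from the standard library, which only accepts Python literals. A minimal sketch, assuming the rendered template value is the str() form of the pushed list; the helper name parse_rendered_list and the example paths are made up for illustration:

```python
import ast


def parse_rendered_list(rendered):
    """Turn a stringified list such as "['a', 'b']" back into a list."""
    value = ast.literal_eval(rendered)  # raises ValueError/SyntaxError on non-literals
    if not isinstance(value, list):
        raise ValueError("expected a list, got {}".format(type(value).__name__))
    return value


paths = ["gs://my-bucket/a.csv", "gs://my-bucket/b.csv"]  # hypothetical paths
rendered = str(paths)  # assumed to match what the template hands to the operator

print(parse_rendered_list(rendered) == paths)
```

Unlike eval, this refuses input such as a function call instead of running it.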
Vykunta

Based on Daniel Huang's answer, I made a quick factory that lets you "unpack" Jinja-templated strings into a list for arbitrary operators with minimal boilerplate code (so you don't need to create a subclass for every single operator):

def operator_unpackjinja_factory(baseclass):
    class ChildUnpackJinja(baseclass):
        def __init__(self, xcompull_taskid, *args, xcompull_key="return_value", xcompull_attrname="objects", **kwargs):
            self.xcompull_taskid = xcompull_taskid
            self.xcompull_key = xcompull_key
            self.xcompull_attrname = xcompull_attrname
            self.baseclass = baseclass
            assert xcompull_attrname not in kwargs
            kwargs[xcompull_attrname] = "EMPTY"
            super(ChildUnpackJinja, self).__init__(*args, **kwargs)

        def execute(self, context):
            objects = context['ti'].xcom_pull(task_ids=self.xcompull_taskid, key=self.xcompull_key)
            setattr(self, self.xcompull_attrname, objects)
            return super(ChildUnpackJinja, self).execute(context)

    return ChildUnpackJinja

A call to this factory can then be used as a drop-in replacement for whatever operator you originally wanted to use, like this:

from airflow import DAG
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

with DAG(...) as dag:
    gcpbucket_to_bigquery_stagingtable = operator_unpackjinja_factory(GCSToBigQueryOperator)(
        xcompull_taskid = "my_upstream_task_id",
        xcompull_attrname = "source_objects",
        task_id="gcpbucket_to_bigquery_stagingtable",
        dag=dag,
        bucket="my_bucket_name"
    )

As discussed in this question, I would recommend also creating the Memoize class and adding the @Memoize decorator to the factory:

class Memoize:
    def __init__(self, f):
        self.f = f
        self.memo = {}

    def __call__(self, *args):
        # Call f only on a cache miss; setdefault(args, self.f(*args)) would call it every time
        if args not in self.memo:
            self.memo[args] = self.f(*args)
        return self.memo[args]

@Memoize
def operator_unpackjinja_factory(baseclass):
    ...  # body unchanged, see above
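
To see what the memoization buys you without needing Airflow, here is a small sketch. It uses a dummy base class and functools.lru_cache from the standard library in place of the hand-written Memoize class (a swapped-in alternative, not what this answer uses); all names are made up for illustration.

```python
import functools


class DummyOperator:
    """Hypothetical stand-in for an Airflow operator class."""

    def execute(self, context):
        return "executed"


def plain_factory(baseclass):
    # Every call builds a brand-new subclass object
    class Child(baseclass):
        pass
    return Child


@functools.lru_cache(maxsize=None)
def cached_factory(baseclass):
    # The subclass is built once per base class and then reused
    class Child(baseclass):
        pass
    return Child


print(plain_factory(DummyOperator) is plain_factory(DummyOperator))    # False
print(cached_factory(DummyOperator) is cached_factory(DummyOperator))  # True
```

With caching, repeated factory calls for the same operator hand back the same class object instead of a fresh, distinct subclass each time.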
Chris Stenkamp