
Basically, I want to achieve something like that.

I have no idea how to make sure the task_id is referenced consistently. I used the PythonOperator and tried passing values through kwargs.

One of my functions looks like this:


def Dynamic_Structure(log_type, callableFunction, args):
    # dyn_value = "{{ task_instance.xcom_pull(task_ids='Extract_{}_test'.format(log_type)) }}"

    structure_logs = StrucFile(
        task_id='Struc_{}_test'.format(log_type),
        provide_context=True,
        format_location=config.DATASET_FORMAT,
        struc_location=config.DATASET_STR,
        # log_key_hash_dict_location = dl_config.EXEC_WINDOWS_MIDDLE_KEY_HASH_DICT,
        log_key_hash_dict_location=dl_config.DL_MODEL_DATASET / log_type / dl_config.EXEC_MIDDLE_KEY_HASH_DICT,
        data_type='test',
        xcom_push=True,
        dag=dag,
        op_kwargs=args,
        log_sort=log_type,
        python_callable=eval(callableFunction),
        # the following parameters depend on the format of the raw log
        regex=config.STRUCTURE_REGEX[log_type],
        log_format=config.STRUCTURE_LOG_FORMAT[log_type],
        columns=config.STRUCTURE_COLUMNS[log_type],
    )

    return structure_logs

Then I call it like this:

for log_type in log_types:
    ...
    structure_logs_task = Dynamic_Structure(log_type, 'values_function', {'previous_task_id': 'Extract_{}_test'.format(log_type)})

But I cannot get the behaviour shown in the chart.

I am confused about how to reference previous_task_id. xcom_pull and xcom_push would help, but I have no idea how to use them in a PythonOperator.
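For what it's worth, a minimal sketch of how a python_callable can pull the upstream value: with provide_context=True, Airflow injects the task instance into the callable's kwargs as 'ti', and 'previous_task_id' arrives through op_kwargs. The FakeTI stub is only for illustration outside a running Airflow environment.

```python
def values_function(previous_task_id=None, **context):
    # 'previous_task_id' comes in via op_kwargs; 'ti' (the task instance)
    # is injected into the context by Airflow when provide_context=True.
    ti = context["ti"]
    return ti.xcom_pull(task_ids=previous_task_id)


# Illustration-only stub standing in for Airflow's TaskInstance:
class FakeTI:
    def xcom_pull(self, task_ids=None):
        return "value pushed by {}".format(task_ids)
```

With xcom_push=True on the upstream operator, its return value is what xcom_pull retrieves here.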

  • check [Airflow Generate Dynamic Tasks in Single DAG](https://stackoverflow.com/q/52558018/3679900) – y2k-shubham Jun 10 '20 at 10:38
  • I think it is not my case. I need multiple matchings. This is only one workflow line. – Newt Jun 10 '20 at 11:18
  • **@Newt** it isn't much different (single ' workflow line' or multiple 'line's, the method of setting up is agnostic to it). A quick [google search](https://www.google.com/search?q=airflow+dynamic+task+creation) gives several leads. And then this [guide](https://www.astronomer.io/guides/dynamically-generating-dags/) from [astronomer.io](https://www.astronomer.io/guides) is super helpful. Hopefully it will resolve your problem. – y2k-shubham Jun 11 '20 at 07:50
  • @y2k-shubham. Thanks. That is really helpful. I will give it a go. – Newt Jun 11 '20 at 09:11

1 Answer


The problem was not the code.

I was trying to read the log_types from environment variables defined in .env, but I had not found the right way to read environment variables inside Docker images.

It should be:

In docker-compose.yml

version: "x"
services:

  xxx:
    build:
    # note: there must be a space between 'context:' and '.'
      context: .
      dockerfile: ./Dockerfile
      args:
        - PORT=${PORT}
    volumes:
       ...

In Dockerfile

FROM xx
ARG PORT

ENV PORT "$PORT"

EXPOSE ${PORT}
...

In the root folder, you can define the ARG in .env.
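For example, a root-level .env containing a line like PORT=8080 (the value is illustrative) is picked up by docker-compose, passed into the build as ARG, and exposed at runtime via ENV. Inside the container it is then an ordinary environment variable; a minimal Python sketch of reading it, with an assumed fallback:

```python
import os


def get_port(default=8080):
    # Inside the container, the value set via ARG/ENV in the Dockerfile
    # (sourced from .env through docker-compose) is a plain env var.
    return int(os.environ.get("PORT", default))
```

The default only matters when running outside the container, where PORT may be unset.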
