In Airflow it's possible to write to the log with a simple `print()` or with a logger, as suggested here.

However, this won't work when trying to print from inside an operator.

I have the following code:

for i in range(5, 0, -1):
    gcs_export_uri_template = ["adstest/{{ macros.ds_format(macros.ds_add(ds, -params.i), '%Y-%m-%d', '%Y/%m/%d') }}/*"]
    update_bigquery = GoogleCloudStorageToBigQueryOperator(
        dag=dag,
        task_id='load_ads_to_BigQuery-{}'.format(i),
        bucket=GCS_BUCKET_ID,
        destination_project_dataset_table=table_name_template,
        source_format='CSV',
        source_objects=gcs_export_uri_template,
        schema_fields=dc(),
        params={'i': i},
        create_disposition='CREATE_IF_NEEDED',
        write_disposition='WRITE_APPEND',
        skip_leading_rows=1,
        google_cloud_storage_conn_id=CONNECTION_ID,
        bigquery_conn_id=CONNECTION_ID
    )

Now say I want to print `"My name is load_ads_to_BigQuery-{}".format(i)`. As you can see, this print is unique per operator.

If I do it as:

for i in range(5, 0, -1):
    print("My name is load_ads_to_BigQuery-{}".format(i))
    gcs_export_uri_template = ...
    update_bigquery = GoogleCloudStorageToBigQueryOperator(...)

All 5 operators will print all 5 lines, which is incorrect in my case. The print must happen inside the GoogleCloudStorageToBigQueryOperator.

How can I do that?
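The underlying issue is that top-level code in a DAG file runs every time the file is parsed, while only an operator's `execute` method runs per task. A minimal plain-Python sketch of that distinction (`FakeOperator` is a made-up stand-in, not an Airflow class):

```python
# Stand-in sketch: code at module level runs on every parse of the DAG
# file, while code inside execute() runs only for that one task.
class FakeOperator:
    """Made-up stand-in for an Airflow operator; not a real Airflow class."""

    def __init__(self, task_id):
        self.task_id = task_id

    def execute(self, context):
        # This message is scoped to this task only.
        return "My name is {}".format(self.task_id)


operators = [FakeOperator("load_ads_to_BigQuery-{}".format(i))
             for i in range(5, 0, -1)]

# Each operator reports only its own name when it runs:
print(operators[0].execute({}))  # My name is load_ads_to_BigQuery-5
```

Anything printed outside `execute` would instead appear for every task, because the whole file is re-evaluated each time.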

kaxil
Programmer120
  • You can also log _inside_ operators using the log method from BaseOperator like so `self.log.info("This was %s", "fun")`. Are all the print-statements done inside the dag or is your for-loop inside a custom Operator? – judoole Sep 03 '18 at 06:48
  • @judoole I'm not sure I understand – Programmer120 Sep 03 '18 at 07:27
  • Do you print inside your dag-file, or are you doing your printing inside a selfcreated module, creating 5 `GoogleCloudStorageToBigQueryOperator`s in a for loop? Sorry for the late reply. – judoole Sep 03 '18 at 10:43
  • @judoole I'm not printing at all.. I want to add the print to my code. I want to print inside the GoogleCloudStorageToBigQueryOperator. – Programmer120 Sep 03 '18 at 11:38
  • If understand correctly you would like to print something like "Uploading file bla-bla-bla to bucket blablabla" and when done "Uploaded file bla-bla-bla"? – judoole Sep 03 '18 at 12:54
  • @judoole yep. that is correct. but each `operator(i)` should show only the prints for his `i` – Programmer120 Sep 03 '18 at 12:55
  • Aha. Sounds like something I've just done this week :) I'll try to answer with the solution I ended up with – judoole Sep 03 '18 at 12:57
  • Ups. Sorry, read that the wrong way. I have not done that before. Might be that you can do a `gcs_export_uri_template = ["{% print 'my-name-is-" + i + "%} adstest/{{ macros.ds_format(macros.ds_add(ds, -params.i), '%Y-%m-%d', '%Y/%m/%d') }}/*"]` – judoole Sep 03 '18 at 13:21
  • @judoole I'm pretty sure there is a syntax error with it...you start with `"` then close it before the 2nd `%`? – Programmer120 Sep 03 '18 at 13:36
  • Yes, it should rather be a `'` I see. Somewhat ugly code to use string concatenation, but it skips the problem with `{` being stripped by String.format. What I thought might work is to use the `%` to execute code blocks, like https://airflow.apache.org/tutorial.html#templating-with-jinja – judoole Sep 04 '18 at 06:48
  • it doesn't seem to work... – Programmer120 Sep 04 '18 at 06:55
  • Bah. Thought so. The closest thing I can think of then is to either write your own Operator class that instantiates a `GoogleCloudStorageToBigQueryOperator` in its `execute` method and does logging there, or copy the code of `GoogleCloudStorageToBigQueryOperator` and edit it to your own liking. Maybe a PR to Airflow would be appreciated also :) – judoole Sep 04 '18 at 07:03
  • 1
    I think this is a problem for all operators. You are suggesting that I will wrap my operator with a function that will print and then will execute the operator.. I don't think airflow support this :\ I submitted a ticket https://issues.apache.org/jira/browse/AIRFLOW-3000 – Programmer120 Sep 04 '18 at 07:06
  • No, wrap the execution and creation of `GoogleCloudStorageToBigQueryOperator` in an own class. I can illustrate in a solution. – judoole Sep 04 '18 at 07:10
  • @judoole I need this print in various of operators on like 20 dags :\ – Programmer120 Sep 04 '18 at 07:16

2 Answers


A solution that might suffice is to create a wrapper class. Example:

class MyGoogleCloudStorageToBigQueryOperator(BaseOperator):
    template_fields = ('bucket', 'source_objects',
                       'schema_object', 'destination_project_dataset_table')

    @apply_defaults
    def __init__(self,
                 bucket,
                 destination_project_dataset_table,
                 source_format,
                 source_objects,
                 schema_fields,
                 params,
                 create_disposition,
                 write_disposition,
                 skip_leading_rows,
                 google_cloud_storage_conn_id,
                 bigquery_conn_id,
                 *args,
                 **kwargs):
        super(MyGoogleCloudStorageToBigQueryOperator, self).__init__(*args, **kwargs)
        self.bucket = bucket
        self.destination_project_dataset_table = destination_project_dataset_table
        ...

    def execute(self, context):
        self.log.info("My name is %s", self.task_id)
        # Build the wrapped operator and delegate execution to it
        operator = GoogleCloudStorageToBigQueryOperator(
            task_id="doesnt_matter",
            bucket=self.bucket,
            source_format=self.source_format,
            ...
        )
        return operator.execute(context)

MyGoogleCloudStorageToBigQueryOperator can then be instantiated instead of GoogleCloudStorageToBigQueryOperator in your for loop.
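The delegation pattern above can be sketched in plain Python without Airflow (both classes here are stand-ins, not Airflow's): the wrapper logs first, then constructs the inner operator in its own `execute` and forwards the call.

```python
class InnerOperator:
    """Stand-in for GoogleCloudStorageToBigQueryOperator."""

    def __init__(self, task_id, bucket):
        self.task_id = task_id
        self.bucket = bucket

    def execute(self, context):
        return "loaded from {}".format(self.bucket)


class WrapperOperator:
    """Stand-in wrapper: logs, then builds and delegates to the inner operator."""

    def __init__(self, task_id, bucket):
        self.task_id = task_id
        self.bucket = bucket

    def execute(self, context):
        # This line runs inside the task, so it lands in that task's log only.
        print("My name is {}".format(self.task_id))
        inner = InnerOperator(task_id="doesnt_matter", bucket=self.bucket)
        return inner.execute(context)


result = WrapperOperator(task_id="load_ads_to_BigQuery-1",
                         bucket="my-bucket").execute({})
print(result)  # loaded from my-bucket
```

The cost of this approach is that the wrapper must re-declare every constructor argument it wants to pass through, which is why the subclassing approach below is usually simpler.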

judoole
  • Why not `class MyGoogleCloudStorageToBigQueryOperator(GoogleCloudStorageToBigQueryOperator): pass`, override `execute`, and call `super().execute(context)`? – Micah Smith Sep 06 '18 at 16:04
  • 1
    Yes, that I guess would work just as well in this case. Much better solution. Feel free to edit if you'd like some extra points, or make an answer yourself. I'll upvote. – judoole Sep 06 '18 at 18:13

Building off of @judoole's answer, you could extend GoogleCloudStorageToBigQueryOperator directly.

class MyGoogleCloudStorageToBigQueryOperator(LoggingMixin, GoogleCloudStorageToBigQueryOperator):

    def execute(self, context):
        # self.log is the logger provided by LoggingMixin; the task id is
        # available on the operator itself rather than in the context dict.
        self.log.info('Inside task %s', self.task_id)
        return super().execute(context)

More generally, you could write a mixin class that would automatically do this type of logging for a variety of operators.

class LogTaskExecutionMixin(object):

    def execute(self, context):
        self.log.info('Inside task %s', self.task_id)
        return super().execute(context)

class MyGoogleCloudStorageToBigQueryOperator(
    LogTaskExecutionMixin, LoggingMixin, GoogleCloudStorageToBigQueryOperator
):
    pass

The thinking in both of these approaches is that you define a new operator that writes a log message before execution but is otherwise identical to the operator that you have extended.
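The mixin relies on Python's method resolution order: `LogTaskExecutionMixin.execute` runs first, and `super().execute(context)` forwards to the real operator's `execute`. A plain-Python sketch with dummy classes (not Airflow's) showing the mechanism:

```python
class DummyBaseOperator:
    """Stand-in for any Airflow operator with an execute() method."""

    def __init__(self, task_id):
        self.task_id = task_id

    def execute(self, context):
        return "done: {}".format(self.task_id)


class LogTaskExecutionMixin:
    def execute(self, context):
        # Log first, then forward to the next execute() in the MRO.
        print("Inside task {}".format(self.task_id))
        return super().execute(context)


class LoggedOperator(LogTaskExecutionMixin, DummyBaseOperator):
    pass


outcome = LoggedOperator(task_id="load_ads_to_BigQuery-2").execute({})
print(outcome)  # done: load_ads_to_BigQuery-2
```

Because the mixin appears before the operator in the base-class list, its `execute` wraps the operator's; the same mixin can be reused with any operator class, which matters when the logging is needed across many DAGs.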

Micah Smith