I have a requirement to log the Apache Airflow logs to stdout in JSON format. Airflow does not seem to provide this capability out of the box. I have found a couple of Python modules that are capable of this task, but I cannot get the implementation to work.

Currently, I am adding a class in airflow/utils/logging.py to modify the logger, shown below:

import datetime

from pythonjsonlogger import jsonlogger

class StackdriverJsonFormatter(jsonlogger.JsonFormatter, object):
    def __init__(self, fmt="%(levelname) %(asctime) %(nanotime) %(severity) %(message)", style='%', *args, **kwargs):
        jsonlogger.JsonFormatter.__init__(self, fmt=fmt, *args, **kwargs)

    def process_log_record(self, log_record):
        # Rename 'level'/'levelname' to 'severity' and 'asctime' to
        # 'timestamp' to match Stackdriver's expected field names.
        if log_record.get('level'):
            log_record['severity'] = log_record['level']
            del log_record['level']
        else:
            log_record['severity'] = log_record['levelname']
            del log_record['levelname']
        if log_record.get('asctime'):
            log_record['timestamp'] = log_record['asctime']
            del log_record['asctime']
        now = datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%fZ')
        log_record['nanotime'] = now
        return super(StackdriverJsonFormatter, self).process_log_record(log_record)

I am implementing this code in /airflow/settings.py as shown below:

import sys

from airflow.utils import logging as logconf

def configure_logging(log_format=LOG_FORMAT):
    handler = logconf.logging.StreamHandler(sys.stdout)
    formatter = logconf.StackdriverJsonFormatter()
    handler.setFormatter(formatter)
    root_logger = logconf.logging.getLogger()
    root_logger.addHandler(handler)
    ''' code below was the original Airflow source code
    logging.root.handlers = []
    logging.basicConfig(
        format=log_format, stream=sys.stdout, level=LOGGING_LEVEL)
    '''

I have tried a couple of different variations of this and can't get python-json-logger to transform the logs to JSON. Perhaps I'm not getting to the root logger? Another option I have considered is manually formatting the logs as a JSON string, but no luck with that yet either. Any alternative ideas, tips, or support are appreciated; a minimal standalone sketch of what I'm after follows.
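For reference, this is the behavior I want outside of Airflow: a minimal, illustrative sketch of python-json-logger attached to the root logger (the field names here are just examples).

import logging
import sys

from pythonjsonlogger import jsonlogger

# Attach a JSON formatter to a stdout handler on the root logger.
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(jsonlogger.JsonFormatter('%(asctime) %(levelname) %(name) %(message)'))

root = logging.getLogger()
root.setLevel(logging.INFO)
root.addHandler(handler)

logging.getLogger(__name__).info('hello')
# emits something like:
# {"asctime": "2018-10-26 ...", "levelname": "INFO", "name": "__main__", "message": "hello"}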

Cheers!

  • It is not clear whether you are required to make an Airflow process itself output logs to stdout, or whether it can be any other process that outputs Airflow's logs to stdout. – SergiyKolesnikov Oct 27 '18 at 11:18
  • @SergiyKolesnikov - The preferable solution is to handle this without an additional layer of logging. I would like to publish this stream to stdout and have it read directly by fluentd. I am encountering issues with implementing any custom handlers (and thus formatters) in Airflow. Whenever I do (as in the snippet above), it seems to dismantle the entire logging setup. Any tips? The only way I can get the `configure_logging()` method to work is with the `logging.basicConfig(...` method, which requires a string to be passed in as the format rather than a formatter object. – Matthew Bennett Oct 31 '18 at 00:11

1 Answer


I don't know if you ever solved this problem, but after some frustrating tinkering, I ended up getting this to play nice with Airflow. For reference, I followed a lot of this article to get it working: https://www.astronomer.io/guides/logging/. The main issue is that Airflow's logging config only accepts a string template for the logging format, which python-json-logger can't plug into. So you have to create your own logging handler classes and connect them to a custom logging config class.

  1. Copy the log template here into your $AIRFLOW_HOME/config folder, and change DEFAULT_LOGGING_CONFIG to LOGGING_CONFIG. When you're successful, bring up Airflow and you'll get a log message on startup that says Successfully imported user-defined logging config from logging_config.LOGGING_CONFIG. If this is the first .py file in the config folder, don't forget to add a blank __init__.py file so Python picks it up. A sketch of this wiring follows.
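A rough sketch of that wiring, assuming the renamed config lives in $AIRFLOW_HOME/config/logging_config.py (the file and variable names follow the startup message above; adjust them to your setup):

# $AIRFLOW_HOME/config/logging_config.py  (sketch)
from copy import deepcopy

from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG

# Start from Airflow's shipped default config and override pieces of it
# (handlers, formatters) in the steps below.
LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG)

# Then point Airflow at it in airflow.cfg (under [core] in the 1.x versions
# this answer targets):
#   logging_config_class = logging_config.LOGGING_CONFIG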

  2. Write your custom JsonFormatter to inject into your handler. I did mine off of this one; a sketch of what it might look like follows.
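The formatter itself isn't reproduced in this answer, so as a hedged sketch: a minimal CustomJsonFormatter built on python-json-logger might look like this (the timestamp/level field mapping is an assumption, chosen to match the '(timestamp) (level) (name) (message)' format string used below).

from pythonjsonlogger import jsonlogger

class CustomJsonFormatter(jsonlogger.JsonFormatter):
    def add_fields(self, log_record, record, message_dict):
        super(CustomJsonFormatter, self).add_fields(log_record, record, message_dict)
        # Map standard LogRecord attributes onto the field names used in
        # the '(timestamp) (level) (name) (message)' format string.
        log_record['timestamp'] = self.formatTime(record)
        log_record['level'] = record.levelname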

  3. Write the custom log handler classes. Since I was looking for JSON logging, mine look like this:

from logging.handlers import RotatingFileHandler

from airflow.utils.log.file_processor_handler import FileProcessorHandler
from airflow.utils.log.file_task_handler import FileTaskHandler
from airflow.utils.log.logging_mixin import RedirectStdHandler

# CustomJsonFormatter is the formatter from step 2; import it from wherever you defined it.

class JsonStreamHandler(RedirectStdHandler):
    def __init__(self, stream):
        super(JsonStreamHandler, self).__init__(stream)
        json_formatter = CustomJsonFormatter('(timestamp) (level) (name) (message)')
        self.setFormatter(json_formatter)


class JsonFileTaskHandler(FileTaskHandler):
    def __init__(self, base_log_folder, filename_template):
        super(JsonFileTaskHandler, self).__init__(base_log_folder, filename_template)
        json_formatter = CustomJsonFormatter('(timestamp) (level) (name) (message)')
        self.setFormatter(json_formatter)


class JsonFileProcessorHandler(FileProcessorHandler):
    def __init__(self, base_log_folder, filename_template):
        super(JsonFileProcessorHandler, self).__init__(base_log_folder, filename_template)
        json_formatter = CustomJsonFormatter('(timestamp) (level) (name) (message)')
        self.setFormatter(json_formatter)


class JsonRotatingFileHandler(RotatingFileHandler):
    def __init__(self, filename, mode, maxBytes, backupCount):
        super(JsonRotatingFileHandler, self).__init__(filename, mode, maxBytes, backupCount)
        json_formatter = CustomJsonFormatter('(timestamp) (level) (name) (message)')
        self.setFormatter(json_formatter)
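As a design note: the subclasses above exist to bake the JSON formatter into each handler's __init__. Python's dictConfig also supports instantiating a custom formatter directly via its special '()' factory key, so an alternative (a sketch I haven't verified against Airflow's config loader) would be to declare the formatter once and reference it from stock handlers:

'formatters': {
    'json': {
        # dictConfig instantiates the callable named by '()' and passes
        # the remaining keys to it as keyword arguments.
        '()': 'pythonjsonlogger.jsonlogger.JsonFormatter',
        'fmt': '(timestamp) (level) (name) (message)',
    },
},
'handlers': {
    'console': {
        'class': 'logging.StreamHandler',
        'formatter': 'json',
        'stream': 'ext://sys.stdout',
    },
},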
  4. Hook them up to the logging configs in your custom logging_config.py file:
'handlers': {
    'console': {
        'class': 'logging_handler.JsonStreamHandler',
        'stream': 'sys.stdout'
    },
    'task': {
        'class': 'logging_handler.JsonFileTaskHandler',
        'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
        'filename_template': FILENAME_TEMPLATE,
    },
    'processor': {
        'class': 'logging_handler.JsonFileProcessorHandler',
        'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
        'filename_template': PROCESSOR_FILENAME_TEMPLATE,
    }
}
...

and

DEFAULT_DAG_PARSING_LOGGING_CONFIG = {
    'handlers': {
        'processor_manager': {
            'class': 'logging_handler.JsonRotatingFileHandler',
            # note: no 'formatter' key here; dictConfig applies a named
            # formatter after construction, which would override the JSON
            # formatter that JsonRotatingFileHandler sets in __init__
            'filename': DAG_PROCESSOR_MANAGER_LOG_LOCATION,
            'mode': 'a',
            'maxBytes': 104857600,  # 100 MB
            'backupCount': 5
        }
    }
...
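For these handlers to take effect, the 'loggers' section of the same config has to point at them. In Airflow's default logging template, that wiring looks roughly like this (logger names taken from the default config; treat this as a sketch):

'loggers': {
    'airflow.processor': {
        'handlers': ['processor'],  # DAG-file processing logs
        'level': LOG_LEVEL,
        'propagate': False,
    },
    'airflow.task': {
        'handlers': ['task'],  # per-task-instance logs
        'level': LOG_LEVEL,
        'propagate': False,
    },
},
'root': {
    'handlers': ['console'],  # everything else goes to stdout
    'level': LOG_LEVEL,
}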

And JSON logs should be emitted, both in the DAG logs and on stdout.

Hope this helps!

– Ivan Peng