
What is the correct way to access the log4j logger of Spark using pyspark on an executor?

It's easy to do so in the driver but I cannot seem to understand how to access the logging functionalities on the executor so that I can log locally and let YARN collect the local logs.

Is there any way to access the local logger?

The standard logging procedure is not enough because I cannot access the spark context from the executor.

Chobeat

3 Answers


You cannot use a local log4j logger on executors. The Python workers spawned by the executor JVMs have no "callback" connection to the Java side; they just receive commands. But there is a way to log from executors using standard Python logging and have YARN capture the logs.

Place a Python module on your HDFS that configures logging once per Python worker and proxies the logging functions (name it logger.py):

import os
import logging
import sys

class YarnLogger:
    @staticmethod
    def setup_logger():
        # YARN sets LOG_DIRS for every container; without it there is no
        # local log directory that aggregation will pick up later.
        if 'LOG_DIRS' not in os.environ:
            sys.stderr.write('Missing LOG_DIRS environment variable, pyspark logging disabled')
            return

        file = os.environ['LOG_DIRS'].split(',')[0] + '/pyspark.log'
        logging.basicConfig(filename=file, level=logging.INFO,
                format='%(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s')

    def __getattr__(self, key):
        # Proxy every attribute lookup (info, warning, error, ...) to the logging module.
        return getattr(logging, key)

YarnLogger.setup_logger()

Then import this module inside your application:

spark.sparkContext.addPyFile('hdfs:///path/to/logger.py')
import logger
logger = logger.YarnLogger()

You can then use it inside your PySpark functions like the normal logging library:

def map_sth(s):
    logger.info("Mapping " + str(s))
    return s

spark.range(10).rdd.map(map_sth).count()

The pyspark.log file will be visible in the resource manager UI and will be collected when the application finishes, so you can access these logs later with `yarn logs -applicationId ...`.
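If the file does not show up, a quick sanity check (also suggested in the comments below) is to confirm that `LOG_DIRS` is actually visible inside the executors' environment. A minimal sketch, assuming a running `spark` session on YARN:

import os

# Each returned entry should be a writable local path on a YARN nodemanager;
# a missing value means the Python workers cannot see LOG_DIRS.
paths = spark.range(20).rdd.map(lambda x: os.environ.get('LOG_DIRS', '<not set>')).collect()
print(set(paths))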

Mariusz
  • Thank you very much. In the meanwhile I had found more answers validating what you said. Our current approach to logging is somewhat similar to what you did. This is a good confirmation we are on a good track, even if this solution lacks all the flexibility of log4j. – Chobeat Nov 28 '16 at 09:33
  • @feroze what version of spark and YARN are you using? – Mariusz Jan 03 '17 at 07:10
  • OK, in case it was not clear to the moderators, the reason I put another answer was because the comment section does not allow formatting. – feroze Jan 03 '17 at 18:00
  • @mariusz it is hadoop 2.6.0 part of cloudera distribution CDH5.8.3 Hadoop 2.6.0-cdh5.8.3 Subversion http://github.com/cloudera/hadoop -r 992be3bac6b145248d32c45b16f8fce5a984b158 Compiled by jenkins on 2016-10-13T03:20Z Compiled with protoc 2.5.0 From source with checksum ef7968b8b98491d54f83cb3bd7a87ea This command was run using /usr/lib/hadoop/hadoop-common-2.6.0-cdh5.8.3.jar – feroze Jan 03 '17 at 18:04
  • This did not work for me as well. The log is getting piped to /tmp/logs in the worker nodes. – kuriouscoder Feb 09 '17 at 07:41
  • @kuriouscoder @feroze can you please check if output of something like: `spark.range(20).rdd.map(lambda x: os.environ['LOG_DIRS']).collect()` looks like valid paths on YARN nodes? – Mariusz Feb 10 '17 at 16:48
  • In my case, the user running the executor did not have access for the YARN log folder (it was provided to hadoop user.) Do these YARN logs go into a default HDFS loc? – kuriouscoder Feb 10 '17 at 17:07
  • `LOG_DIRS` should be a directory that holds logs for the running container and it should be writable by the container's owner (it's where the stdout and stderr files go). It is a local path on the yarn nodemanager server. When the job ends these files are copied to HDFS. – Mariusz Feb 11 '17 at 08:42
  • @Mariusz So 'LOG_DIRS' is a hdfs path and yarn will copy log to hdfs automatically, right? – Zhang Tong Mar 02 '17 at 08:54
  • @zhangtong Nope, `LOG_DIRS` is local path on nodemanagers, it is where your processes can write data. When job ends these files are copied to HDFS and kept for `yarn.log-aggregation.retain-seconds` seconds. See `yarn.log-aggregation*` options in `yarn-site.xml` for more info. – Mariusz Mar 03 '17 at 06:40
  • @Mariusz Thank you ! But I can not access app logs from yarn web ui(localhost:8088) if I set yarn.log-aggregation-enable = true in yarn-site.xml. – Zhang Tong Mar 09 '17 at 09:26
  • @Mariusz By the way , log works fine through yarn logs -applicationId xxx – Zhang Tong Mar 09 '17 at 10:12
  • @zhangtong it's how it works. Until application is running you can access logs from web ui. When application finishes, logs are aggregated on hdfs (and no longer exist on nodemanagers), so the only way to access them is via `yarn logs` – Mariusz Mar 11 '17 at 19:19
  • In case anyone's not sure you can access `pyspark.log` alone by doing `yarn logs -applicationId -log_files pyspark.log` – snark Dec 01 '17 at 09:56
  • does the file path in `addPyFile` have to be a hdfs path? Can I use a local one? I'm seeing the following error `pickle.PicklingError: Cannot pickle files that are not opened for reading` – Rohan A Feb 21 '18 at 19:50
  • @RohanA The path passed to `addPyFile` needs to be HDFS path. If you want to use local file, modify `spark-submit` options adding `--py-files your_file.py` and you will not need call `addPyFile` from the code. The `PicklingError` may be an indicator that executors can't find logger code. – Mariusz Feb 25 '18 at 21:52
  • @Mariusz Great answer for people working on yarn clusters! What if I am working on a standalone spark cluster? Do you have any resources/links to what I should be doing to log from executors? – void Mar 29 '18 at 08:37
  • If you want to write it directly into the "console", you can replace the file by `stream=sys.stdout` – kulssaka Apr 04 '18 at 15:24
  • When I try to write log from mapPartition(get_content_s3), i'm getting this WARN TaskSetManager: Lost task 362.0 in stage 0.0 ...: org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/mnt/yarn/usercache/xxx/appcache/application_1546994237949_0576/container_1546994237949_0576_01_000056/pyspark.zip/pyspark/worker.py", line 230, in main process() File "./script.py", line 174, in get_content_s3 NameError: global name 'logger' is not defined. I'm using it as logger.error("get_content_s3 reporting error "). Am I doing it wrong? – lugger1 Jan 16 '19 at 20:10
  • @lugger1 Make sure you imported the module with `spark.sparkContext.addPyFile('hdfs:///path/to/logger.py')` – Mariusz Jan 17 '19 at 21:55
  • This works fine for me when I log from inside a map function. However, when I log from the driver, I don't see the logs. Any idea what I am missing here? Btw, I am using a custom path to the log file and the user running the job has permissions to write, which is evident from the fact that the executors are able to log. – Rishi S Jan 29 '20 at 12:38
  • @RishiS This is expected, as the method works for logging from executors, not the driver. On driver you can just use standard `import logging` way (see https://docs.python.org/3.8/library/logging.html) – Mariusz Jan 30 '20 at 08:31
  • Is this still the correct approach? I have been able to access the executor logs through the spark history server on some systems, but I would like to know how to set this up. Are there settings within the spark history server or spark job to enable this? – Chogg Jun 08 '21 at 17:33

Note that Mariusz's answer returns a proxy to the logging module. This works (upvoted) when your logging demands are very basic. Once you're interested in doing things like configuring multiple logger instances or using multiple handlers, it will be lacking. E.g. if you have a larger set of code that you only want to run when debugging, one of the solutions would be to check a logger instance's isEnabledFor method, like so:

import logging

logger = logging.getLogger(__name__)
if logger.isEnabledFor(logging.DEBUG):
    # do some heavy calculations and call `logger.debug` (or any other logging method, really)
    ...

This would fail when the method is called on the logging module, like in Mariusz's answer, because the logging module does not have such an attribute.
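A small illustration of why: `isEnabledFor` exists only on `Logger` instances, not as a module-level function, so there is nothing for the proxy to forward to.

import logging

hasattr(logging, 'isEnabledFor')                          # False: no module-level function to proxy
logging.getLogger(__name__).isEnabledFor(logging.DEBUG)   # fine: it is a method of Logger instances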

One way to solve this would be to create a spark_logging.py module in which you configure the logging and return a new instance of Logger. The code below shows an example of this, which configures logging using dictConfig. It also adds a filter so that the number of repetitions from all the worker nodes is greatly reduced when using the root logger (filter example is from Christopher Dunn (ref)).

# spark_logging.py
import logging
import logging.config
import os
import tempfile
from logging import *  # gives access to logging.DEBUG etc by aliasing this module for the standard logging module


class Unique(logging.Filter):
    """Messages are allowed through just once.
    The 'message' includes substitutions, but is not formatted by the
    handler. If it were, then practically all messages would be unique!
    """
    def __init__(self, name=""):
        logging.Filter.__init__(self, name)
        self.reset()

    def reset(self):
        """Act as if nothing has happened."""
        self.__logged = {}

    def filter(self, rec):
        """logging.Filter.filter performs an extra filter on the name."""
        return logging.Filter.filter(self, rec) and self.__is_first_time(rec)

    def __is_first_time(self, rec):
        """Emit a message only once."""
        msg = rec.msg %(rec.args)
        if msg in self.__logged:
            self.__logged[msg] += 1
            return False
        else:
            self.__logged[msg] = 1
            return True


def getLogger(name, logfile="pyspark.log"):
    """Replaces getLogger from logging to ensure each worker configures
    logging locally."""

    try:
        logfile = os.path.join(os.environ['LOG_DIRS'].split(',')[0], logfile)
    except (KeyError, IndexError):
        tmpdir = tempfile.gettempdir()
        logfile = os.path.join(tmpdir, logfile)
        rootlogger = logging.getLogger("")
        rootlogger.addFilter(Unique())
        rootlogger.warning(
            "LOG_DIRS not in environment variables or is empty. Will log to {}."
            .format(logfile))

    # Alternatively, load log settings from YAML or use JSON.
    log_settings = {
        'version': 1,
        'disable_existing_loggers': False,
        'handlers': {
            'file': {
                'class': 'logging.FileHandler',
                'level': 'DEBUG',
                'formatter': 'detailed',
                'filename': logfile
            },
            'default': {
                'level': 'INFO',
                'class': 'logging.StreamHandler',
            },
        },
        'formatters': {
            'detailed': {
                'format': ("%(asctime)s.%(msecs)03d %(levelname)s %(module)s - "
                           "%(funcName)s: %(message)s"),
            },
        },
        'loggers': {
            'driver': {
                'level': 'INFO',
                'handlers': ['file', ]
            },
            'executor': {
                'level': 'DEBUG',
                'handlers': ['file', ]
            },
        }
    }

    logging.config.dictConfig(log_settings)
    return logging.getLogger(name)

You could then import this module and alias it for logging itself:

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Test logging") \
    .getOrCreate()

try:
    spark.sparkContext.addPyFile('s3://YOUR_BUCKET/spark_logging.py')
except:
    # Probably running this locally. Make sure to have spark_logging in the PYTHONPATH
    pass
finally:
    import spark_logging as logging

def map_sth(s):
    log3 = logging.getLogger("executor")
    log3.info("Logging from executor")

    if log3.isEnabledFor(logging.DEBUG):
        log3.debug("This statement is only logged when DEBUG is configured.")

    return s

def main():
    log2 = logging.getLogger("driver")
    log2.info("Logging from within module function on driver")
    spark.range(100).rdd.map(map_sth).count()

if __name__ == "__main__":
    log1 = logging.getLogger("driver")
    log1.info("logging from module level")
    main()

Like with Mariusz's answer, logs will be accessible using the resource manager (or dumped in your temp-folder when LOG_DIRS is not in your environment variables). The error handling done at the top of this script is added so that you could run this script locally.

This approach allows more freedom: you could have the executors log to one file and all kinds of aggregation counts on the driver in another file.
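As a rough sketch of that idea (not part of the original module; the file names below are placeholders, and on YARN you would build them from `LOG_DIRS` as shown above), the handlers and loggers sections of the dictConfig could be split so each side gets its own file:

import logging.config

# Sketch only: give the 'driver' and 'executor' loggers separate file handlers.
split_settings = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'detailed': {
            'format': ('%(asctime)s.%(msecs)03d %(levelname)s %(module)s - '
                       '%(funcName)s: %(message)s'),
        },
    },
    'handlers': {
        'executor_file': {'class': 'logging.FileHandler', 'level': 'DEBUG',
                          'formatter': 'detailed', 'filename': 'pyspark_executor.log'},
        'driver_file': {'class': 'logging.FileHandler', 'level': 'INFO',
                        'formatter': 'detailed', 'filename': 'driver_summary.log'},
    },
    'loggers': {
        'executor': {'level': 'DEBUG', 'handlers': ['executor_file']},
        'driver': {'level': 'INFO', 'handlers': ['driver_file']},
    },
}

logging.config.dictConfig(split_settings)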

Note that there is slightly more work to be done in this case, compared to using a class as a proxy for the built-in logging module, as each time you request a logger on the executor instances, it will have to be configured. That likely won't be your main time-hog when doing big data analytics though. ;-)

Oliver W.
  • Hi @Oliver W., Many thanks for this. In case of using spark on EMR of AWS, what LOG_DIRS variable should contain to see the pyspark.log file through the resource manager? – Roxana Mar 12 '19 at 08:52

I have yet another approach to solve the logging issue in PySpark. The idea is as follows:

  • Use a remote log management service (for example Loggly, CloudWatch on AWS, Application Insights on Azure, etc.)
  • Configure the logging module on both the master node and the worker nodes with the same configuration, so logs are sent to the above services

This is a good approach if you are already using cloud services, as many of them also provide log collection/management services.

I have a simple wordcount example on GitHub to demonstrate this approach: https://github.com/chhantyal/wordcount

This Spark app sends logs to Loggly using the standard logging module from the driver (master node) as well as from the executors (worker nodes).
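The exact setup depends on the service, but the pattern looks roughly like the sketch below. The host, port and handler choice are placeholders (not the configuration from the linked repository): a helper attaches a network handler from the standard `logging.handlers` module and is called the same way on the driver and inside executor functions.

import logging
import logging.handlers

def get_remote_logger(name, host='logs.example.com', port=514):
    """Return a logger that ships records to a remote syslog-compatible
    collector; host/port are placeholders for whatever your service expects."""
    logger = logging.getLogger(name)
    if not logger.handlers:  # avoid stacking handlers when a worker process is reused
        handler = logging.handlers.SysLogHandler(address=(host, port))
        handler.setFormatter(logging.Formatter(
            '%(asctime)s %(levelname)s %(name)s: %(message)s'))
        logger.addHandler(handler)
        logger.setLevel(logging.INFO)
    return logger

def map_sth(s):
    # Works on executors too, because only the standard library is needed.
    get_remote_logger('executor').info('Mapping %s', s)
    return s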

chhantyal