
I'm trying to use a custom log4j appender inside a Spark executor in order to forward all logs to Apache Kafka.

The problem is that log4j is initialized before the fat jar's classloader (the one containing the appender) gets registered, so I end up with the following:

log4j:ERROR Could not instantiate class [kafka.producer.KafkaLog4jAppender].
java.lang.ClassNotFoundException: kafka.producer.KafkaLog4jAppender
    at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:260)
    at org.apache.log4j.helpers.Loader.loadClass(Loader.java:198)
    at org.apache.log4j.helpers.OptionConverter.instantiateByClassName(OptionConverter.java:327)
    at org.apache.log4j.helpers.OptionConverter.instantiateByKey(OptionConverter.java:124)
    at org.apache.log4j.PropertyConfigurator.parseAppender(PropertyConfigurator.java:785)
    at org.apache.log4j.PropertyConfigurator.parseCategory(PropertyConfigurator.java:768)
    at org.apache.log4j.PropertyConfigurator.configureRootCategory(PropertyConfigurator.java:648)
    at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:514)
    at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:580)
    at org.apache.log4j.helpers.OptionConverter.selectAndConfigure(OptionConverter.java:526)
    at org.apache.log4j.LogManager.<clinit>(LogManager.java:127)
    at org.apache.spark.Logging$class.initializeLogging(Logging.scala:122)
    at org.apache.spark.Logging$class.initializeIfNecessary(Logging.scala:107)
    at org.apache.spark.Logging$class.log(Logging.scala:51)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$.log(CoarseGrainedExecutorBackend.scala:126)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:137)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:235)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
log4j:ERROR Could not instantiate appender named "KAFKA".
2015-09-29 13:10:43 [driverPropsFetcher-akka.actor.default-dispatcher-4] INFO akka.event.slf4j.Slf4jLogger: Slf4jLogger started
2015-09-29 13:10:43 [driverPropsFetcher-akka.actor.default-dispatcher-4] INFO Remoting: Starting remoting
2015-09-29 13:10:43 [driverPropsFetcher-akka.actor.default-dispatcher-4] INFO Remoting: Remoting started; listening on addresses :[akka.tcp://driverPropsFetcher@gin3.dev:36918]
2015-09-29 13:10:43 [driverPropsFetcher-akka.actor.default-dispatcher-4] INFO Remoting: Remoting now listens on addresses: [akka.tcp://driverPropsFetcher@gin3.dev:36918]
2015-09-29 13:10:44 [driverPropsFetcher-akka.actor.default-dispatcher-4] INFO akka.remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
2015-09-29 13:10:44 [driverPropsFetcher-akka.actor.default-dispatcher-4] INFO akka.remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
2015-09-29 13:10:44 [sparkExecutor-akka.actor.default-dispatcher-3] INFO akka.event.slf4j.Slf4jLogger: Slf4jLogger started
2015-09-29 13:10:44 [sparkExecutor-akka.actor.default-dispatcher-2] INFO Remoting: Starting remoting
2015-09-29 13:10:44 [sparkExecutor-akka.actor.default-dispatcher-2] INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkExecutor@gin3.dev:40067]
2015-09-29 13:10:44 [sparkExecutor-akka.actor.default-dispatcher-2] INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkExecutor@gin3.dev:40067]
2015-09-29 13:10:44 [driverPropsFetcher-akka.actor.default-dispatcher-5] INFO Remoting: Remoting shut down
....

The problem seems to be right here: https://github.com/apache/spark/blob/v1.3.1/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala#L126
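As the trace shows, log4j 1.x resolves the appender through `org.apache.log4j.helpers.Loader.loadClass`, which ends up in `Class.forName`. A quick way to confirm which class loaders can see the appender at that point is a small probe like the one below. This is a hypothetical diagnostic sketch (the class `AppenderVisibility` and the choice of loaders to probe are assumptions, not part of Spark or log4j); run with only the default classpath, it should report the appender as missing, which matches the `ClassNotFoundException` above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical diagnostic: reports whether a class can be resolved by a given class loader.
public class AppenderVisibility {

    // Returns true if `loader` can resolve `className`.
    // initialize=false avoids running the class's static initializers.
    static boolean isVisible(String className, ClassLoader loader) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String appender = args.length > 0 ? args[0] : "kafka.producer.KafkaLog4jAppender";

        // Probe the loaders that are in place early in JVM startup,
        // before any user-jar classloader has been created.
        Map<String, ClassLoader> loaders = new LinkedHashMap<>();
        loaders.put("system", ClassLoader.getSystemClassLoader());
        loaders.put("thread context", Thread.currentThread().getContextClassLoader());

        for (Map.Entry<String, ClassLoader> e : loaders.entrySet()) {
            System.out.printf("%-15s loader sees %s: %b%n",
                    e.getKey(), appender, isVisible(appender, e.getValue()));
        }
    }
}
```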

Is there any easy way to solve this? We are currently using Spark 1.3.x.

Thanks

David

David Moravek

3 Answers


I ended up submitting an extra jar with the logging dependencies and loading it before the user classpath:

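# Jar containing the Kafka log4j appender and its dependencies
LOG_JAR="${THISDIR}/../lib/logging.jar"
# --files places the log4j config and the jar in each executor's working directory,
# so the executor settings below can refer to them by file name only
spark-submit ...... \
  --files "${LOG4J_CONF},${LOG_JAR}" \
  --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=$(basename "${LOG4J_CONF}")" \
  --conf "spark.driver.extraClassPath=$(basename "${LOG_JAR}")" \
  --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=$(basename "${LOG4J_CONF}")" \
  --conf "spark.executor.extraClassPath=$(basename "${LOG_JAR}")" \
  ...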
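`spark.executor.extraClassPath` prepends entries to the executor's classpath at JVM launch, so the appender becomes visible before Spark sets up the user-jar classloader, but only if `logging.jar` really contains it. A minimal pre-submit check, sketched below, can catch a mis-built jar early. `LoggingJarCheck` is a hypothetical helper, and it only looks for the appender class itself, not for its transitive Kafka dependencies.

```java
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.jar.JarFile;

// Hypothetical pre-submit check: does the logging jar actually contain the appender class?
public class LoggingJarCheck {

    // Converts a binary class name such as "a.b.C" to its jar entry path "a/b/C.class".
    static String entryName(String className) {
        return className.replace('.', '/') + ".class";
    }

    // Returns true if `jar` contains a compiled class named `className`.
    static boolean jarContainsClass(Path jar, String className) throws IOException {
        try (JarFile jf = new JarFile(jar.toFile())) {
            return jf.getJarEntry(entryName(className)) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length < 2) {
            System.err.println("usage: LoggingJarCheck <logging.jar> <appender class>");
            System.exit(2);
        }
        Path jar = Paths.get(args[0]);
        boolean ok = jarContainsClass(jar, args[1]);
        System.out.println(args[1] + (ok ? " found in " : " NOT found in ") + jar);
        // Non-zero exit code lets a submit script abort before calling spark-submit.
        System.exit(ok ? 0 : 1);
    }
}
```

For example, `java LoggingJarCheck lib/logging.jar kafka.producer.KafkaLog4jAppender` could be run from the submit script before `spark-submit`.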

https://issues.apache.org/jira/browse/SPARK-10881?filter=-2

David Moravek
What path should be passed to `basename` in your case? When I submit the job in client mode, it can be a local path on the submitting host. What about the executors? I'm a little confused. – Jihun No Aug 27 '19 at 01:27

I was facing the same issue, so I'll post what worked for me. It turns out that the package name of the KafkaLog4jAppender class changed in Kafka 0.9. Here is what I did: I added the following dependency to my pom

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-log4j-appender</artifactId>
    <version>0.9.0.0</version>
</dependency>

and changed my log4j.properties from

log4j.appender.KAFKA=kafka.producer.KafkaLog4jAppender

to

log4j.appender.KAFKA=org.apache.kafka.log4jappender.KafkaLog4jAppender 
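Because the class name you need in `log4j.properties` depends on which Kafka artifact ends up on the classpath, a small check can tell you which of the two names is actually loadable. This is a hypothetical sketch (`AppenderClassName` and `firstLoadable` are illustrative names, not a Kafka or log4j API); the two constants are the class names given in this answer.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical helper: picks the KafkaLog4jAppender class name the current classpath provides.
public class AppenderClassName {

    // New name, from the kafka-log4j-appender artifact (Kafka 0.9+, per this answer).
    static final String KAFKA_09 = "org.apache.kafka.log4jappender.KafkaLog4jAppender";
    // Old name, as used in the question's log4j.properties (pre-0.9).
    static final String KAFKA_08 = "kafka.producer.KafkaLog4jAppender";

    // Returns the first candidate that `loader` can resolve, or empty if none can.
    static Optional<String> firstLoadable(List<String> candidates, ClassLoader loader) {
        for (String name : candidates) {
            try {
                Class.forName(name, false, loader);
                return Optional.of(name);
            } catch (ClassNotFoundException | LinkageError e) {
                // Not resolvable by this loader; try the next candidate.
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Optional<String> found = firstLoadable(
                Arrays.asList(KAFKA_09, KAFKA_08), ClassLoader.getSystemClassLoader());
        // Prints the log4j.properties line to use, or a warning if neither class is present.
        System.out.println(found
                .map(n -> "log4j.appender.KAFKA=" + n)
                .orElse("No KafkaLog4jAppender on the classpath"));
    }
}
```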
Benak Raj

kafka.producer.KafkaLog4jAppender is in Kafka's hadoop-producer artifact, so you can add this dependency to fix it:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>hadoop-producer</artifactId>
    <version>0.8.0</version>
</dependency>
xiezefan