70

I am looking for a way to log additional data when executing code on Apache Spark nodes, so that I can later investigate issues that appear during execution. A traditional solution such as com.typesafe.scalalogging.LazyLogging fails because the logger instance cannot be serialized in a distributed environment like Apache Spark.

I've investigated this problem, and the solution I have found so far is to use the org.apache.spark.Logging trait, like this:

import org.apache.spark.Logging

class SparkExample extends Logging {
  val someRDD = ...
  someRDD.map { rddElement =>
    logInfo(s"$rddElement will be processed.")
    doSomething(rddElement)
  }
}

However, it looks like the Logging trait is not a permanent solution for Apache Spark, because it's marked as @DeveloperApi and the class documentation mentions:

This will likely be changed or removed in future releases.

I am wondering: are there any known logging solutions that will allow me to log data when the RDDs are executed on Apache Spark nodes?

Later edit: Some of the comments below suggest using Log4j. I've tried Log4j, but I still have issues when using the logger from a Scala class (rather than a Scala object). Here is my full code:

import org.apache.log4j.Logger
import org.apache.spark._

object Main {
 def main(args: Array[String]) {
  new LoggingTestWithRDD().doTest()
 }
}

class LoggingTestWithRDD extends Serializable {

  val log = Logger.getLogger(getClass.getName)

  def doTest(): Unit = {
   val conf = new SparkConf().setMaster("local[4]").setAppName("LogTest")
   val spark = new SparkContext(conf)

   val someRdd = spark.parallelize(List(1, 2, 3))
   someRdd.map {
     element =>
       log.info(s"$element will be processed")
       element + 1
    }
   spark.stop()
 }

}

The exception that I'm seeing is:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable -> Caused by: java.io.NotSerializableException: org.apache.log4j.Logger

Bogdan N
  • Well... Configure your log4j and get your logging done. – sarveshseri Mar 23 '15 at 11:40
  • So basically Apache Spark forces you to use only log4j? – Bogdan N Mar 23 '15 at 14:42
  • Fwiw, the correct spelling of log4j is "slf4j". – michael Mar 23 '15 at 17:11
  • In addition to / as an alternative to logging, metrics may give you what you want: http://spark.apache.org/docs/latest/monitoring.html – michael Mar 23 '15 at 17:19
  • @michael_n That's not correct. log4j and slf4j are different things. – ben_frankly Aug 05 '15 at 23:21
  • @ben_frankly you misunderstood the joke. I'm well aware of slf4j/log4j, but many are/were (justifiably) confused about their roles (and perhaps still are). Log4j is an API *and* an implementation; slf4j is an API. When selecting a logging API, people *should* choose slf4j. This does not preclude using log4j as the implementation. (Anywhere "log4j" appears in code should be "spell checked" to be slf4j :-)) – michael Aug 07 '15 at 06:38
  • If you want something guaranteed to not change, and think it's worth the effort, write it yourself. Possibly using Akka. But I don't think that is worthwhile - just change the code if Spark forces it. – BAR Oct 04 '15 at 20:30
  • There is no reason for using slf4j in an application, only in a library. BTW, I would recommend using Log4j 2.x. – Mikael Ståldal Jul 08 '16 at 13:31
  • If you use Log4j 2.x, the example should work since the Logger in Log4j 2.x is Serializable. – Mikael Ståldal Jul 08 '16 at 13:51
  • Can't you create the logger inside the `rdd.foreachPartition` section, which avoids serializing the logger across worker nodes? In this case each worker will have its own logger. – stanislav.chetvertkov Nov 24 '16 at 10:54
  • My solution prevents `TaskNotSerializable` – ragazzojp Jun 27 '22 at 20:14

7 Answers

54

You can use Akhil's solution proposed in
https://www.mail-archive.com/user@spark.apache.org/msg29010.html. I have used it myself and it works.

Akhil Das Mon, 25 May 2015 08:20:40 -0700
Try this way:

object Holder extends Serializable {      
   @transient lazy val log = Logger.getLogger(getClass.getName)    
}


val someRdd = spark.parallelize(List(1, 2, 3)).foreach { element =>
   Holder.log.info(element)
}
BAR
florins
11

Use Log4j 2.x. The core logger has been made serializable. Problem solved.

Jira discussion: https://issues.apache.org/jira/browse/LOG4J2-801

"org.apache.logging.log4j" % "log4j-api" % "2.x.x"

"org.apache.logging.log4j" % "log4j-core" % "2.x.x"

"org.apache.logging.log4j" %% "log4j-api-scala" % "2.x.x"
Ram Ghadiyaram
Ryan Stack
  • Can you please give a complete implementation of this logging, e.g. how you create log4j2.properties and how it is used in code? – jAi Aug 21 '19 at 06:35
4

If you need some code to be executed before and after a map, filter or other RDD function, try using mapPartitions, where the underlying iterator is passed explicitly.

Example:

val log = ??? // this gets captured and produces serialization error
rdd.map { x =>
  log.info(x)
  x+1
}

Becomes:

rdd.mapPartitions { it =>
  val log = ??? // this is freshly initialized in worker nodes
  it.map { x =>
    log.info(x)
    x + 1
  }
}

Every basic RDD function is implemented with mapPartitions.

Make sure to handle the partitioner explicitly so that it is not lost: see the preservesPartitioning parameter in the Scaladoc. This is critical for performance.
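
The per-partition pattern above can be sketched without a cluster. This is only an illustration under stated assumptions: `PartitionLogging` and `incrementWithLogging` are hypothetical names, and `java.util.logging.Logger` is swapped in for the log4j logger so the snippet runs with no extra dependencies. The point is that the logger is obtained inside the function that receives the partition iterator, so it never becomes part of a serialized closure.

```scala
import java.util.logging.Logger

// Hypothetical helper object; names are illustrative only.
object PartitionLogging {

  // Function of the shape Iterator[T] => Iterator[U], which is what
  // mapPartitions expects. The logger is looked up inside the body,
  // once per partition, in whichever JVM runs the function.
  // java.util.logging.Logger is a stand-in for the log4j Logger.
  def incrementWithLogging(partition: Iterator[Int]): Iterator[Int] = {
    val log = Logger.getLogger("PartitionLogging")
    partition.map { x =>
      log.info(s"$x will be processed")
      x + 1
    }
  }

  def main(args: Array[String]): Unit = {
    // A plain iterator stands in for the contents of one partition.
    println(incrementWithLogging(Iterator(1, 2, 3)).toList) // prints List(2, 3, 4)
  }
}
```

With Spark on the classpath and an `RDD[Int]` named `rdd` (assumed here), the same function could presumably be passed as `rdd.mapPartitions(PartitionLogging.incrementWithLogging, preservesPartitioning = true)`. Setting `preservesPartitioning = true` is only appropriate when the function leaves the keys of a pair RDD unchanged.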

ragazzojp
2

This is an old post, but I want to share my working solution, which I arrived at after a lot of struggling and which may still be useful for others:

I wanted to print RDD contents inside the rdd.map function but was getting a Task not serializable error. My solution uses a Scala static object that extends java.io.Serializable:

import org.apache.log4j.{Level, LogManager}

object MyClass extends Serializable {

  // Held by the object, so each JVM initializes its own logger
  val log = LogManager.getLogger("name of my spark log")
  log.setLevel(Level.INFO)

  def main(args: Array[String]): Unit = {
    // rdd is assumed to be defined elsewhere
    rdd.map { t =>
      // Using the object's logger here
      val log = MyClass.log
      log.info("element: " + t)
      t
    }
  }

}
Ram Ghadiyaram
khushbu kanojia
2

Making the logger transient and lazy does the trick:

@transient lazy val log = Logger.getLogger(getClass.getName)

@transient tells Spark not to serialize the logger when the closure is shipped to the executors, and lazy causes the instance to be created when it is first used. In other words, each executor will have its own instance of the logger. Serializing the logger is not a good idea anyway, even if you can.
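
The mechanism can be checked locally with a plain Java-serialization round trip, which is similar in spirit to what happens when a task is shipped to an executor. This is a sketch under stated assumptions: `LoggingTask` and `TransientLazyDemo` are hypothetical names, and `java.util.logging.Logger` (which is not Serializable) stands in for the log4j logger so the snippet has no external dependencies.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.logging.Logger

// Hypothetical task class. java.util.logging.Logger is a stand-in for the
// log4j Logger; it does not implement java.io.Serializable.
class LoggingTask(val offset: Int) extends Serializable {
  // @transient: the field is skipped when the instance is serialized.
  // lazy: the logger is created on first use in whichever JVM calls it.
  @transient lazy val log: Logger = Logger.getLogger(getClass.getName)

  def process(element: Int): Int = {
    log.info(s"$element will be processed")
    element + offset
  }
}

object TransientLazyDemo {
  // Serializes the value to a byte array and reads it back.
  def roundTrip[T <: AnyRef](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val task = new LoggingTask(1)
    task.process(1)             // forces the logger to be created before serialization
    val copy = roundTrip(task)  // succeeds because the logger field is transient
    println(copy.process(2))    // the copy re-creates its own logger; prints 3
  }
}
```

Removing `@transient` from the field should make the `roundTrip` call fail with a `java.io.NotSerializableException`, which mirrors the error in the question.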

Of course, anything you put in the map() closure runs on the executors, so its output will appear in the executor logs and not the driver logs. For custom log4j properties on the executors, you need to ship your log4j.properties file to the executors and add it to the executor classpath.

This can be done by adding the following arguments to your spark-submit command:

--conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=./log4j.properties" --files ./log4j.properties

There are other ways to set these configs, but this one is the most common.

sparker
1
val log = Logger.getLogger(getClass.getName)

You can use "log" to write logs. Also, if you need to change the logger properties, you need a log4j.properties file in Spark's /conf folder. By default there is a template in that location.

bummi
Karthik
  • I've tried to use log4j but I am still having serialization issues when calling the logger variable from a class (not from an object): `Exception in thread "main" org.apache.spark.SparkException: Task not serializable -> Caused by: java.io.NotSerializableException: org.apache.log4j.Logger` – Bogdan N Mar 24 '15 at 08:14
  • Simple solution: declare the log variable in local method scope. – nuaavee May 26 '15 at 17:50
  • What if you make "log" @transient? – Mikael Ståldal Jul 08 '16 at 13:34
  • mapPartitions comes to the rescue for this kind of thing. You can create the logger inside the mapPartitions function and use it. This technique is also used for JDBC connections, MQ and Kafka producers. – Ashkrit Sharma Feb 16 '19 at 15:45
0

Here is my solution:

I am using SLF4J (with the Log4j binding). In the base class of every Spark job I have something like this:

import org.slf4j.LoggerFactory
val LOG = LoggerFactory.getLogger(getClass) 

Just before the place where I use LOG in distributed functional code, I copy the logger reference to a local constant:

val LOG = this.LOG

It worked for me!

Thamme Gowda