
I'm trying to check, in Spark Streaming, whether a record exists in HBase for each message received from Kafka. When I run the RunReadLogByKafka object, a NotSerializableException for SparkContext is thrown. I googled it, but I still don't know how to fix it. Could anyone suggest how to rewrite my code? Thanks in advance.

package com.test.spark.hbase

import java.sql.{DriverManager, PreparedStatement, Connection}
import java.text.SimpleDateFormat
import com.powercn.spark.LogRow
import com.powercn.spark.SparkReadHBaseTest.{SensorStatsRow, SensorRow}
import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Result, Put}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.sql.{SQLContext, Row}
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

case class LogRow(rowkey: String, content: String)

object LogRow {
  def parseLogRow(result: Result): LogRow = {
    val rowkey = Bytes.toString(result.getRow())
    val p0 = rowkey
    val p1 = Bytes.toString(result.getValue(Bytes.toBytes("data"), Bytes.toBytes("content")))
    LogRow(p0, p1)
  }
}

class ReadLogByKafka(sct: SparkContext) extends Serializable {

   implicit def func(records: String) {
    @transient val conf = HBaseConfiguration.create()
    conf.set(TableInputFormat.INPUT_TABLE, "log")
    @transient val sc = sct
    @transient val sqlContext = SQLContext.getOrCreate(sc)
    import sqlContext.implicits._
    try {
      //query info table
      val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
        classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
        classOf[org.apache.hadoop.hbase.client.Result])
      println(hBaseRDD.count())
      // transform (ImmutableBytesWritable, Result) tuples into an RDD of Results
      val resultRDD = hBaseRDD.map(tuple => tuple._2)
      println(resultRDD.count())
      val logRDD = resultRDD.map(LogRow.parseLogRow)
      val logDF = logRDD.toDF()
      logDF.printSchema()
      logDF.show()
      // register the DataFrame as a temp table
      logDF.registerTempTable("LogRow")
      val logAdviseDF = sqlContext.sql("SELECT rowkey, content as content FROM LogRow ")
      logAdviseDF.printSchema()
      logAdviseDF.take(5).foreach(println)
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {

    }
  }

}

package com.test.spark.hbase

import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.io.Text
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}


object RunReadLogByKafka extends Serializable {

  def main(args: Array[String]): Unit = {
    val broker = "192.168.13.111:9092"
    val topic = "log"

    @transient val sparkConf = new SparkConf().setAppName("RunReadLogByKafka")
    @transient val streamingContext = new StreamingContext(sparkConf, Seconds(2))
    @transient val sc = streamingContext.sparkContext

    val kafkaConf = Map("metadata.broker.list" -> broker,
      "group.id" -> "group1",
      "zookeeper.connection.timeout.ms" -> "3000",
      "kafka.auto.offset.reset" -> "smallest")

    // Define which topics to read from
    val topics = Set(topic)

    val messages = KafkaUtils.createDirectStream[Array[Byte], String, DefaultDecoder, StringDecoder](
      streamingContext, kafkaConf, topics).map(_._2)

    messages.foreachRDD(rdd => {
      val readLogByKafka = new ReadLogByKafka(sc)
      // processing each message here is what throws the NotSerializableException
      rdd.foreach(readLogByKafka.func)
    })

    streamingContext.start()
    streamingContext.awaitTermination()
  }


}

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2021)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:889)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:888)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
    at org.apache.spark.rdd.RDD.foreach(RDD.scala:888)
    at com.test.spark.hbase.RunReadLogByKafka$$anonfun$main$1.apply(RunReadLogByKafka.scala:38)
    at com.test.spark.hbase.RunReadLogByKafka$$anonfun$main$1.apply(RunReadLogByKafka.scala:35)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:631)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:631)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:42)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:40)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:34)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:207)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:207)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:207)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:206)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
    - object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@5207878f)
    - field (class: com.test.spark.hbase.ReadLogByKafka, name: sct, type: class org.apache.spark.SparkContext)
    - object (class com.test.spark.hbase.ReadLogByKafka, com.test.spark.hbase.ReadLogByKafka@60212100)
    - field (class: com.test.spark.hbase.RunReadLogByKafka$$anonfun$main$1$$anonfun$apply$1, name: readLogByKafka$1, type: class com.test.spark.hbase.ReadLogByKafka)
    - object (class com.test.spark.hbase.RunReadLogByKafka$$anonfun$main$1$$anonfun$apply$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
    ... 30 more
  • Possible duplicate of [Task not serializable: java.io.NotSerializableException when calling function outside closure only on classes not objects](http://stackoverflow.com/questions/22592811/task-not-serializable-java-io-notserializableexception-when-calling-function-ou) – Harshad Feb 25 '16 at 04:53
  • I have read this link, but I don't know how to rewrite my code. I tried adding @transient to the SparkContext, but it still throws the NotSerializableException. – Mike Feb 25 '16 at 04:56
  • Try enabling `spark.driver.allowMultipleContexts` and then create the `SparkContext` in `foreach` or in `ReadLogByKafka`. **NOTE** This is not a recommended approach, but in some extreme cases you may need it. – Sumit Feb 25 '16 at 07:28
  • Thanks for your help. Yes, if I enable `allowMultipleContexts`, a SparkContext can be used in foreach in ReadLogByKafka, but I don't know whether this can be done with only one context. – Mike Feb 25 '16 at 08:23

1 Answer


The object you get as an argument in foreachRDD is a standard Spark RDD, so you have to obey exactly the same rules as usual, including no nested actions or transformations and no access to the SparkContext from inside the closures you ship to the executors. It is not exactly clear what you're trying to achieve (it doesn't look like ReadLogByKafka.func is doing anything useful), but I guess you're looking for some kind of join.
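If a join is indeed what you need, one way to restructure it is to keep all SparkContext work on the driver inside foreachRDD and join the HBase data with the micro-batch RDD, rather than calling the context from inside rdd.foreach. Below is a rough, untested sketch of that pattern; it reuses the messages DStream, the LogRow helper and the imports from the question, and it assumes each Kafka message is a rowkey you want to check.

messages.foreachRDD { rdd =>
  // this block runs on the driver, so using the SparkContext here is fine
  val sc = rdd.sparkContext
  val conf = HBaseConfiguration.create()
  conf.set(TableInputFormat.INPUT_TABLE, "log")

  // load the HBase "log" table as an RDD of (rowkey, content) pairs
  val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
    classOf[ImmutableBytesWritable], classOf[Result])
  val logByKey = hBaseRDD.map { case (_, result) =>
    val row = LogRow.parseLogRow(result)
    (row.rowkey, row.content)
  }

  // assumption: each Kafka message is the rowkey to look up
  val messageByKey = rdd.map(msg => (msg, ()))

  // only keys that exist in the HBase table survive the join
  val matched = messageByKey.join(logByKey)
  matched.take(5).foreach { case (key, (_, content)) =>
    println(s"record exists for $key: $content")
  }
}

Note that this scans the whole HBase table on every batch; for large tables a per-partition Get against HBase (see the sketch in the comments below) is usually cheaper.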

zero323
  • Thanks for your explanation. In fact, I want to query HBase with the received message via Spark SQL/Hive here, which is why I want to get the SparkContext inside foreach in ReadLogByKafka.func. – Mike Feb 25 '16 at 08:31
  • You can try querying it directly, but not using Spark tools. – zero323 Feb 25 '16 at 08:52
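For illustration, a rough, untested sketch of that direct route: open a plain HBase connection per partition on the executors and check each message with a Get, without touching the SparkContext at all. The table name "log", column family "data" and qualifier "content" come from the question; treating each Kafka message as a rowkey, and the extra Get import, are assumptions.

import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get}

messages.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // one HBase connection per partition, created on the executor
    val conf = HBaseConfiguration.create()
    val connection = ConnectionFactory.createConnection(conf)
    val table = connection.getTable(TableName.valueOf("log"))
    try {
      partition.foreach { msg =>
        val result = table.get(new Get(Bytes.toBytes(msg)))
        if (!result.isEmpty) {
          val content = Bytes.toString(
            result.getValue(Bytes.toBytes("data"), Bytes.toBytes("content")))
          println(s"record exists for $msg: $content")
        }
      }
    } finally {
      table.close()
      connection.close()
    }
  }
}

Nothing here has to be serialized from the driver except the closure itself, which only captures string constants, so the NotSerializableException goes away.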