I'm try to implement to check exist record by received message from kafka in spark by spark streaming, now when i run RunReadLogByKafka object, there is a NotSerializableException for SparkContext was throwed, i google it, but i still don't know how to fix it, Could anyone suggest me how to rewrite it? thanks in advance.
package com.test.spark.hbase
import java.sql.{DriverManager, PreparedStatement, Connection}
import java.text.SimpleDateFormat
import com.powercn.spark.LogRow
import com.powercn.spark.SparkReadHBaseTest.{SensorStatsRow, SensorRow}
import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Result, Put}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.sql.{SQLContext, Row}
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
case class LogRow(rowkey: String, content: String)
object LogRow {
def parseLogRow(result: Result): LogRow = {
val rowkey = Bytes.toString(result.getRow())
val p0 = rowkey
val p1 = Bytes.toString(result.getValue(Bytes.toBytes("data"), Bytes.toBytes("content")))
LogRow(p0, p1)
}
}
class ReadLogByKafka(sct:SparkContext) extends Serializable {
implicit def func(records: String) {
@transient val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, "log")
@transient val sc = sct
@transient val sqlContext = SQLContext.getOrCreate(sc)
import sqlContext.implicits._
try {
//query info table
val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
println(hBaseRDD.count())
// transform (ImmutableBytesWritable, Result) tuples into an RDD of Results
val resultRDD = hBaseRDD.map(tuple => tuple._2)
println(resultRDD.count())
val logRDD = resultRDD.map(LogRow.parseLogRow)
val logDF = logRDD.toDF()
logDF.printSchema()
logDF.show()
// register the DataFrame as a temp table
logDF.registerTempTable("LogRow")
val logAdviseDF = sqlContext.sql("SELECT rowkey, content as content FROM LogRow ")
logAdviseDF.printSchema()
logAdviseDF.take(5).foreach(println)
} catch {
case e: Exception => e.printStackTrace()
} finally {
}
}
}
package com.test.spark.hbase
import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.io.Text
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
object RunReadLogByKafka extends Serializable {
def main(args: Array[String]): Unit = {
val broker = "192.168.13.111:9092"
val topic = "log"
@transient val sparkConf = new SparkConf().setAppName("RunReadLogByKafka")
@transient val streamingContext = new StreamingContext(sparkConf, Seconds(2))
@transient val sc = streamingContext.sparkContext
val kafkaConf = Map("metadata.broker.list" -> broker,
"group.id" -> "group1",
"zookeeper.connection.timeout.ms" -> "3000",
"kafka.auto.offset.reset" -> "smallest")
// Define which topics to read from
val topics = Set(topic)
val messages = KafkaUtils.createDirectStream[Array[Byte], String, DefaultDecoder, StringDecoder](
streamingContext, kafkaConf, topics).map(_._2)
messages.foreachRDD(rdd => {
val readLogByKafka =new ReadLogByKafka(sc)
//parse every message, it will throw NotSerializableException
rdd.foreach(readLogByKafka.func)
})
streamingContext.start()
streamingContext.awaitTermination()
}
}
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2021)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:889)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:888)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:888)
at com.test.spark.hbase.RunReadLogByKafka$$anonfun$main$1.apply(RunReadLogByKafka.scala:38)
at com.test.spark.hbase.RunReadLogByKafka$$anonfun$main$1.apply(RunReadLogByKafka.scala:35)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:631)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:631)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:42)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:40)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:34)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:207)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:207)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:207)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:206)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
- object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@5207878f)
- field (class: com.test.spark.hbase.ReadLogByKafka, name: sct, type: class org.apache.spark.SparkContext)
- object (class com.test.spark.hbase.ReadLogByKafka, com.test.spark.hbase.ReadLogByKafka@60212100)
- field (class: com.test.spark.hbase.RunReadLogByKafka$$anonfun$main$1$$anonfun$apply$1, name: readLogByKafka$1, type: class com.test.spark.hbase.ReadLogByKafka)
- object (class com.test.spark.hbase.RunReadLogByKafka$$anonfun$main$1$$anonfun$apply$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
... 30 more