
I have code that streams HDFS text files, but each text file starts with a header and description that take up 50 lines. I want to skip those lines and ingest only the data.

This is my code, but it throws a SparkException: Task not serializable.

val hdfsDStream = ssc.textFileStream("hdfs://sandbox.hortonworks.com/user/root/log")

hdfsDStream.foreachRDD(
  rdd => {
    val data = rdd.mapPartitionsWithIndex((partitionIdx: Int, lines: Iterator[String])
    => {
      if (partitionIdx == 0) {
        lines.drop(50)
      }
      lines
    })

    val rowRDD = data.map(_.split(",")).map(p => Row(p(0),p(1),p(2),p(3)))

    if (data.count() > 0) {
        ...
    }
  }
)
sophie
  • Possible duplicate of http://stackoverflow.com/questions/22592811/task-not-serializable-java-io-notserializableexception-when-calling-function-ou – childofsoong Mar 13 '15 at 04:56

2 Answers


The "Task not serializable" error is explained in these questions: "Passing Functions to Spark: What is the risk of referencing the whole object?" and "Task not serializable exception while running apache spark job".

Most likely you are creating some kind of object there and calling one of its methods inside an RDD operation, which forces Spark to serialize the whole object along with the closure.
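To illustrate the mechanism, here is a minimal sketch that reproduces the failure without Spark, using plain Java serialization, which is what Spark uses by default to ship closures to executors. `LineParser`, `ClosureDemo` and `canSerialize` are hypothetical names made up for this example; they are not part of the question's code.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical helper class; note that it does NOT extend Serializable.
class LineParser(val delimiter: String) {
  def parse(line: String): Array[String] = line.split(delimiter)
}

object ClosureDemo {
  // Returns true if Java serialization accepts the object, false otherwise.
  def canSerialize(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val parser = new LineParser(",")

    // Captures `parser`, so the closure drags the whole object along.
    val bad: String => Array[String] = line => parser.parse(line)

    // Copies the one serializable field into a local val first,
    // so the closure captures only a String.
    val delimiter = parser.delimiter
    val good: String => Array[String] = line => line.split(delimiter)

    println(s"closure capturing parser serializable: ${canSerialize(bad)}")
    println(s"closure capturing String serializable: ${canSerialize(good)}")
  }
}
```

The usual ways out are the ones the second closure hints at: copy the needed values into local vals before the RDD operation, make the class extend Serializable, or construct the object inside the closure.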

Unfortunately, the code you posted works perfectly well, so the problem must be in the part that you replaced with dots. For example, this one works:

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.sql._

val ssc = new StreamingContext(sc, Seconds(60))
val hdfsDStream = ssc.textFileStream("/sparkdemo/streaming")

hdfsDStream.foreachRDD(
  rdd => {
    val data = rdd.mapPartitionsWithIndex((partitionIdx: Int, lines: Iterator[String])
    => {
      // drop returns a new iterator, so return its result instead of discarding it
      if (partitionIdx == 0) lines.drop(50) else lines
    })

    val rowRDD = data.map(_.split(",")).map(p => Row(p(0),p(1),p(2),p(3)))

    if (data.count() > 0) {
        rowRDD.take(10).foreach(println)
    }
  }
)
ssc.start()
ssc.awaitTermination()
0x0FFF

I think you just need to zipWithIndex and filter out the lines whose index is less than 50.

val hdfsDStream = ssc.textFileStream("hdfs://sandbox.hortonworks.com/user/root/log")

hdfsDStream.foreachRDD( rdd => {
  // keep only lines whose index is 50 or more, i.e. skip the 50-line header
  val data = rdd.zipWithIndex.filter( _._2 >= 50 ).map( _._1 )

  // Now do whatever you want with your data.
} )
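To make the index logic concrete, here is a minimal sketch on a plain Scala collection (no Spark), assuming a hypothetical 2-line header instead of 50. `SkipHeaderDemo` and `skipHeader` are made-up names for illustration. Keeping the elements whose index is at or past the header length is what discards the header. On an RDD the index produced by zipWithIndex is a Long rather than an Int, but the comparison reads the same.

```scala
object SkipHeaderDemo {
  // Drops the first `headerLines` elements using zipWithIndex + filter.
  def skipHeader(lines: Seq[String], headerLines: Int): Seq[String] =
    lines.zipWithIndex
      .filter { case (_, idx) => idx >= headerLines } // keep data lines only
      .map { case (line, _) => line }                 // discard the index again

  def main(args: Array[String]): Unit = {
    val lines = Seq("# header", "# description", "a,b,c,d", "e,f,g,h")
    // Prints the two data lines, a,b,c,d and e,f,g,h
    skipHeader(lines, 2).foreach(println)
  }
}
```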

Also, in Row(p(0),p(1),p(2),p(3)): do you really need Row here?

sarveshseri