I have code that streams HDFS text files, but each text file starts with a 50-line header and description. I want to skip those lines and ingest only the data.
This is my code, but it throws SparkException: Task not serializable:
val hdfsDStream = ssc.textFileStream("hdfs://sandbox.hortonworks.com/user/root/log")
hdfsDStream.foreachRDD { rdd =>
  val data = rdd.mapPartitionsWithIndex { (partitionIdx: Int, lines: Iterator[String]) =>
    // Drop the 50-line header/description, which sits in the first partition
    if (partitionIdx == 0) lines.drop(50) else lines
  }
  val rowRDD = data.map(_.split(",")).map(p => Row(p(0), p(1), p(2), p(3)))
  if (data.count() > 0) {
    ...
  }
}
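For context, here is a minimal plain-Scala sketch of the header-skipping logic I am aiming for, with no Spark involved (just iterators standing in for partitions; the assumption is that the whole header lands in partition 0). The helper name dropHeader is mine, not from my actual job:

```scala
object HeaderSkipSketch {
  // Standalone function: nothing from an enclosing streaming context is
  // captured, so it would not drag non-serializable state into a closure.
  def dropHeader(partitionIdx: Int, lines: Iterator[String]): Iterator[String] =
    if (partitionIdx == 0) lines.drop(50) else lines

  def main(args: Array[String]): Unit = {
    // Simulate two partitions: the first holds the 50-line header plus data.
    val part0 = (1 to 52).map(i => s"line$i").iterator
    val part1 = Iterator("a,b,c,d")

    println(dropHeader(0, part0).toList) // prints List(line51, line52)
    println(dropHeader(1, part1).toList) // prints List(a,b,c,d)
  }
}
```

Note that the returned iterator from drop must actually be used; discarding it and returning the original lines would leave the header in place.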