The following code is meant to parse files, but it constantly raises exceptions when I try to access elements in the RDD.

val raw_data = sc.textFile(path).map(_.split(",")).mapPartitions(_.drop(1)).filter(_.size > 4).map(s=>s)  //remove header and empty entries
raw_data.count
val raw_by_user: RDD[(String, Iterable[Array[String]])] = raw_data.map{s =>
  if(s.size > 3)
  (s(0), Array[String](s(0),toStandarddate(s(2)),toEntryExit(s(3)),s(5),s(6) ,jr_type,"TST_0", stationMap(s(5)),stationMap(s(6))))
  else{
    println(s(0) , s.mkString(","))
    (s(0) , Array[String]())
  }
}.groupByKey()

raw_by_user.count

Error:

16/01/05 13:39:30 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 4)
java.util.NoSuchElementException: key not found: 2
  at scala.collection.MapLike$class.default(MapLike.scala:228)
  at scala.collection.AbstractMap.default(Map.scala:58)
  at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
  at DataCreation.ProcessData$$anonfun$9.apply(package.scala:77)
  at DataCreation.ProcessData$$anonfun$9.apply(package.scala:75)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)

Any ideas what the problem could be, and how to handle such exceptions?

Hammad Haleem

2 Answers

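.mapPartitions(_.drop(1)) is incorrect: it drops the first element of every partition, not just the header line. If you run the following in the spark-shell, you can end up with an empty RDD, because each element may land in its own partition: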

sc.parallelize(List("header", "cat,dog", "a1,b1,c1,d1,e1,f1"))
  .map(_.split(",")).mapPartitions(_.drop(1)).collect()

You need to find another way to drop the header row, depending on your data. Perhaps you can filter it out, or you could use the approach described here.
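To make the per-partition behaviour concrete, here is a minimal sketch in plain Scala with no Spark dependency. The nested Seq is only a stand-in for an RDD's partitions, and the sample rows and the three-way split are assumptions chosen for illustration, not taken from the question's data.

```scala
object DropPerPartition {
  // Hypothetical data: each inner Seq plays the role of one RDD partition.
  val partitions: Seq[Seq[String]] = Seq(
    Seq("header", "u1,a,b"),
    Seq("u2,c,d", "u3,e,f"),
    Seq("u4,g,h")
  )

  // Mimics rdd.mapPartitions(_.drop(1)): drop(1) is applied once per partition,
  // so the first row of every partition is lost, not only the header.
  val droppedEverywhere: Seq[String] =
    partitions.flatMap(part => part.iterator.drop(1))
}
```

In this sketch only two of the four data rows remain, and a partition that holds a single row disappears entirely, which is how the three-element example above can come back empty.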

jbrown

I agree with what @jbrown said: you need to remove the header in another way.

As described here, the most efficient way to skip the first line is

raw_data.mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(1) else iter }

Or, if you prefer (in PySpark):

raw_data = sc.textFile('path_to_data')
header = raw_data.first()  # extract header
data = raw_data.filter(lambda x: x != header)  # filter out header
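Since the question is in Scala, the two approaches above can be compared in a small plain-Scala sketch. It uses a nested Seq as a stand-in for RDD partitions, so it runs without Spark. The sample rows are hypothetical. On a real RDD the corresponding calls would be mapPartitionsWithIndex for the first approach and first() followed by filter for the second.

```scala
object HeaderRemoval {
  // Hypothetical sample data; each inner Seq stands in for one RDD partition.
  val partitions: Seq[Seq[String]] = Seq(
    Seq("id,date,type", "u1,2016-01-05,entry"),
    Seq("u2,2016-01-05,exit", "u3,2016-01-05,entry")
  )

  // Index-based: skip the first element of partition 0 only.
  val byIndex: Seq[String] =
    partitions.zipWithIndex.flatMap { case (part, idx) =>
      if (idx == 0) part.drop(1) else part
    }

  // Value-based: treat the first line as the header and
  // filter out every line that is equal to it.
  val header: String = partitions.head.head
  val byValue: Seq[String] = partitions.flatten.filter(_ != header)
}
```

Both give the same rows here. The value-based version compares every line against the header, so it would also remove any later line identical to the header, which may or may not be what you want.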
Rami