
I'm using Spark 2.1.0 and Hadoop 2.7.3.

I was trying to use newAPIHadoopFile with very simple code, in a single class with a main method:

    import org.apache.hadoop.io.Text
    import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("test").master("local[*]").getOrCreate()
    val sparkContext = spark.sparkContext
    val sparkConf = sparkContext.getConf
    val file = "src/main/resources/chat.csv"
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    sparkConf.registerKryoClasses(Array(
      Class.forName("org.apache.hadoop.io.LongWritable"),
      Class.forName("org.apache.hadoop.io.Text")
    ))
    sparkConf.set("spark.kryo.classesToRegister", "org.apache.hadoop.io.LongWritable, org.apache.hadoop.io.Text")
    val rdd = sparkContext.newAPIHadoopFile(file, classOf[KeyValueTextInputFormat], classOf[Text], classOf[Text])
    rdd.collect().foreach(println)

I checked many posts on Stack Overflow, but I still get this error:

    java.io.NotSerializableException: org.apache.hadoop.io.Text
    Serialization stack:
        - object not serializable (class: org.apache.hadoop.io.Text, value:   How about Italian?"})
        - field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
        - object (class scala.Tuple2, (  How about Italian?"},))
        - element of array (index: 0)
        - array (class [Lscala.Tuple2;, size 3)
        at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)

Edit: content of chat.csv:

{from:"Gert", to:"Melissa", message:"Want to have dinner?"}
{from:"Melissa", to:"Gert", message:"Ok\
How about Italian?"}
Leonardo Herrera
Furyegg

1 Answer


I ran the same code you provided with Spark 1.6.0 on my machine and it works fine (I used a tab-separated file in HDFS). Which versions of Spark/Hadoop are you using?

In some cases (perhaps with older versions) Text does not implement Serializable, so map it to String before collecting and try again.

Something like below; the result of newAPIHadoopFile is a key-value tuple:

    rdd.map(x => (x._1.toString, x._2.toString)).collect().foreach(println)
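Putting it together, here is a minimal sketch of the whole flow (assuming Spark 2.x; the file path and app name are illustrative, taken from the question):

```scala
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat
import org.apache.spark.sql.SparkSession

object ChatReader {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("test").master("local[*]").getOrCreate()
    val sc = spark.sparkContext

    // Hadoop Writables such as Text do not implement java.io.Serializable,
    // so convert each record to plain Strings *before* any action that
    // ships data back to the driver (collect, take, ...).
    val rdd = sc
      .newAPIHadoopFile("src/main/resources/chat.csv",
        classOf[KeyValueTextInputFormat], classOf[Text], classOf[Text])
      .map { case (k, v) => (k.toString, v.toString) }

    rdd.collect().foreach(println)
    spark.stop()
  }
}
```

The key point is that the map runs on the executors, so only serializable String pairs ever cross the wire to the driver.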


Sathiyan S