I am using Spark 1.5.1.

Within the streaming context I get the SQLContext as follows

SQLContext sqlContext = SQLContext.getOrCreate(records.context());
DataFrame dataFrame = sqlContext.createDataFrame(record, SchemaRecord.class);
dataFrame.registerTempTable("records");

records is a JavaRDD. Each record has the following structure:

public class SchemaRecord implements Serializable {

private static final long serialVersionUID = 1L; 
private String msisdn; 
private String application_type; 
//private long uplink_bytes = 0L;
}

As long as the fields (msisdn, application_type) are all Strings, everything works fine.

As soon as I add a long field such as uplink_bytes, I get the following NullPointerException at createDataFrame:

Exception in thread "main" java.lang.NullPointerException
at org.spark-project.guava.reflect.TypeToken.method(TypeToken.java:465)
at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:103)
at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:102)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:102)
at org.apache.spark.sql.catalyst.JavaTypeInference$.inferDataType(JavaTypeInference.scala:47)
at org.apache.spark.sql.SQLContext.getSchema(SQLContext.scala:1031)
at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:519)
at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:548)

Please suggest

Yukti Kaura
  • How are you creating the DataFrame: dynamically or manually? Can you please post the complete code? Also, are you defining "SchemaRecord" as a case class, in case it is dynamic? – Sumit Nov 27 '15 at 03:33
  • @Sumit - please see the edited question for the response to your questions – Yukti Kaura Nov 27 '15 at 04:20
  • @Sumit pardon me, I do not understand well. SchemaRecord is a plain Java object, not dynamic. – Yukti Kaura Nov 27 '15 at 04:22
  • Try using Long (the wrapper class) instead of "long". That should work. Also ensure that your RDD does contain Long values. – Sumit Nov 28 '15 at 00:31
  • @Sumit, earlier it was actually Long instead of the primitive; I then tried changing things to make them work, but neither of them does :( – Yukti Kaura Nov 29 '15 at 01:08
  • I would suggest looking at your data; the column might contain null values or something other than Long values. My suggestion would be to try with a small number of rows. Also, please send me the data and I will try it myself. – Sumit Nov 29 '15 at 01:58
  • @YuktiKaura can you use "uplink" as the variable name instead of "uplink_bytes" and create proper getters and setters? I resolved this issue by doing so. – Sachin Janani Mar 23 '16 at 06:25

2 Answers

Your problem is probably that your model class is not a clean JavaBean. At the moment, Spark has no code to deal with properties that have a setter but no getter method. You can try something like this to inspect how Spark sees your class:

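// Requires: import java.beans.Introspector; import java.beans.PropertyDescriptor;
// Introspector.getBeanInfo throws the checked java.beans.IntrospectionException.
PropertyDescriptor[] props = Introspector.getBeanInfo(YourClass.class).getPropertyDescriptors();
for (PropertyDescriptor prop : props) {
    System.out.println(prop.getDisplayName());
    System.out.println("\t" + prop.getReadMethod());   // null if the property has no getter
    System.out.println("\t" + prop.getWriteMethod());  // null if the property has no setter
}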

The introspector also recognizes fields that only have a setter as properties. Such a property has no read method, and that is what causes the NullPointerException in Spark.
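As a minimal, self-contained sketch of the check described above, the following uses only the JDK's java.beans API (no Spark) to list properties that have a setter but no getter. The class names SetterOnlyCheck and BrokenRecord and the field uplinkBytes are hypothetical, chosen only for illustration; they are not the asker's actual code.

```java
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class SetterOnlyCheck {

    // Hypothetical bean: uplinkBytes has a setter but no getter.
    public static class BrokenRecord implements Serializable {
        private static final long serialVersionUID = 1L;
        private String msisdn;
        private long uplinkBytes;

        public String getMsisdn() { return msisdn; }
        public void setMsisdn(String msisdn) { this.msisdn = msisdn; }
        public void setUplinkBytes(long uplinkBytes) { this.uplinkBytes = uplinkBytes; }
    }

    // Returns the names of all bean properties that have no read method (getter).
    public static List<String> propertiesWithoutGetter(Class<?> beanClass) {
        List<String> missing = new ArrayList<>();
        try {
            for (PropertyDescriptor prop : Introspector.getBeanInfo(beanClass).getPropertyDescriptors()) {
                if (prop.getReadMethod() == null) {
                    missing.add(prop.getName());
                }
            }
        } catch (IntrospectionException e) {
            // Wrap the checked exception so callers do not have to declare it.
            throw new IllegalStateException(e);
        }
        return missing;
    }

    public static void main(String[] args) {
        // Prints the properties that would be problematic under the explanation above.
        System.out.println(propertiesWithoutGetter(BrokenRecord.class));
    }
}
```

If the returned list is non-empty for your model class, adding the missing getters (or removing the stray setters) is the first thing to try.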

manuel
  • From the Spark [documentation](https://spark.apache.org/docs/1.6.1/sql-programming-guide.html#interoperating-with-rdds): Spark SQL supports automatically converting an RDD of [JavaBeans](http://stackoverflow.com/questions/3295496/what-is-a-javabean-exactly) into a DataFrame. The BeanInfo, obtained using reflection, defines the schema of the table. Currently, Spark SQL does not support JavaBeans that contain nested or contain complex types such as Lists or Arrays. You can create a JavaBean by creating a class that implements Serializable and has getters and setters for all of its fields. – aloksahoo Feb 20 '17 at 17:44
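To illustrate the JavaBean requirement quoted above, here is a sketch of what a bean-style version of the question's record class might look like, with a getter and setter for every field. The class name SchemaRecordBean and the camelCase property names are assumptions made for this example; the thread does not confirm that this alone resolves the exception when the SQLContext is obtained inside a streaming job.

```java
import java.io.Serializable;

// Hypothetical bean-style variant of the question's SchemaRecord:
// every private field has both a public getter and a public setter.
public class SchemaRecordBean implements Serializable {

    private static final long serialVersionUID = 1L;

    private String msisdn;
    private String applicationType;
    private Long uplinkBytes = 0L; // wrapper type, as suggested in the comments

    public String getMsisdn() { return msisdn; }
    public void setMsisdn(String msisdn) { this.msisdn = msisdn; }

    public String getApplicationType() { return applicationType; }
    public void setApplicationType(String applicationType) { this.applicationType = applicationType; }

    public Long getUplinkBytes() { return uplinkBytes; }
    public void setUplinkBytes(Long uplinkBytes) { this.uplinkBytes = uplinkBytes; }
}
```

With this shape, the schema would be derived from the getter names, so the resulting columns would be named after the properties (for example applicationType) rather than the underscore-style field names used in the question.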

Here is what I tried, and it worked.

Here is the POJO, which stores String, Long and int values:

import java.io.*;

public class TestingSQLPerson implements Serializable {

// Here is Data in a comma Separated file: -
// Sumit,20,123455
// Ramit,40,12345

private String name;
private int age;
private Long testL;

public Long getTestL() {
    return testL;
}

public void setTestL(Long testL) {
    this.testL = testL;
}

public String getName() {
    return name;
}

public void setName(String name) {
    this.name = name;
}

public int getAge() {
    return age;
}

public void setAge(int age) {
    this.age = age;
}

}

And here is the Spark SQL code in Java:

import org.apache.spark.*;
import org.apache.spark.sql.*;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;

public class TestingLongSQLTypes {

public static void main(String[] args) {

    SparkConf javaConf = new SparkConf();
    javaConf.setAppName("Test Long Types");
    JavaSparkContext javaCtx = new JavaSparkContext(javaConf);


    SQLContext sqlContext = new org.apache.spark.sql.SQLContext(javaCtx);

    String dataFile = "file:///home/ec2-user/softwares/crime-data/testfile.txt";
    JavaRDD<TestingSQLPerson> people = javaCtx.textFile(dataFile).map(
      new Function<String, TestingSQLPerson>() {
        public TestingSQLPerson call(String line) throws Exception {
          String[] parts = line.split(",");

          TestingSQLPerson person = new TestingSQLPerson();
          person.setName(parts[0]);
          person.setAge(Integer.parseInt(parts[1].trim()));
          person.setTestL(Long.parseLong(parts[2].trim()));

          return person;
        }
      });

    // Apply a schema to an RDD of JavaBeans and register it as a table.
    DataFrame schemaPeople = sqlContext.createDataFrame(people, TestingSQLPerson.class);
    schemaPeople.registerTempTable("TestingSQLPerson");

    schemaPeople.printSchema();
    schemaPeople.show();


}

}

All of the above works, and on the driver console I could see the results without any errors or exceptions. @Yukti - in your case it should work too, provided you follow the same steps as in the example above. If there is any deviation, let me know and I can try to help.

Sumit
  • As per Yukti's question ("Within the streaming context I get the SQLContext as follows"), the SQLContext is created inside one of the DStream transformations. I tried this scenario and faced the same issue @Sumit – Sachin Janani Mar 23 '16 at 05:51
  • Yeah, I think the SQLContext being obtained from the streaming context is one of the key differences between @Sumit's and Yukti's code. I am also doing streaming and need to obtain a SQLContext in order to use createDataFrame, and I am running into this same issue as well. – Edi Bice May 12 '16 at 12:30