I am using Spark 1.5.1. Within the streaming context I obtain the SQLContext as follows:

SQLContext sqlContext = SQLContext.getOrCreate(records.context());
DataFrame dataFrame = sqlContext.createDataFrame(records, SchemaRecord.class);
dataFrame.registerTempTable("records");

Here records is a JavaRDD<SchemaRecord>, and each record has the following structure:
public class SchemaRecord implements Serializable {
    private static final long serialVersionUID = 1L;
    private String msisdn;
    private String application_type;
    //private Long uplink_bytes = 0L;
}
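Since createDataFrame(rdd, beanClass) infers the schema through JavaBean introspection, here is a fuller sketch of the record class with the Long field enabled. The accessor names are my assumption; the snippet above omits them:

```java
import java.io.Serializable;

public class SchemaRecord implements Serializable {
    private static final long serialVersionUID = 1L;

    private String msisdn;
    private String application_type;
    private Long uplink_bytes = 0L; // the field whose addition triggers the NPE

    // JavaBean accessors (names assumed; Spark's schema inference reads these)
    public String getMsisdn() { return msisdn; }
    public void setMsisdn(String msisdn) { this.msisdn = msisdn; }

    public String getApplication_type() { return application_type; }
    public void setApplication_type(String application_type) { this.application_type = application_type; }

    public Long getUplink_bytes() { return uplink_bytes; }
    public void setUplink_bytes(Long uplink_bytes) { this.uplink_bytes = uplink_bytes; }
}
```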
When the fields are Strings only, like msisdn and application_type, everything works fine. The moment I add another field such as uplink_bytes, of type Long, I get the following NullPointerException at createDataFrame:
Exception in thread "main" java.lang.NullPointerException
at org.spark-project.guava.reflect.TypeToken.method(TypeToken.java:465)
at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:103)
at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:102)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:102)
at org.apache.spark.sql.catalyst.JavaTypeInference$.inferDataType(JavaTypeInference.scala:47)
at org.apache.spark.sql.SQLContext.getSchema(SQLContext.scala:1031)
at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:519)
at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:548)
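The top of the trace (TypeToken.method called from JavaTypeInference) suggests the failure happens while Spark walks the bean's property descriptors. To narrow it down without Spark, I ran a plain-JDK introspection check; the Partial class below is a hypothetical cut-down version of my bean, not Spark code:

```java
import java.beans.BeanInfo;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.io.Serializable;

public class IntrospectionCheck {

    // Hypothetical bean: a getter for msisdn but only a setter for uplink_bytes
    public static class Partial implements Serializable {
        private String msisdn;
        private Long uplink_bytes;

        public String getMsisdn() { return msisdn; }
        public void setUplink_bytes(Long v) { this.uplink_bytes = v; }
    }

    public static void main(String[] args) throws Exception {
        BeanInfo info = Introspector.getBeanInfo(Partial.class);
        for (PropertyDescriptor pd : info.getPropertyDescriptors()) {
            // A property with a setter but no getter reports readMethod == null;
            // if Spark passes such a null into TypeToken.method, that could
            // explain an NPE like the one above
            System.out.println(pd.getName() + " -> readMethod = " + pd.getReadMethod());
        }
    }
}
```

I am not certain this is the same code path Spark takes, so treat it only as a clue.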
Please suggest what could be causing this.