
The following is interesting:

val rddSTG = sc.parallelize(
  List(
    ("RTD", "ANT", "SOYA BEANS", "20161123", "20161123", 4000, "docid11", null, 5),
    ("RTD", "ANT", "SOYA BEANS", "20161124", "20161123", 6000, "docid11", null, 4),
    ("RTD", "ANT", "BANANAS", "20161124", "20161123", 7000, "docid11", null, 9),
    ("HAM", "ANT", "CORN", "20161123", "20161123", 1000, "docid22", null, 33),
    ("LIS", "PAR", "BARLEY", "20161123", "20161123", 11111, "docid33", null, 44)
  )
)

val dataframe = rddSTG.toDF("ORIG", "DEST", "PROD", "PLDEPDATE", "PLARRDATE", "PLCOST", "docid", "ACTARRDATE", "mutationseq")
dataframe.createOrReplaceTempView("STG")
spark.sql("SELECT * FROM STG ORDER BY PLDEPDATE DESC").show()

It generates an error as follows:

scala.MatchError: Null (of class scala.reflect.internal.Types$TypeRef$$anon$6)

As soon as I change one of the null values to non-null it works. I think I get it, in that no type inference can be made for that field, but it does seem odd. Ideas?

thebluephantom

3 Answers


The problem is that Any is too generic a type in Scala. In your case each null value is typed as Null (a subtype of Any), and Spark simply has no idea how to serialize that.

We should explicitly provide a specific type.

Since null can't be assigned to primitive types in Scala, you can use String to match the data type of the column's other values.

So try this:

val sampleRdd = spark.sparkContext.parallelize(
  Seq(
    (1, null.asInstanceOf[String], 100, "YES"),   // null cast to String so the column type can be inferred
    (2, "RAKTOTPAL", 200, "NO"),
    (3, "BORDOLOI", 300, "YES"),
    (4, null.asInstanceOf[String], 400, "YES")))

sampleRdd.toDF("ID", "NAME", "SCORE", "FLAG")

This way, the DataFrame retains the null values.
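
As a quick check (a minimal sketch reusing sampleRdd from above; printSchema's exact output is omitted here), you can confirm that the nulls survive:

val df = sampleRdd.toDF("ID", "NAME", "SCORE", "FLAG")
df.printSchema()                      // NAME shows up as a nullable string column
df.filter(df("NAME").isNull).show()   // rows with ID 1 and 4 come back with NAME = null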

Another way: with a case class

case class Record(id: Int, name: String, score: Int, flag: String)

val sampleRdd = spark.sparkContext.parallelize(
  Seq(
    Record(1, null, 100, "YES"),   // a plain null is fine here: the field is already typed as String
    Record(2, "RAKTOTPAL", 200, "NO"),
    Record(3, "BORDOLOI", 300, "YES"),
    Record(4, null, 400, "YES")))

sampleRdd.toDF()
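
Along the same lines (a sketch of my own; the RecordOpt name is made up, not from the answer above), Scala's Option can model the nullable field explicitly, since Spark encodes None as a SQL NULL:

case class RecordOpt(id: Int, name: Option[String], score: Int, flag: String)

val optRdd = spark.sparkContext.parallelize(
  Seq(
    RecordOpt(1, None, 100, "YES"),             // None becomes a SQL NULL
    RecordOpt(2, Some("RAKTOTPAL"), 200, "NO"),
    RecordOpt(3, Some("BORDOLOI"), 300, "YES"),
    RecordOpt(4, None, 400, "YES")))

optRdd.toDF()   // name is a nullable string column, no casts needed
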
Raktotpal Bordoloi

I'm not quite sure of the reason behind the error, but I'm guessing it occurs because Null can't be the data type of a DataFrame column. Your second-to-last column contains only null, so its type is inferred as the trait Null, which sits at the bottom of Scala's type hierarchy and can't be instantiated as any other type. At the same time, Null is a subtype of every reference type, so if you change even one of those nulls to, say, a String, the whole column can be inferred as String. This is just an assumption.
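
To illustrate that hierarchy (a plain Scala sketch, no Spark needed):

val s: String = null   // compiles: Null sits below every reference type
// val i: Int = null   // does not compile: Int is a value type, outside Null's hierarchy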

However, for your case, defining a case class will work.

val rdd = sc.parallelize(List(
  ("RTD", "ANT", "SOYA BEANS", "20161123", "20161123", 4000, "docid11", null, 5),
  ("RTD", "ANT", "SOYA BEANS", "20161124", "20161123", 6000, "docid11", null, 4),
  ("RTD", "ANT", "BANANAS", "20161124", "20161123", 7000, "docid11", null, 9),
  ("HAM", "ANT", "CORN", "20161123", "20161123", 1000, "docid22", null, 33),
  ("LIS", "PAR", "BARLEY", "20161123", "20161123", 11111, "docid33", null, 44)))

case class DfSchema(ORIG: String, DEST: String, PROD: String, PLDEPDATE: String,
                    PLARRDATE: String, PLCOST: Int, DOCID: String,
                    ACTARRDATE: String, MUTATIONSEQ: Int)

val rddSTG = rdd.map(x => DfSchema(x._1, x._2, x._3, x._4, x._5, x._6, x._7, x._8, x._9))
val dataframe = sqlContext.createDataFrame(rddSTG)
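
Another way to sidestep inference entirely (a sketch of Spark's standard StructType API, not part of this answer) is to supply an explicit schema and build the DataFrame from Rows:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("ORIG", StringType),
  StructField("DEST", StringType),
  StructField("PROD", StringType),
  StructField("PLDEPDATE", StringType),
  StructField("PLARRDATE", StringType),
  StructField("PLCOST", IntegerType),
  StructField("docid", StringType),
  StructField("ACTARRDATE", StringType, nullable = true),   // the all-null column, declared nullable
  StructField("mutationseq", IntegerType)))

val rowRdd = rdd.map(t => Row(t._1, t._2, t._3, t._4, t._5, t._6, t._7, t._8, t._9))
val dataframe = sqlContext.createDataFrame(rowRdd, schema)
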
philantrovert

A simple solution in your case would be to add a test line, as in

val rddSTG = sc.parallelize(
  Seq(
    ("RTD", "ANT", "SOYA BEANS", "20161123", "20161123", 4000, "docid11", null, 5),
    ("RTD", "ANT", "SOYA BEANS", "20161124", "20161123", 6000, "docid11", null, 4),
    ("RTD", "ANT", "BANANAS", "20161124", "20161123", 7000, "docid11", null, 9),
    ("HAM", "ANT", "CORN", "20161123", "20161123", 1000, "docid22", null, 33),
    ("LIS", "PAR", "BARLEY", "20161123", "20161123", 11111, "docid33", null, 44),
    ("test", "test", "test", "test", "test", 0, "test", "", 0)   // sentinel row: gives every column a concrete type
  )
)

and then filter the test line out after the dataframe has been created:

import org.apache.spark.sql.functions.col

val dataframe = rddSTG.toDF("ORIG", "DEST", "PROD", "PLDEPDATE", "PLARRDATE", "PLCOST", "docid", "ACTARRDATE", "mutationseq")
                      .filter(!(col("ORIG") === "test"))

Then you can apply the rest of your logic:

dataframe.createOrReplaceTempView("STG")
spark.sql("SELECT * FROM STG ORDER BY PLDEPDATE DESC").show()

You should get output like

+----+----+----------+---------+---------+------+-------+----------+-----------+
|ORIG|DEST|      PROD|PLDEPDATE|PLARRDATE|PLCOST|  docid|ACTARRDATE|mutationseq|
+----+----+----------+---------+---------+------+-------+----------+-----------+
| RTD| ANT|SOYA BEANS| 20161124| 20161123|  6000|docid11|      null|          4|
| RTD| ANT|   BANANAS| 20161124| 20161123|  7000|docid11|      null|          9|
| RTD| ANT|SOYA BEANS| 20161123| 20161123|  4000|docid11|      null|          5|
| HAM| ANT|      CORN| 20161123| 20161123|  1000|docid22|      null|         33|
| LIS| PAR|    BARLEY| 20161123| 20161123| 11111|docid33|      null|         44|
+----+----+----------+---------+---------+------+-------+----------+-----------+

I hope this is helpful.

Ramesh Maharjan