
I'm using Spark and I've been struggling to make a simple unit test pass with a DataFrame and Spark SQL.

Here is the code snippet:

class TestDFSpec extends SharedSparkContext {
  "Test DF " should {
    "pass equality" in {
      val createDF = sqlCtx.createDataFrame(createsRDD, classOf[Test]).toDF()
      createDF.registerTempTable("test")

      sqlCtx.sql("select * FROM test").collectAsList() === List(Row(Test.from(create1)), Row(Test.from(create2)))
    }
  }

  val create1 = "4869215,bbbbb"
  val create2 = "4869215,aaaaa"
  val createsRDD = sparkContext.parallelize(Seq(create1, create2)).map(Test.from)
}

I copied code from the Spark GitHub repo and made some small changes to provide a SQLContext:

trait SharedSparkContext extends Specification with BeforeAfterAll {
  import net.lizeo.bi.spark.conf.JobConfiguration._

  @transient private var _sql: SQLContext = _

  def sqlCtx: SQLContext = _sql

  override def beforeAll() {
    println(sparkConf)
    _sql = new SQLContext(sparkContext)
  }

  override def afterAll() {
    sparkContext.stop()
    _sql = null
  }
}

The Test model is pretty simple:

case class Test(key: Int, value: String)

object Test {
  def from(line: String): Test = {
    val f = line.split(",")
    Test(f(0).toInt, f(1))
  }
}

The job configuration object:

object JobConfiguration {
  val conf = ConfigFactory.load()

  val sparkName = conf.getString("spark.name")
  val sparkMaster = conf.getString("spark.master")

  lazy val sparkConf = new SparkConf()
    .setAppName(sparkName)
    .setMaster(sparkMaster)
    .set("spark.executor.memory",conf.getString("spark.executor.memory"))         
    .set("spark.io.compression.codec",conf.getString("spark.io.compression.codec"))

  val sparkContext = new SparkContext(sparkConf)  
}
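
For reference, JobConfiguration reads those keys with Typesafe Config from an application.conf along these lines. This is a sketch only; the values below are placeholders (e.g. local[2] as a common choice for tests), not my actual settings, only the keys are taken from the code above:

# application.conf (sketch; values are illustrative, keys match JobConfiguration)
spark.name = "test-df-spec"
spark.master = "local[2]"
spark.executor.memory = "1g"
spark.io.compression.codec = "lz4"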

I'm using Spark 1.3.0 with Specs2. The exact dependencies from my sbt project file are:

object Dependencies {
  private val sparkVersion = "1.3.0"
  private val clouderaVersion = "5.4.4"

  private val sparkClouderaVersion = s"$sparkVersion-cdh$clouderaVersion"

  val sparkCdhDependencies = Seq(
    "org.apache.spark" %% "spark-core" % sparkClouderaVersion % "provided",
    "org.apache.spark" %% "spark-sql" % sparkClouderaVersion % "provided"
  )
}
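
For completeness, the list is pulled into the build roughly like this (a sketch, not copied verbatim from my build files; the Cloudera resolver is assumed, since the -cdh artifacts are not on Maven Central):

// build.sbt (sketch): wiring in the dependency list above
resolvers += "cloudera-repos" at "https://repository.cloudera.com/artifactory/cloudera-repos/"

libraryDependencies ++= Dependencies.sparkCdhDependencies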

The test output is:

[info] TestDFSpec  
[info]  
[info] Test DF  should  
[error]   x pass equality  
[error]    '[[], []]'  
[error]  
[error]     is not equal to  
[error]  
[error]    List([Test(4869215,bbbbb)], [Test(4869215,aaaaa)]) (TestDFSpec.scala:17)  
[error] Actual:   [[], []]
[error] Expected: List([Test(4869215,bbbbb)], [Test(4869215,aaaaa)])

sqlCtx.sql("select * FROM test").collectAsList() returns [[], []].

Any help would be greatly appreciated. I didn't run into any problems when testing with RDDs, but I do want to migrate from RDDs to DataFrames and be able to use Parquet directly from Spark to store files.

Thanks in advance


1 Answer


The test passes with the following code:

class TestDFSpec extends SharedSparkContext  {
  import sqlCtx.implicits._
  "Test DF " should {
    "pass equality" in {
      val createDF = sqlCtx.createDataFrame(Seq(create1,create2).map(Test.from))
      createDF.registerTempTable("test")
      val result = sqlCtx.sql("select * FROM test").collect()
      result === Array(Test.from(create1),Test.from(create2)).map(Row.fromTuple)
    }
  }

  val create1 = "4869215,bbbbb"
  val create2 = "4869215,aaaaa"
}

The main difference is the way the DataFrame is created: from a Seq[Test] instead of an RDD[Test]. The createDataFrame(rdd, classOf[Test]) overload used in the question goes through JavaBean reflection, and a Scala case class does not expose bean-style getters, which would explain the empty rows [[], []] in the output.
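
If you prefer to keep building the DataFrame from an RDD, the implicits route should also work in Spark 1.3. A sketch along the same lines (untested here, relying on the sqlCtx.implicits._ import already shown above):

// Sketch: convert an RDD of the Test case class through the SQLContext implicits
// instead of the JavaBean overload createDataFrame(rdd, classOf[Test]).
import sqlCtx.implicits._

val createsRDD = sparkContext.parallelize(Seq(create1, create2)).map(Test.from)
val createDF = createsRDD.toDF()   // schema derived from the Test case class fields
createDF.registerTempTable("test")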

I asked for some explanation on the Spark mailing list: http://apache-spark-user-list.1001560.n3.nabble.com/Unit-testing-dataframe-td24240.html#none
