I'm using Spark and I've been struggling to make a simple unit test pass with a DataFrame and Spark SQL.
Here is the code snippet:
import org.apache.spark.sql.Row

class TestDFSpec extends SharedSparkContext {

  "Test DF" should {
    "pass equality" in {
      val createDF = sqlCtx.createDataFrame(createsRDD, classOf[Test]).toDF()
      createDF.registerTempTable("test")
      sqlCtx.sql("SELECT * FROM test").collectAsList() === List(Row(Test.from(create1)), Row(Test.from(create2)))
    }
  }

  val create1 = "4869215,bbbbb"
  val create2 = "4869215,aaaaa"
  val createsRDD = sparkContext.parallelize(Seq(create1, create2)).map(Test.from)
}
I copied the SharedSparkContext code from the Spark GitHub repository and made some small changes to provide a SQLContext:
import org.apache.spark.sql.SQLContext
import org.specs2.mutable.Specification
import org.specs2.specification.BeforeAfterAll

trait SharedSparkContext extends Specification with BeforeAfterAll {
  import net.lizeo.bi.spark.conf.JobConfiguration._

  @transient private var _sql: SQLContext = _

  def sqlCtx: SQLContext = _sql

  override def beforeAll() {
    println(sparkConf)
    _sql = new SQLContext(sparkContext)
  }

  override def afterAll() {
    sparkContext.stop()
    _sql = null
  }
}
The Test model is pretty simple:
case class Test(key: Int, value: String)

object Test {
  def from(line: String): Test = {
    val f = line.split(",")
    Test(f(0).toInt, f(1))
  }
}
The job configuration object:
import com.typesafe.config.ConfigFactory
import org.apache.spark.{SparkConf, SparkContext}

object JobConfiguration {
  val conf = ConfigFactory.load()

  val sparkName = conf.getString("spark.name")
  val sparkMaster = conf.getString("spark.master")

  lazy val sparkConf = new SparkConf()
    .setAppName(sparkName)
    .setMaster(sparkMaster)
    .set("spark.executor.memory", conf.getString("spark.executor.memory"))
    .set("spark.io.compression.codec", conf.getString("spark.io.compression.codec"))

  val sparkContext = new SparkContext(sparkConf)
}
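ConfigFactory.load() picks these values up from application.conf on the test classpath. For reference, a minimal file matching the keys above would look like this (illustrative values, not my exact configuration):

spark {
  name = "test-df-spec"
  master = "local[2]"
  executor.memory = "512m"
  io.compression.codec = "snappy"
}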
I'm using Spark 1.3.0 with Specs2. The exact dependencies from my sbt project file are:
object Dependencies {
  private val sparkVersion = "1.3.0"
  private val clouderaVersion = "5.4.4"
  private val sparkClouderaVersion = s"$sparkVersion-cdh$clouderaVersion"

  val sparkCdhDependencies = Seq(
    "org.apache.spark" %% "spark-core" % sparkClouderaVersion % "provided",
    "org.apache.spark" %% "spark-sql" % sparkClouderaVersion % "provided"
  )
}
The test output is:
[info] TestDFSpec
[info]
[info] Test DF should
[error] x pass equality
[error] '[[], []]'
[error]
[error] is not equal to
[error]
[error] List([Test(4869215,bbbbb)], [Test(4869215,aaaaa)]) (TestDFSpec.scala:17)
[error] Actual:   [[], []]
[error] Expected: List([Test(4869215,bbbbb)], [Test(4869215,aaaaa)])
sqlCtx.sql("select * FROM test").collectAsList() return [[], []]
Any help would be greatly appreciated. I didn't meet any problem testing with RDD I do want to migrate from RDD to Dataframe and be able to use Parquet directly from Spark to store files
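For comparison, a plain RDD assertion along these lines passes without trouble (a sketch from memory, not my exact test):

val rdd = sparkContext.parallelize(Seq(create1, create2)).map(Test.from)
// Collecting an RDD of case classes yields the instances directly
rdd.collect().toList === List(Test(4869215, "bbbbb"), Test(4869215, "aaaaa"))

Once the DataFrame test passes, the plan is to persist with createDF.saveAsParquetFile(...) from the Spark 1.3 DataFrame API.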
Thanks in advance