I have the following Scala code to pull data from Spark:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.types.{StringType, StructType, TimestampType}
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.functions._
val emailDF = loadTable("email")
  .where(s"topic = '${Topics.Email}'")
  .cache()

val df = emailDF.withColumn("rank", row_number()
  .over(Window.partitionBy("email_address")
    .orderBy(desc("created_at"))))

val resultDf = df.filter("rank == 1").drop("rank")
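For context: the window ranks the rows of each email_address by created_at descending, and rank == 1 keeps only the newest row per address. A plain dropDuplicates wouldn't do here, since it keeps an arbitrary row per address; a minimal sketch of the contrast (same column names as above):

// keeps SOME row per address, not necessarily the newest one
val arbitraryPerAddress = emailDF.dropDuplicates(Seq("email_address"))

// the window version keeps exactly the newest row per address
val newestPerAddress = emailDF
  .withColumn("rank", row_number().over(
    Window.partitionBy("email_address").orderBy(desc("created_at"))))
  .filter("rank = 1")
  .drop("rank")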
When I ran the code I got this error:
org.apache.spark.sql.AnalysisException: Could not resolve window function 'row_number'. Note that, using window functions currently requires a HiveContext;
I searched around and found that I need to add the Hive dependency; here are my updated dependencies:
build.sbt
val sparkVersion = "1.6.3"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-hive" % sparkVersion % "provided"
)
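Both jars are marked "provided", so they must actually be present on the runtime classpath that spark-submit / the cluster supplies. A quick sanity probe (my own debugging suggestion, not part of the original setup):

// throws ClassNotFoundException at runtime if spark-hive is missing from the classpath
Class.forName("org.apache.spark.sql.hive.HiveContext")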
However, I still got the same error.
Then I tried the HiveContext approach:
val emailDF = Email.load()
  .filter(col(Email.TopicId).isin(Topics.Email))
  .filter(col(Email.OptIn).isin(optInFlag))
  .cache()

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
logger.info(s"sc: ${sc.appName}, ${sc.sparkUser}")

emailDF.registerTempTable("emailDFTable")

val df = hiveContext.sql("""SELECT *,
  row_number() over(partition by email_address order by event_at desc) AS rank
  FROM emailDFTable""")

val resultDf = df.filter("rank == 1").drop("rank")
Now I got this error:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Table not found: emailDFTable; line 3 pos 30
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.getTable(Analyzer.scala:305)
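In hindsight (see the update below), registerTempTable registers the table in the catalog of the DataFrame's own SQLContext, not in the freshly created HiveContext, which is why hiveContext.sql can't find it. A sketch of a workaround would have been to rebuild the DataFrame through the HiveContext before registering:

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
// re-wrap the DataFrame so the temp table lands in the catalog
// that hiveContext.sql actually queries
val emailHiveDF = hiveContext.createDataFrame(emailDF.rdd, emailDF.schema)
emailHiveDF.registerTempTable("emailDFTable")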
Another approach I tried:
val windowSpec = Window.partitionBy(col(EmailChannel.EmailAddress))
  .orderBy(col(EmailChannel.EventAt).desc)

val resultDf = emailDF.withColumn("maxEventAt", first("event_at").over(windowSpec))
  .select("*").where(col("maxEventAt") === col(EmailChannel.EventAt))
  .drop("maxEventAt")
Then I got a similar error again:
org.apache.spark.sql.AnalysisException: Could not resolve window function 'first_value'. Note that, using window functions currently requires a HiveContext;
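As an aside, a window-free alternative that works on a plain SQLContext is groupBy + join: compute the latest event_at per address and join it back (a sketch only; note it keeps all rows tied for the max event_at):

// latest event per address, no window functions involved
val latest = emailDF
  .groupBy(col(EmailChannel.EmailAddress))
  .agg(max(col(EmailChannel.EventAt)).as("maxEventAt"))

val resultDf = emailDF
  .join(latest, Seq(EmailChannel.EmailAddress))
  .where(col(EmailChannel.EventAt) === col("maxEventAt"))
  .drop("maxEventAt")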
I really don't understand why it won't work: I have imported HiveContext and added the spark-hive dependency. One thing I can think of is that we use DataStax Spark, so we have the following dependency in build.sbt:
"com.datastax.spark" %% "spark-cassandra-connector" % "1.6.11",
Do I need a datastax.spark.hive too? But I don't see that such a library exists.
Also, I displayed my emailDF with emailDF.show(false); it has lots of data in it, not empty.
==== Update ====
Yes, switching to HiveContext works. I didn't notice that a SparkContext and a SQLContext were initialized at the beginning of the code; instead of replacing that SQLContext with a HiveContext, I tried to create a new HiveContext from the SparkContext:

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

That's why it didn't work: the DataFrames were still being built and analyzed by the original SQLContext, which can't resolve window functions, and the temp table was registered in that context's catalog rather than the new HiveContext's. After I changed the SQLContext to a HiveContext it works fine.
changed from
implicit val sc: SparkContext = new SparkContext(sparkConfig)
implicit val sqlContext: SQLContext = new SQLContext(sc)
to
implicit val sc: SparkContext = new SparkContext(sparkConfig)
implicit val sqlContext: HiveContext = new HiveContext(sc)
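With that change the original window code resolves. A self-contained sketch of the fixed setup on Spark 1.6 (the toy data, app name, and local master are made up for illustration):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive.HiveContext

// local master just for the sketch; spark-submit supplies it in a real deployment
implicit val sc: SparkContext =
  new SparkContext(new SparkConf().setAppName("email-dedup").setMaster("local[*]"))
implicit val sqlContext: HiveContext = new HiveContext(sc)
import sqlContext.implicits._

// toy stand-in for loadTable("email")
val emailDF = sc.parallelize(Seq(
  ("a@x.com", "2017-02-01"),
  ("a@x.com", "2017-01-01"),
  ("b@x.com", "2017-01-15")
)).toDF("email_address", "created_at")

val resultDf = emailDF
  .withColumn("rank", row_number().over(
    Window.partitionBy("email_address").orderBy(desc("created_at"))))
  .filter("rank = 1")
  .drop("rank")

resultDf.show(false) // one row per address, with the newest created_at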