
I have the following Scala code to pull data from Spark:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.types.{StringType, StructType, TimestampType}
    import org.apache.spark.sql.{DataFrame, Row, SQLContext}
    import org.apache.spark.sql.functions._

    val emailDF = loadTable("email")
              .where(s"topic = '${Topics.Email}'")
              .cache()

    val df = emailDF.withColumn("rank",row_number()
              .over(Window.partitionBy("email_address")
                          .orderBy(desc("created_at"))))

    val resultDf = df.filter(s"rank == 1").drop("rank")

When I ran the code, I got this error:

org.apache.spark.sql.AnalysisException: Could not resolve window function 'row_number'. Note that, using window functions currently requires a HiveContext;

I searched around and found that I need to add the Hive dependency. Here are my updated dependencies:

    build.sbt
    val sparkVersion = "1.6.3" 
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
      "org.apache.spark" %% "spark-hive" % sparkVersion % "provided"
    )

However, I still got the same error.

I then tried the HiveContext approach:

        val emailDF = Email.load()
          .filter(col(Email.TopicId).isin(Topics.Email))
          .filter(col(Email.OptIn).isin(optInFlag))
          .cache()

        val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
        logger.info(s"sc: ${sc.appName}, ${sc.sparkUser}")
        emailDF.registerTempTable("emailDFTable")

        val df = hiveContext.sql("""SELECT *,
                                    row_number() over(partition by email_address order by event_at desc) AS rank
                             FROM emailDFTable""")

        val resultDf = df.filter(s"rank == 1").drop("rank")

now I got the error:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Table not found: emailDFTable; line 3 pos 30
        at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.getTable(Analyzer.scala:305)

Another approach I tried:

    val windowSpec = Window.partitionBy(col(EmailChannel.EmailAddress))
                           .orderBy(col(EmailChannel.EventAt).desc)
    val resultDf = emailDF.withColumn("maxEventAt", first("event_at").over(windowSpec))
      .select("*").where(col("maxEventAt") === col(EmailChannel.EventAt))
      .drop("maxEventAt")

and then got a similar error again:

org.apache.spark.sql.AnalysisException: Could not resolve window function 'first_value'. Note that, using window functions currently requires a HiveContext;

I really don't understand why it won't work; I have imported HiveContext and added the spark-hive dependency. One thing I can think of is that we use the DataStax Spark connector, so we have the following dependency in build.sbt:

  "com.datastax.spark"  %% "spark-cassandra-connector" % "1.6.11",

Do I need a DataStax Spark Hive library too? But I don't see such a lib.

Also, I displayed my DataFrame with `emailDF.show(false)`; it has lots of data in it, it is not empty.

==== Update ====

Yes, switching to HiveContext works. I didn't notice that a SparkContext and a SQLContext were initialized at the beginning of the code; instead of replacing the SQLContext with a HiveContext, I had tried to create a new HiveContext out of the SparkContext:

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

That's why it didn't work. After I changed the SQLContext to a HiveContext, it works fine.

I changed from

  implicit val sc: SparkContext       = new SparkContext(sparkConfig)
  implicit val sqlContext: SQLContext = new SQLContext(sc)

to

  implicit val sc: SparkContext        = new SparkContext(sparkConfig)
  implicit val sqlContext: HiveContext = new HiveContext(sc)
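
Putting the fix together, a minimal sketch of the working setup (`sparkConfig`, `loadTable`, and `Topics.Email` are the question's own names, assumed to exist; the dedup logic is the question's original window query):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive.HiveContext

// HiveContext extends SQLContext, so it can replace it wherever the
// implicit SQLContext was used before
implicit val sc: SparkContext        = new SparkContext(sparkConfig)
implicit val sqlContext: HiveContext = new HiveContext(sc)

// loadTable and Topics.Email are helpers from the question, assumed here
val emailDF = loadTable("email")
  .where(s"topic = '${Topics.Email}'")
  .cache()

// With a HiveContext in scope, row_number() now resolves in Spark 1.6
val resultDf = emailDF
  .withColumn("rank", row_number().over(
    Window.partitionBy("email_address").orderBy(desc("created_at"))))
  .filter("rank == 1")
  .drop("rank")
```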
user468587

1 Answer


In Spark 1.6, window functions are only available with a HiveContext.

Create a HiveContext using the SparkContext (sc):

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)

Register the DataFrame as a temp table and run the query on it using the HiveContext.

emailDF.registerTempTable("emailDFTable")

Once the DataFrame is registered as a temp table, check that the temp table exists:

hiveContext.sql("SHOW tables").show()

+--------+------------+-----------+
|database|   tableName|isTemporary|
+--------+------------+-----------+
|        |emaildftable|       true|
+--------+------------+-----------+

Now you can query your temp table:

val df = hiveContext.sql("""SELECT *,
                                row_number() over(partition by email_address order by created_at desc) AS rank
                         FROM emailDFTable""")

Let me know how it goes.

Shantanu Sharma
  • When I added the line `val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)`, it complained that it cannot resolve 'org.apache.spark.sql.hive', even though I imported Hive with `import hiveContext._`. Another thing I noticed: under 'External Libraries' in my IDE there is no org.apache.spark:spark-hive listed, even after I ran `sbt assembly` or `sbt validate`. It seems that library was not downloaded to my project; why is that? – user468587 Apr 11 '19 at 16:12
  • Try importing `import org.apache.spark.sql.hive.HiveContext` and ensure that you have a valid sc (SparkContext). – Shantanu Sharma Apr 11 '19 at 16:27
  • Now I got this error: `Exception in thread "main" org.apache.spark.sql.AnalysisException: Table not found: emailDFTable`, and I printed out the emailDF; there is data in it. – user468587 Apr 11 '19 at 22:09
  • The DataFrame needs to be registered as a temp table before running a query on it. Did you do that? Check that the name you used to register and the name in the query are identical. – Shantanu Sharma Apr 11 '19 at 22:21
  • I updated my question. Yes, I registered it as a temp table before running the query, and the temp table name is the same as the one in the query. – user468587 Apr 12 '19 at 00:52
  • I have updated my answer. Once the DataFrame is registered as a temp table, check whether the temp table exists. – Shantanu Sharma Apr 12 '19 at 09:01
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/191716/discussion-between-xengineer-and-user468587). – Shantanu Sharma Apr 12 '19 at 10:12