I have a Hive table that is partitioned by the 'date' field. I want to write a query that gets the data from the latest (max) partition.

spark.sql("select field from table  where date_of = '2019-06-23'").explain(True)
vs 
spark.sql("select field from table where date_of = (select max(date_of) from table)").explain(True)

Below are the physical plans of the two queries:

*(1) Project [qbo_company_id#120L]
+- *(1) FileScan parquet table[qbo_company_id#120L,date_of#157] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[s3location..., PartitionCount: 1, PartitionFilters: [isnotnull(date_of#157), (cast(date_of#157 as string) = 2019-06-23)], PushedFilters: [], ReadSchema: struct<qbo_company_id:bigint>

*(1) Project [qbo_company_id#1L]
+- *(1) Filter (date_of#38 = Subquery subquery0)
   :  +- Subquery subquery0
   :     +- *(2) HashAggregate(keys=[], functions=[max(date_of#76)], output=[max(date_of)#78])
   :        +- Exchange SinglePartition
   :           +- *(1) HashAggregate(keys=[], functions=[partial_max(date_of#76)], output=[max#119])
   :              +- LocalTableScan [date_of#76]
   +- *(1) FileScan parquet table[qbo_company_id#1L,date_of#38] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[s3location..., PartitionCount: 1836, PartitionFilters: [isnotnull(date_of#38)], PushedFilters: [], ReadSchema: struct<qbo_company_id:bigint>

Why does the subquery version scan all partitions instead of only the latest one? Given the partition metadata, why can't Spark scan only the required partition?

Nick Chammas
SelvamR
  • Hi @Selvam, what is the type of date_of? The max function will work if that is a date or timestamp field. – abiratsis Jul 12 '19 at 10:18
  • Based on the good discussion here, I adapted it for PySpark, in case someone wants to refer to a PySpark solution. https://stackoverflow.com/questions/55053218/pyspark-getting-latest-partition-from-hive-partitioned-column-logic/57440760#57440760 – vikrant rana Aug 10 '19 at 13:12

2 Answers

Building on Ram's answer, there is a much simpler way to accomplish this that eliminates a lot of overhead by querying the Hive metastore directly, rather than executing a Spark-SQL query. No need to reinvent the wheel:

import org.apache.hadoop.hive.conf.HiveConf
import scala.collection.JavaConverters._
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient

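// Build a Hive configuration from the Hadoop configuration Spark is already using
val hiveConf = new HiveConf(spark.sparkContext.hadoopConfiguration, classOf[HiveConf])
// Connect to the metastore directly; no Spark job is launched
val cli = new HiveMetaStoreClient(hiveConf)
// Short.MaxValue caps the listing at 32767 partitions; each partition's values are joined into one string
val maxPart = cli.listPartitions("<db_name>", "<tbl_name>", Short.MaxValue).asScala.map(_.getValues.asScala.mkString(",")).max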
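
One caveat worth illustrating: the snippet above takes the max of joined strings, so the comparison is lexicographic. The following is a minimal pure-Python sketch of the same selection logic (the function name max_partition and the sample values are hypothetical, not part of the answer), showing that string order matches date order only when values are zero-padded:

```python
def max_partition(partition_values):
    """Join each partition's values with ',' and return the largest string."""
    return max(",".join(values) for values in partition_values)


# Zero-padded yyyy-MM-dd values: string order and date order agree.
padded = max_partition([["2019-06-23"], ["2019-06-30"], ["2019-06-01"]])
print(padded)  # 2019-06-30

# Values without zero padding: '6' sorts after '1', so June beats October.
unpadded = max_partition([["2019-6-30"], ["2019-10-01"]])
print(unpadded)  # 2019-6-30
```

If the partition values are not zero-padded ISO dates, parsing them into dates before comparing is the safer choice.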
Charlie Flowers

If I were you, I'd prefer a different approach rather than a SQL query and a full table scan.

spark.sql(s"show partitions $tablename")

Then I would convert that into a Seq[scala.collection.immutable.Map[String, org.joda.time.DateTime]], that is, one map per partition from column name to a Joda DateTime value:

import org.apache.spark.sql.SparkSession
import org.joda.time.DateTime

/**
    * listMyHivePartitions - lists Hive partitions as a sequence of maps
    * @param tableName String
    * @param spark SparkSession
    * @return Seq[Map[String, DateTime]]
    */
  def listMyHivePartitions(tableName :String,spark:SparkSession) : Seq[Map[String, DateTime]]  = {
    println(s"Listing the keys from ${tableName}")
    // Each row holds one partition spec such as "date_of=2019-06-30"
    val partitions: Seq[String] = spark.sql(s"show partitions ${tableName}").collect().map(row => {
      println(s" Identified Key: ${row.toString()}")
      row.getString(0)
    }).toSeq
    println(s"Fetched ${partitions.size} partitions from ${tableName}")
    // Split multi-level specs on "/" and each key=value pair on "="
    partitions.map(key => key.split("/").toSeq.map(keyVal => {
      val keyValSplit = keyVal.split("=")
      (keyValSplit(0).toLowerCase().trim, new DateTime(keyValSplit(1).trim))
    }).toMap)
  }

and then apply getRecentPartitionDate as below:

/**
    * getRecentPartitionDate.
    *
    * @param column   String
    * @param seqOfMap Seq[scala.collection.immutable.Map[String, org.joda.time.DateTime]]
    **/
  def getRecentPartitionDate(column: String, seqOfMap: Seq[scala.collection.immutable.Map[String, org.joda.time.DateTime]]): Option[Map[String, DateTime]] = {
    logger.info(" >>>>> column " + column)
    val mapWithMostRecentBusinessDate = seqOfMap.sortWith(
      (a, b) => {
        logger.debug(a(column).toString() + " col2" + b(column).toString())
        a(column).isAfter(b(column))
      }
    )

    logger.debug(s" mapWithMostRecentBusinessDate: $mapWithMostRecentBusinessDate , \n Head = ${mapWithMostRecentBusinessDate.headOption} ")

    mapWithMostRecentBusinessDate.headOption
  }

The advantage is that there are no SQL queries over the data and no full table scans.
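
For readers who want the same idea in PySpark, here is a minimal sketch, not a definitive implementation. It assumes partition specs have the `column=value` form returned by show partitions and that the values are yyyy-MM-dd dates. The helper names parse_partition_spec and latest_partition are hypothetical, and the Spark call appears only in a comment because it needs a live SparkSession:

```python
from datetime import datetime


def parse_partition_spec(spec):
    """Turn 'date_of=2019-06-30/region=us' into a dict of column -> value."""
    pairs = (part.split("=", 1) for part in spec.split("/"))
    return {key.strip().lower(): value.strip() for key, value in pairs}


def latest_partition(specs, column, fmt="%Y-%m-%d"):
    """Return the parsed spec with the most recent date in `column`, or None."""
    parsed = [parse_partition_spec(s) for s in specs]
    if not parsed:
        return None
    # Compare parsed dates rather than raw strings.
    return max(parsed, key=lambda p: datetime.strptime(p[column], fmt))


# Hypothetical usage with a live SparkSession named `spark`:
#   specs = [row[0] for row in spark.sql("show partitions my_db.my_table").collect()]
specs = ["date_of=2019-06-23", "date_of=2019-06-30", "date_of=2019-06-01"]
latest = latest_partition(specs, "date_of")
print(latest)  # {'date_of': '2019-06-30'}
```

Sorting happens in plain Python on the driver, so no DataFrame-level shuffle is involved.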

The same approach also works when you query the Hive metastore, which is a database at the back end: fire a show partitions command for the table against it. The result of the query is a java.sql.ResultSet.

 /**
        * showParts.
        *
        * @param table
        * @param config
        * @param stmt
        */
      def showParts(table: String, config: Config, stmt: Statement): Seq[scala.collection.immutable.Map[String, org.joda.time.DateTime]] = {
        val showPartitionsCmd = "show partitions " + table
        logger.info("showPartitionsCmd " + showPartitionsCmd)
        try {
          val resultSet = stmt.executeQuery(showPartitionsCmd)

          // checkData(resultSet)
          val result = resultToSeq(resultSet)
          logger.info(s"partitions of $table -> $showPartitionsCmd")
          logger.debug("result " + result)

          result
        }
        catch {
          case e: Exception =>
            logger.error(s"Exception occurred while running show partitions on table $table", e)
            // Return an empty sequence rather than null so callers can use headOption safely
            Seq.empty
        }
      }

      /** *
        * resultToSeq.
        *
        * @param queryResult
        */
      def resultToSeq(queryResult: ResultSet): Seq[scala.collection.immutable.Map[String, org.joda.time.DateTime]] = {
        val md = queryResult.getMetaData

        val colNames = for (i <- 1 to md.getColumnCount) yield md.getColumnName(i)
        var rows = Seq[scala.collection.immutable.Map[String, org.joda.time.DateTime]]()
        while (queryResult.next()) {
          var row = scala.collection.immutable.Map.empty[String, DateTime]
          for (n <- colNames) {
            // Each value looks like "date_of=2019-06-30"; split it into column name and value
            val str = queryResult.getString(n).split("=")

            // DateTime.parse uses Joda's ISO parser, which accepts yyyy-MM-dd
            row += str(0) -> DateTime.parse(str(1))
            logger.debug(row.toString())
          }
          rows = rows :+ row
        }

        rows
      }

After getting the sequence of maps, I would apply the function defined above, getRecentPartitionDate.

Ram Ghadiyaram
  • Instead of this, I can do spark.sql("select max(field) from table").first()/collect and then pass this value into the query string. I am not asking how to improve the query performance, but rather why the subquery does a full scan. – SelvamR Jul 12 '19 at 06:17
  • The subquery does a full scan on the table in this case, as it would in normal SQL. – Ram Ghadiyaram Jul 12 '19 at 15:03
  • Don't use a query just to get the latest partition; that is the gist of what I am saying. The approach above uses the show partitions command, with no query or subquery to complicate things with performance issues. I hope that makes it clear. – Ram Ghadiyaram Jul 12 '19 at 15:06
  • @Ram Ghadiyaram, could you please post a PySpark version as well? – vikrant rana Jul 13 '19 at 12:54
  • @RamGhadiyaram yes, you are right. select max(partition_field) will go through all partitions to compute the result (though it is faster than using this query as a subquery), whereas "show partitions" gets the metadata from the metastore. Here is my question: why doesn't Spark use this logic (show partitions) to find the max of a partition field? – SelvamR Jul 15 '19 at 05:35
  • @RamGhadiyaram how about the approach below? `cl = spark.sql("show partitions $tablename").collect() cl = cl[-1] max_part = cl.partition` – SelvamR Jul 15 '19 at 06:00
  • If you have tried it, yes, we can, but the thing is that we don't have low-level control over the time formats and so on. That is why I did it this way in one of my projects, which is more controllable. Also, orderBy is a DataFrame-level sort and may shuffle, whereas what I was doing is a simple Scala function-level sort. Those are the differences. The gist of this answer is to provoke thoughts about using show partitions; the rest depends on your comfort and requirements. I have mentioned two powerful yet simple ways to achieve the goal. – Ram Ghadiyaram Jul 15 '19 at 06:02
  • @RamGhadiyaram if you are planning to collect and then take the last, or sort and take the first, can't we use the statement below? `>>> spark.sql("show partitions $table_name").orderBy("partition",ascending=False).first() +------------------+ | partition| +------------------+ |date_of=2019-06-30|` Note: the function is non-deterministic because its result depends on the order of rows, which may be non-deterministic after a shuffle. – SelvamR Jul 15 '19 at 06:02
  • for your latest comment see my comment above – Ram Ghadiyaram Jul 15 '19 at 06:06
  • @RamGhadiyaram yes, I agree that this approach will improve performance. But the actual question is: why is Spark not intelligent enough to identify that this is the partition column and get the max value from "show partitions" instead of scanning all partitions? Any idea or PR related to this full scan? – SelvamR Jul 15 '19 at 06:07
  • AFAIK it will do a full scan; it is not intelligent about the underlying data source and optimizations in this case. That is the reason the full scan is happening. – Ram Ghadiyaram Jul 15 '19 at 06:09
  • "actual question is, why spark is not intelligent to identify this is the partition column and can get the max value from "show partition" instead of scan the whole partition?" Raise this in the Apache Spark dev forums. – Ram Ghadiyaram Aug 06 '19 at 14:06
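
The two-step workaround mentioned in the comments (fetch the max partition value first, then pass it into the query as a literal) can be sketched as follows. This is an illustrative sketch only: the helper name latest_partition_query is hypothetical, and the Spark calls are shown in comments because they need a live SparkSession. With a literal filter, the first plan in the question shows PartitionCount: 1 and a PartitionFilters entry on date_of.

```python
def latest_partition_query(table, field, partition_column, partition_value):
    """Build a query that filters on a literal partition value."""
    if "'" in partition_value:
        raise ValueError("unexpected quote in partition value")
    return (
        f"select {field} from {table} "
        f"where {partition_column} = '{partition_value}'"
    )


# Hypothetical usage with a live SparkSession named `spark`:
#   max_date = spark.sql("select max(date_of) from table").first()[0]
#   spark.sql(latest_partition_query("table", "field", "date_of", str(max_date))).explain(True)
query = latest_partition_query("table", "field", "date_of", "2019-06-23")
print(query)  # select field from table where date_of = '2019-06-23'
```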