
Imagine that we have a directory structure/partitioning of the data as:

/foo/day=1/lots/of/other/stuff/
/foo/day=2/lots/of/other/stuff/
/foo/day=3/lots/of/other/stuff/
.
.
/foo/day=25/lots/of/other/stuff/

I want to read only data of the highest increment of day, here /foo/day=25/lots/of/other/stuff/.

If `day` were a column in the data, we could do something like:

    import org.apache.spark.sql.functions.{col, max}

    spark.read.parquet("s3a://foo/day=*/")
      .withColumn("latestDay", max(col("day")).over())
      .filter(col("day") === col("latestDay"))

Can you propose something smarter assuming that day is not a column?

The data wasn't written using `write.partitionBy("day")` or similar. In my case, the schemas in the subpaths aren't necessarily even coherent with one another.

Maybe there's a path glob pattern that can do this, or something similar? Or is it performance-wise equivalent to define the `day` column and hope for predicate pushdown or similar optimisations?
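For concreteness, here is a sketch of the kind of path-level selection I have in mind (the helper name `maxDayPath` is made up, and the Hadoop I/O around it is only sketched in comments, untested):

```scala
// Pick the directory name with the highest numeric day from names such as
// "day=1", "day=25". Non-matching names are ignored. Note that a plain string
// max would rank "day=9" above "day=25", hence the numeric parse.
def maxDayPath(names: Seq[String]): Option[String] = {
  val DayDir = "day=(\\d+)".r
  val matched = names.collect { case n @ DayDir(d) => (d.toInt, n) }
  if (matched.isEmpty) None else Some(matched.maxBy(_._1)._2)
}

// Sketch of the surrounding I/O (bucket name is a placeholder):
// val fs = org.apache.hadoop.fs.FileSystem.get(spark.sparkContext.hadoopConfiguration)
// val dirs = fs.globStatus(new org.apache.hadoop.fs.Path("s3a://foo/day=*")).map(_.getPath.getName)
// maxDayPath(dirs).foreach(d => spark.read.parquet(s"s3a://foo/$d"))
```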

ragulpr

1 Answer

      import org.apache.hadoop.conf.Configuration
      import org.apache.hadoop.fs.{FileSystem, Path}
      import scala.annotation.tailrec

      // replace this with your file system
      val fs: FileSystem = FileSystem.get(new Configuration())

      /**
        * Returns the latest partition (folder) contained in the specified path.
        * The latest partition is defined by its name (alphanumerical order);
        * set the corresponding flag to select it by modification timestamp instead.
        *
        * @param path      the HDFS folder where to start looking for the latest partition
        * @param recursive if true, returns the latest updated folder in the folder tree rooted at `path`
        * @param useModificationTimestamp if true, selects the most recently modified partition
        * @return String path of the latest partition
        */
      def getLatestPartition(path: String, recursive: Boolean = false,
                             useModificationTimestamp: Boolean = false): String = {

        if (recursive) {
          this.getLatestPartitionRecursive(new Path(path), useModificationTimestamp).toString
        } else {
          this.getLatestPartition(new Path(path), useModificationTimestamp).toString
        }
      }

      @tailrec
      private def getLatestPartitionRecursive(path: Path, useModificationTimestamp: Boolean): Path = {
        if (fs.listStatus(path).forall(!_.isDirectory)) {
          path
        } else {
          this.getLatestPartitionRecursive(getLatestPartition(path, useModificationTimestamp), useModificationTimestamp)
        }
      }

      private def getLatestPartition(path: Path, useModificationTimestamp: Boolean): Path = {
        if (fs.listStatus(path).forall(!_.isDirectory))
          path
        else {
          if (useModificationTimestamp)
            fs.listStatus(path).filter(_.isDirectory).maxBy(_.getModificationTime).getPath
          else
            fs.listStatus(path).filter(_.isDirectory).maxBy(_.getPath.getName).getPath
        }
      }

Usage:

    val latest = getLatestPartition("s3a://foo/")
    spark.read.parquet(latest)

To get the latest day itself, parse it from `latest`. Note that the name-based ordering is lexicographic, so `day=9` sorts after `day=25`; if your day values aren't zero-padded, either parse and compare them numerically or set `useModificationTimestamp = true`.
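That parsing step might look like this (the regex assumes the `day=N` layout above; `parseDay` is a made-up name):

```scala
// Extract the numeric day from a partition path such as "s3a://foo/day=25".
def parseDay(path: String): Option[Int] =
  "day=(\\d+)".r.findFirstMatchIn(path).map(_.group(1).toInt)

parseDay("s3a://foo/day=25") // Some(25)
```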

meniluca