
I am after a PySpark (Python) script to list all the files in Azure Blob Storage, including subdirectories. I found a script for this purpose in Scala and need help converting it to PySpark.

Scala code from https://learn.microsoft.com/en-us/azure/databricks/kb/data/list-delete-files-faster#list-files:


    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{Path, FileSystem}
    import org.apache.spark.deploy.SparkHadoopUtil
    import org.apache.spark.sql.execution.datasources.InMemoryFileIndex
    import java.net.URI

    def listFiles(basep: String, globp: String): Seq[String] = {
      val conf = new Configuration(sc.hadoopConfiguration)
      val fs = FileSystem.get(new URI(basep), conf)

      def validated(path: String): Path = {
        if(path startsWith "/") new Path(path)
        else new Path("/" + path)
      }

      val fileCatalog = InMemoryFileIndex.bulkListLeafFiles(
        paths = SparkHadoopUtil.get.globPath(fs, Path.mergePaths(validated(basep), validated(globp))),
        hadoopConf = conf,
        filter = null,
        sparkSession = spark)

      fileCatalog.flatMap(_._2.map(_.path))
    }

    val root = "/mnt/path/table"
    val globp = "[^_]*" // glob pattern, e.g. "service=webapp/date=2019-03-31/*log4j*"

    val files = listFiles(root, globp)
    files.toDF("path").show()

I have managed to convert the code to PySpark, but I am getting the error below: `'JavaMember' object has no attribute 'globPath'`.


    # handles to the JVM classes used by the Scala version
    configuration = sc._jvm.org.apache.hadoop.conf
    fspath = sc._jvm.org.apache.hadoop.fs
    hadooputil = sc._jvm.org.apache.spark.deploy.SparkHadoopUtil
    inmemfileindex = sc._jvm.org.apache.spark.sql.execution.datasources.InMemoryFileIndex
    javauri = sc._jvm.java.net.URI

    rootURL = "/mnt/"
    globp = "[^_]*"  # glob pattern, e.g. "service=webapp/date=2019-03-31/*log4j*"

    conf = configuration.Configuration(sc._jsc.hadoopConfiguration())
    fs = fspath.FileSystem.get(javauri(rootURL), conf)

    # merge the base path and the glob pattern, as in the Scala version
    g = fspath.Path.mergePaths(fspath.Path(rootURL), fspath.Path("/" + globp))

    # this line raises: 'JavaMember' object has no attribute 'globPath'
    hadooputil.get.globPath(fs, g)
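
From what I can tell, the error comes from py4j rather than from Hadoop: referencing a JVM method without calling it returns a `py4j.java_gateway.JavaMember` (the method handle itself), so `hadooputil.get` is the un-invoked `get` method and has no `globPath` attribute. A minimal sketch of the difference, assuming `sc` is available and that the Spark build exposes a static `get()` forwarder on `SparkHadoopUtil`:

    # Sketch only: referencing vs. invoking a JVM method through py4j
    util_cls = sc._jvm.org.apache.spark.deploy.SparkHadoopUtil

    member = util_cls.get        # py4j JavaMember: the method itself, not its result
    print(type(member))          # <class 'py4j.java_gateway.JavaMember'>

    instance = util_cls.get()    # invokes get(), returning a JavaObject wrapping SparkHadoopUtil
    print(type(instance))        # <class 'py4j.java_gateway.JavaObject'>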

Any help is much appreciated.

  • use `globStatus`, which is part of `hadoop.fs`, e.g. `files = fspath.globStatus(Path('/service=webapp/date=2019-03-31/*log4j*'))` [Source](https://stackoverflow.com/a/40258750/1178971) – forgetso Mar 03 '21 at 11:40
  • @forgetso I have added an answer based on your comment here: https://stackoverflow.com/a/67050173/143447, thank you! – Lou Zell Apr 12 '21 at 16:48
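
For reference, a rough PySpark sketch of the `globStatus` route suggested in forgetso's comment above; the base path and glob pattern are placeholders, and it goes through the plain Hadoop `FileSystem` API via py4j:

    # rough sketch of the globStatus approach from the comments (paths are placeholders)
    hadoop = sc._jvm.org.apache.hadoop
    conf = hadoop.conf.Configuration(sc._jsc.hadoopConfiguration())

    base = "/mnt/path/table"
    pattern = hadoop.fs.Path(base + "/[^_]*")   # same glob style as the Scala example

    fs = pattern.getFileSystem(conf)

    # globStatus returns a FileStatus for every path matching the glob;
    # py4j maps a null result to None, hence the `or []`
    statuses = fs.globStatus(pattern) or []
    files = [status.getPath().toString() for status in statuses]

    for f in files:
        print(f)

To descend into subdirectories regardless of the glob, `fs.listFiles(hadoop.fs.Path(base), True)` returns a Hadoop `RemoteIterator` that can be drained with `hasNext()`/`next()`.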

0 Answers