
I have customer_input_data.tar.gz in HDFS, which contains the data of 10 different tables in CSV file format. I need to extract this file to /my/output/path using Spark/Scala.

Please suggest how to extract the customer_input_data.tar.gz file using Spark/Scala.

NS Saravanan
  • Try to follow this link; it could give you some answers: https://stackoverflow.com/questions/36604145/read-whole-text-files-from-a-compression-in-spark – Chema Jun 18 '20 at 09:47
  • Thanks for sending a useful link. Working on my own solution; will update. – NS Saravanan Jun 22 '20 at 12:57

2 Answers


gzip is not a splittable format in Hadoop. Consequently, the file will not really be distributed across the cluster, and you get none of the benefits of distributed compute/processing in Hadoop or Spark.

A better approach may be to:

  • uncompress the file on the OS and then send the individual files back to Hadoop, as sketched below.
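
If you take that route, a minimal sketch driving the OS tools from Scala via sys.process could look like the following. All local and HDFS paths here are hypothetical, and the hdfs and tar binaries are assumed to be on the PATH:

    import scala.sys.process._

    // Pull the archive out of HDFS, extract it locally, and push the files back
    Seq("hdfs", "dfs", "-get", "customer_input_data.tar.gz", "/tmp/").!
    Seq("mkdir", "-p", "/tmp/customer_input_data").!
    Seq("tar", "-xzf", "/tmp/customer_input_data.tar.gz", "-C", "/tmp/customer_input_data").!
    Seq("hdfs", "dfs", "-put", "/tmp/customer_input_data", "/my/output/path").!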

If you still want to uncompress in Scala, you can simply resort to the Java class java.util.zip.GZIPInputStream via

new GZIPInputStream(new FileInputStream("your file path"))
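
Note that GZIPInputStream only removes the gzip layer: for a .tar.gz the result is still a tar archive that needs a separate untar step. A minimal local sketch, assuming hypothetical local paths:

    import java.io.{BufferedOutputStream, FileInputStream, FileOutputStream}
    import java.util.zip.GZIPInputStream

    // Unwraps only the gzip layer of a local file; the output is still a tar archive
    val in = new GZIPInputStream(new FileInputStream("/tmp/customer_input_data.tar.gz"))
    val out = new BufferedOutputStream(new FileOutputStream("/tmp/customer_input_data.tar"))
    try {
      val buf = new Array[Byte](4096)
      var n = in.read(buf)
      while (n != -1) {
        out.write(buf, 0, n)
        n = in.read(buf)
      }
    } finally {
      in.close()
      out.close()
    }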
ameet chaubal

I developed the code below to decompress the files using Scala. You need to pass the input path, the output path, and the Hadoop FileSystem.

    /* Below method is used for extracting a .tar.gz file from HDFS into HDFS. */
    import java.io.{BufferedOutputStream, IOException}

    import org.apache.commons.compress.archivers.tar.{TarArchiveEntry, TarArchiveInputStream}
    import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream
    import org.apache.commons.io.FilenameUtils
    import org.apache.hadoop.fs.{FileSystem, Path}

    private val BUFFER_SIZE = 4096

    @throws[IOException]
    private def processTargz(fullpath: String, houtPath: String, fs: FileSystem): Unit = {
      val path = new Path(fullpath)
      // Unwrap the gzip layer, then read the tar archive inside it
      val gzipIn = new GzipCompressorInputStream(fs.open(path))
      val tarIn = new TarArchiveInputStream(gzipIn)
      try {
        // Use the archive's base name (e.g. "customer_input_data") as the output folder
        val fileName = FilenameUtils.getName(fullpath)
        val tarNamesFolder = fileName.substring(0, fileName.indexOf('.'))
        println("Folder name: " + tarNamesFolder)

        var entry: TarArchiveEntry = tarIn.getNextTarEntry
        while (entry != null) {
          // The entry name is the file name inside the compressed tar archive
          println("Entry name: " + entry.getName)
          val targetPath = new Path(houtPath + "/" + tarNamesFolder + "/" + entry.getName)
          if (entry.isDirectory) {
            // If the entry is a directory, create it in HDFS (not on the local file system)
            if (!fs.mkdirs(targetPath)) {
              println(s"Unable to create directory '$targetPath' during extraction of archive contents.")
            }
          } else {
            // Stream the entry's bytes directly into an HDFS file
            val dest = new BufferedOutputStream(fs.create(targetPath, true), BUFFER_SIZE)
            try {
              val data = new Array[Byte](BUFFER_SIZE)
              var count = tarIn.read(data, 0, BUFFER_SIZE)
              while (count != -1) {
                dest.write(data, 0, count)
                count = tarIn.read(data, 0, BUFFER_SIZE)
              }
            } finally dest.close()
          }
          entry = tarIn.getNextTarEntry
        }
        println("Untar completed successfully!")
      } finally {
        tarIn.close() // closing the tar stream also closes the gzip stream
      }
    }
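
For completeness, a minimal sketch of how the method above could be called; the SparkSession setup and the app name are assumptions rather than part of the original answer, while the paths come from the question:

    import org.apache.hadoop.fs.FileSystem
    import org.apache.spark.sql.SparkSession

    // Hypothetical driver code: obtain the HDFS FileSystem from the Spark context
    val spark = SparkSession.builder().appName("UntarExample").getOrCreate()
    val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)

    // Input archive and output path as given in the question
    processTargz("customer_input_data.tar.gz", "/my/output/path", fs)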
NS Saravanan