The purpose is to manipulate each data file and save a copy to a second location in HDFS. I will be using

RddName.coalesce(1).saveAsTextFile(pathName)

to save the result to HDFS.

This is why I want to process each file separately, even though I am sure the performance will not be as efficient. However, I have yet to determine how to store the list of CSV file paths in an array of strings and then loop through each one with a separate RDD.
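
To make the goal concrete, here is the shape of the loop I am after; building the array of paths is the missing piece, and the destination path below is only a placeholder:

val csvPaths: Array[String] = ???  // <- the part I have not figured out

csvPaths.foreach { srcPath =>
  val fileRdd = sc.textFile(srcPath)
  // ... manipulate the data here ...
  fileRdd.coalesce(1).saveAsTextFile("/<target_location>/" + srcPath.split("/").last)
}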

Let us use the following anonymized example HDFS source locations:

/data/email/click/date=2015-01-01/sent_20150101.csv
/data/email/click/date=2015-01-02/sent_20150102.csv
/data/email/click/date=2015-01-03/sent_20150103.csv

I know how to list the file paths using Hadoop FS Shell:

hdfs dfs -ls /data/email/click/*/*.csv

I know how to create one RDD for all the data:

val sentRdd = sc.textFile( "/data/email/click/*/*.csv" )
– Jaime

3 Answers


I haven't tested it thoroughly, but something like this seems to work:

import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.hadoop.fs.{FileSystem, Path, LocatedFileStatus, RemoteIterator}
import java.net.URI

val path: String = ???

// Build a Hadoop configuration from the Spark configuration and open the filesystem.
val hconf = SparkHadoopUtil.get.newConfiguration(sc.getConf)
val hdfs = FileSystem.get(hconf)
// The second argument toggles recursion; false lists only the given directory.
val iter = hdfs.listFiles(new Path(path), false)

// Drain the RemoteIterator into a List of file URIs.
def listFiles(iter: RemoteIterator[LocatedFileStatus]): List[URI] = {
  @annotation.tailrec
  def go(iter: RemoteIterator[LocatedFileStatus], acc: List[URI]): List[URI] = {
    if (iter.hasNext) {
      val uri = iter.next.getPath.toUri
      go(iter, uri :: acc)
    } else {
      acc
    }
  }
  go(iter, List.empty[URI])
}

listFiles(iter).filter(_.toString.endsWith(".csv"))
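
The URIs can then drive the per-file processing from the question. A rough, untested sketch (note that a RemoteIterator can only be traversed once, so the listing is rebuilt here; the destination path is only a placeholder):

val csvFiles = listFiles(hdfs.listFiles(new Path(path), false))
  .filter(_.toString.endsWith(".csv"))

csvFiles.foreach { uri =>
  val fileRdd = sc.textFile(uri.toString)
  // ... manipulate the data here ...
  fileRdd.coalesce(1).saveAsTextFile("/<target_location>/" + new Path(uri).getName)
}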
– zero323

This is what ultimately worked for me:

import org.apache.hadoop.fs._
import org.apache.spark.deploy.SparkHadoopUtil
import java.net.URI

val hdfs_conf = SparkHadoopUtil.get.newConfiguration(sc.getConf)
val hdfs = FileSystem.get(hdfs_conf)
// source data in HDFS
val sourcePath = new Path("/<source_location>/<filename_pattern>")

// globStatus expands the wildcard pattern into the matching FileStatus entries
hdfs.globStatus(sourcePath).foreach { fileStatus =>
  val filePathName = fileStatus.getPath().toString()
  val fileName = fileStatus.getPath().getName()

  // <DO STUFF HERE>

} // end foreach loop
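
For example, the body of the loop can read, transform, and save each file, along the lines of the coalesce(1).saveAsTextFile approach from the question (the destination path is only a placeholder):

// inside the foreach loop above:
val fileRdd = sc.textFile(filePathName)
// ... manipulate the data here ...
fileRdd.coalesce(1).saveAsTextFile("/<target_location>/" + fileName)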
– Jaime

sc.wholeTextFiles(path) should help. It gives an RDD of (filepath, filecontent) pairs.
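
If you only need the paths, you can collect the keys, e.g.:

val filePaths: Array[String] =
  sc.wholeTextFiles("/data/email/click/*/*.csv").keys.collect()

Keep in mind this still materializes the file contents under the hood, since each record is an entire file.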

– Huy Banh
    Wouldn't that be using the data though? I just want to iterate through each filepath within. – Jaime Sep 25 '15 at 15:23