16

I am trying to process 4 directories of text files that keep growing every day. What I need to do is: if somebody searches for an invoice number, I should give them the list of files which contain it.

I was able to map and reduce the values in the text files by loading them as an RDD. But how can I obtain the file name and other file attributes?

Vipin Bhaskaran

7 Answers

35

Since Spark 1.6 you can combine the text data source and the input_file_name function as follows:

Scala:

import org.apache.spark.sql.functions.input_file_name

val inputPath: String = ???

spark.read.text(inputPath)
  .select(input_file_name, $"value")
  .as[(String, String)] // Optionally convert to Dataset
  .rdd // or RDD

Python (versions before 2.x are buggy and may not preserve names when converted to RDD):

from pyspark.sql.functions import input_file_name

(spark.read.text(input_path)
    .select(input_file_name(), "value")
    .rdd)

This can be used with other input formats as well.
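
For the original use case (finding which files contain a given invoice number), a minimal Scala sketch building on the snippet above could look like this; inputPath and invoiceNo are hypothetical placeholders, and "value" is the column produced by the text data source:

import org.apache.spark.sql.functions.input_file_name

// Hypothetical values for illustration
val inputPath = "/data/invoices/*"
val invoiceNo = "INV-12345"

val matchingFiles = spark.read.text(inputPath)
  .select(input_file_name().as("file"), $"value")
  .filter($"value".contains(invoiceNo)) // keep only lines mentioning the invoice number
  .select($"file")
  .distinct()
  .as[String]
  .collect()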

zero323
  • For me this method does not work when used in Python. The field which should be input_file_name is filled when doing the first operation like a .take(10) but every subsequent operation like a map on the rows produces an empty string. In Scala it works though. Spark 1.6 – ludwigm Jul 05 '16 at 17:21
  • @ludwigm This works in PySpark only as long as you don't move data from JVM. – zero323 Jul 05 '16 at 17:29
  • @zero323, why can't I move the input_file_name from the JVM? I need to save the list of files. – rado Dec 22 '16 at 17:36
  • Exactly, for Dataset and DataFrames you can just add a column: `val content = sqlContext.read.text(inputPath).withColumn("filename", input_file_name)` – Kei-ven Jan 11 '17 at 01:19
  • Works also with streaming: `spark.readStream .option("sep", ",") .schema(someSchema) .option("header", "true") .csv("hdfs://path/") .withColumn("input_file_name", input_file_name)` – michal777 Jan 09 '19 at 20:55
5

You can try this if you are in pyspark:

    test = sc.wholeTextFiles("pathtofile")

You will get a resulting RDD whose first element is the file path and whose second element is the content of that file.

4

If your text files are small enough, you can use SparkContext.wholeTextFiles, which returns an RDD of (filename, content).
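
For the invoice search described in the question, a minimal sketch on top of wholeTextFiles (with a hypothetical invoiceNo and input path) could be:

// Hypothetical invoice number for illustration
val invoiceNo = "INV-12345"

val matchingFiles = sc.wholeTextFiles("/data/invoices/*")   // RDD[(fileName, content)]
  .filter { case (_, content) => content.contains(invoiceNo) }
  .keys                                                      // keep only the file names
  .collect()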

Mike Park
3

If your text files are too large for SparkContext.wholeTextFiles, you would use a (simple) custom InputFormat and then call SparkContext.newAPIHadoopFile (or SparkContext.hadoopRDD with the old API).

The InputFormat would need to return a tuple (filename, line) rather than just the line. You could then filter using a predicate that looks at the content of the line, deduplicate, and collect the file names.

From Spark, the code would look something like:

import org.apache.hadoop.conf.Configuration

// FileNamerInputFormat is the custom InputFormat described above,
// emitting (fileName, line) pairs instead of (offset, line)
val ft = classOf[FileNamerInputFormat]
val kt = classOf[String]
val vt = classOf[String]

val hadoopConfig = new Configuration(sc.hadoopConfiguration)
sc.newAPIHadoopFile(path, ft, kt, vt, hadoopConfig)
  .filter { case (f, l) => isInteresting(l) }  // predicate on the line content
  .map { case (f, _) => f }
  .distinct()
  .collect()
Alister Lee
  • Could you expand on that? Maybe an example? How would that help once you are in a MapPartitionsRDD or another RDD that is not based on file reading? – Justin Pihony Apr 17 '15 at 01:40
  • @JustinPihony expanded answer a bit. I hope you don't ask me to show the InputFormat... :) – Alister Lee Apr 17 '15 at 07:23
  • Hi Alister - Thanks so much for your response. I have used the latter part of your solution and I could get the (file, line) without using a custom InputFormat. Check out this link: http://themodernlife.github.io/scala/spark/hadoop/hdfs/2014/09/28/spark-input-filename/ – Vipin Bhaskaran Apr 20 '15 at 17:56
  • @VipinBhaskaran Note that you are using a function marked as Developer API, and it is therefore not stable. Just something to keep in mind – Justin Pihony Apr 21 '15 at 03:35
  • Great googling and thanks for pointing that out! I agree with @JustinPihony though - you're carrying some risk with that sweet, sweet API call. – Alister Lee Apr 21 '15 at 05:25
  • @Justin. I created a custom input format to give the file name and used newAPIHadoopFile , I needed something similar to get me delta between two files.checkout my github for the code : https://github.com/animeshj/bigdata – Animesh Raj Jha May 08 '17 at 19:16
3

You can use wholeTextFiles() to achieve this. However, if the input files are big, it would be counterproductive to use wholeTextFiles(), since it puts the whole file content into a single record.

The best way to retrieve file names in such a scenario is to use mapPartitionsWithInputSplit(). You can find a working example using this scenario on my blog; a minimal sketch is shown below.
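
A minimal sketch of that approach (assuming the old mapred API with plain text input; note that mapPartitionsWithInputSplit is marked as a Developer API, as mentioned in the comments above) could look like this:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileSplit, TextInputFormat}
import org.apache.spark.rdd.HadoopRDD

val rdd = sc.hadoopFile("/data/invoices/*", classOf[TextInputFormat],
                        classOf[LongWritable], classOf[Text])

// mapPartitionsWithInputSplit exposes the InputSplit, from which the file name can be read
val linesWithFile = rdd.asInstanceOf[HadoopRDD[LongWritable, Text]]
  .mapPartitionsWithInputSplit { (split, iter) =>
    val fileName = split.asInstanceOf[FileSplit].getPath.toString
    iter.map { case (_, line) => (fileName, line.toString) }
  }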

BJC
  • Added more details, I hope it satiates. The code is fairly big and the best would be to pull it off from the blog! – BJC Jun 27 '17 at 08:05
  • This looks better already, I've deleted my previous comment to avoid confusion in the future. – g00glen00b Jun 27 '17 at 08:10
2

If you're using the DataFrame API, you can get file names from HDFS using the input_file_name function from org.apache.spark.sql.functions. The snippets below might help you understand.

import org.apache.spark.sql.functions.{input_file_name, split}
import org.apache.spark.sql.types.StringType

val df = spark.read.csv("/files/")
val df2 = df.withColumn("file_name", split(input_file_name(), "/").getItem(7).cast(StringType))  // file name only (index depends on path depth)
val df3 = df.withColumn("file_name", input_file_name())  // full HDFS path

df2 now includes a new field called "file_name" that contains the HDFS file name extracted using the split function (the index passed to getItem depends on how deep the path is). If you need the full HDFS path, use input_file_name() on its own, as shown in df3.

Piyush Patel
1

It seems overkill to use Spark directly for this... If the data is going to be 'collected' at the driver anyway, why not use the HDFS API? Hadoop is often bundled with Spark. Here is an example:

import java.net.URI
import org.apache.hadoop.conf._
import org.apache.hadoop.fs._

val fileSpec = "/data/Invoices/20171123/21"
val conf = new Configuration()
val fs = FileSystem.get(new URI("hdfs://nameNodeEneteredHere"), conf)
val path = new Path(fileSpec)
// if (fs.exists(path) && fs.isDirectory(path)) ...
val fileList = fs.listStatus(path)

Then println(fileList(0)) shows the first item (formatted here for readability) as an org.apache.hadoop.fs.FileStatus:

FileStatus {
    path=hdfs://nameNodeEneteredHere/Invoices-0001.avro; 
    isDirectory=false; 
    length=29665563;
    replication=3;
    blocksize=134217728;
    modification_time=1511810355666;
    access_time=1511838291440;
    owner=codeaperature;
    group=supergroup;
    permission=rw-r--r--;
    isSymlink=false
}

Here fileList(0).getPath gives hdfs://nameNodeEneteredHere/Invoices-0001.avro.
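
If only the path strings are needed (for example, to present the matching files to the person searching for an invoice), they can be pulled out of the listing directly:

// Collect just the path strings from the listing
val paths: Array[String] = fileList.map(_.getPath.toString)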

I guess this way of listing files goes primarily through the HDFS namenode rather than through each executor. TL;DR: I'm betting Spark itself would likely poll the namenode to build its RDDs; if so, the above may be an efficient solution. Still, comments arguing either direction would be welcome.

codeaperature