24

I need to process multiple files scattered across various directories. I would like to load all these up in a single RDD and then perform map/reduce on it. I see that SparkContext is able to load multiple files from a single directory using wildcards. I am not sure how to load up files from multiple folders.

The following code snippet fails:

for fileEntry in files:
    fileName = basePath + "/" + fileEntry
    lines = sc.textFile(fileName)
    if retval == None:
        retval = lines
    else:
        retval = sc.union(retval, lines)

This fails on the third iteration of the loop with the following error message:

retval = sc.union(retval, lines)
TypeError: union() takes exactly 2 arguments (3 given)

This is bizarre, given that I am providing only two arguments. Any pointers appreciated.

Raj
    ..but the first argument is `self`. From the [docs](http://spark.apache.org/docs/latest/api/pyspark/pyspark.context.SparkContext-class.html#union), you need `sc.union([retval,lines])` – Jonathan Dursi Apr 30 '14 at 21:11
  • Let me try that. I am surprised why this would work for 2 loops and fail on third ... – Raj Apr 30 '14 at 22:23
  • I just realized you can use `sc.textFile(','.join(files))` to read them in a single go. – Daniel Darabos Jun 16 '14 at 18:01

4 Answers

44

How about this phrasing instead?

sc.union([sc.textFile(basepath + "/" + f) for f in files])

In Scala, SparkContext.union() has two variants: one that takes varargs and one that takes a list. Only the second one exists in Python (since Python does not support method overloading).
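As Jonathan's comment points out, the original loop also works once the two RDDs are wrapped in a list, since the Python union() takes a single list argument. A minimal sketch using the question's variable names:

retval = None
for fileEntry in files:
    lines = sc.textFile(basePath + "/" + fileEntry)
    # union() in PySpark expects one list of RDDs, not separate RDD arguments
    retval = lines if retval is None else sc.union([retval, lines])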

UPDATE

You can use a single textFile call to read multiple files.

sc.textFile(','.join(files))
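For instance, if the entries in files are relative to basePath as in the question, a small sketch of the single-call version might look like this (paths is just an illustrative name):

# Build full paths from the question's variables, then read them in one call;
# textFile() accepts a comma-separated list of paths and returns a single RDD.
paths = [basePath + "/" + f for f in files]
lines = sc.textFile(",".join(paths))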
Daniel Darabos
  • Thanks Daniel. My issue may be Python-centric. Your snippet seems to be Scala. – Raj May 01 '14 at 22:24
  • Ah, why didn't I realize that?! There is no function polymorphism in Python, so only one form of SparkContext.union() can be exposed. They chose to expose the one that takes a list, not the one taking a vararg. (Like Jonathan says.) – Daniel Darabos May 02 '14 at 11:23
  • I fixed the answer to have Python instead of Scala. – Daniel Darabos May 02 '14 at 19:10
  • There's a typo in this answer but I can't edit it because it isn't 6 characters long: The `"\n"` should be a `"/"`. – Noah Mar 16 '17 at 16:16
  • Thanks! Strange mistake to make... `\n` and `/` are nowhere near each other on the keyboard :). – Daniel Darabos Mar 17 '17 at 17:20
13

I solve similar problems by using wildcards.

For example, the files I want to load into Spark share a common path pattern:

dir

subdir1/folder1/x.txt

subdir2/folder2/y.txt

you can use the following statement

sc.textFile("dir/*/*/*.txt")

to load all the relevant files.

Note that the wildcard '*' only matches a single directory level; it is not recursive.

fibonacci
3

You can use the following function of SparkContext:

wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]

Read a directory of text files from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. Each file is read as a single record and returned in a key-value pair, where the key is the path of each file, the value is the content of each file.

https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext
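A minimal PySpark sketch of how this could be used (the HDFS path is just a placeholder):

# Each element is a (file_path, file_contents) pair.
pairs = sc.wholeTextFiles("hdfs:///some/dir")

# If you want a line-based RDD similar to textFile(), split each file's contents.
lines = pairs.flatMap(lambda kv: kv[1].splitlines())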

Neil
1

You can use the approach below.

First, get a Buffer/List of S3 paths:

import scala.collection.JavaConverters._
import java.util.ArrayList
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.ObjectListing
import com.amazonaws.services.s3.model.S3ObjectSummary
import com.amazonaws.services.s3.model.ListObjectsRequest

def listFiles(s3_bucket:String, base_prefix : String) = {
    var files = new ArrayList[String]

    //S3 Client and List Object Request
    var s3Client = new AmazonS3Client();
    var objectListing: ObjectListing = null;
    var listObjectsRequest = new ListObjectsRequest();

    //Your S3 Bucket
    listObjectsRequest.setBucketName(s3_bucket)

    //Your Folder path or Prefix
    listObjectsRequest.setPrefix(base_prefix)

    //Adding s3:// to the paths and adding to a list
    do {
      objectListing = s3Client.listObjects(listObjectsRequest);
      for (objectSummary <- objectListing.getObjectSummaries().asScala) {
        files.add("s3://" + s3_bucket + "/" + objectSummary.getKey());
      }
      listObjectsRequest.setMarker(objectListing.getNextMarker());
    } while (objectListing.isTruncated());

    //Removing Base Directory Name
    files.remove(0)

    //Creating a Scala List for same
    files.asScala
  }

Now pass this list to the following piece of code. Note: sc is a SparkContext.

import org.apache.spark.rdd.RDD

var unified: RDD[String] = null
for (file <- files) {
  val fileRdd = sc.textFile(file)
  if (unified != null) {
    // Append each new file's RDD to the running union
    unified = unified.union(fileRdd)
  } else {
    unified = fileRdd
  }
}

Now you have a single unified RDD, i.e. unified.

Optionally, you can also repartition it into one partition to get a single big RDD:

val bigRdd = unified.repartition(1)

Repartitioning always works :D

Murtaza Kanchwala