74

Suppose I pass three file paths to a Spark context to read, and each file has a schema line (header) in its first row. How can we skip these header lines?

val rdd = sc.textFile("file1,file2,file3")

Now, how can we skip header lines from this rdd?

Hafiz Mujadid

14 Answers

103
data = sc.textFile('path_to_data')
header = data.first()  # extract header
data = data.filter(lambda row: row != header)  # filter out header
Jimmy
  • The question asks how to skip headers in a CSV file; if headers are present, they will be in the first row. – Jimmy Nov 05 '15 at 10:24
  • This is not always true. If you write out a CSV with Spark there could be multiple files, each with its own header. Using this as input to another Spark program will give you multiple headers. Also, you can feed multiple files into Spark at once. – Sal Dec 07 '16 at 23:45
  • intuitive approach – jack AKA karthik Oct 03 '17 at 15:51
  • In Scala this gives "error: recursive value data needs type". Change the last line to val dataFiltered = data.filter(row => row != header). – Amit Sadafule Jun 27 '18 at 20:23
  • Does this solution scan the entire RDD and check every row just to drop the header at the very top? Is this really the most efficient way? – iLikeKFC Dec 02 '20 at 17:56
74

If there were just one header line in the first record, then the most efficient way to filter it out would be:

rdd.mapPartitionsWithIndex {
  (idx, iter) => if (idx == 0) iter.drop(1) else iter 
}

This doesn't help, of course, if there are many files, each with its own header line inside. You could, however, make an RDD this way for each file and union the three; see the sketch after the Python version below.

You could also just write a filter that matches only a line that could be a header. This is quite simple, but less efficient.

Python equivalent:

from itertools import islice

rdd.mapPartitionsWithIndex(
    lambda idx, it: islice(it, 1, None) if idx == 0 else it 
)
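For completeness, here is a minimal sketch of the per-file-then-union idea mentioned above (Scala; the three paths are hypothetical placeholders, and sc is assumed to be an existing SparkContext):

val paths = Seq("file1", "file2", "file3")

// Strip the first line of the first partition of each file separately, then union the results
val withoutHeaders = paths.map { path =>
  sc.textFile(path).mapPartitionsWithIndex { (idx, iter) =>
    if (idx == 0) iter.drop(1) else iter
  }
}

val combined = withoutHeaders.reduce(_ union _)

The simpler filter-based variant (dropping any line that looks like a header) is shown in PySpark in one of the later answers.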
Sean Owen
  • The filter method would still be more efficient than the `zipWithIndex` approach proposed in the other answer. – maasg Jan 09 '15 at 10:06
  • No, there is not only a single line; there may be a header line in each file. – Hafiz Mujadid Jan 09 '15 at 10:44
  • Yes, I mean that you could make an RDD for each file and strip its single header this way, then union them. – Sean Owen Jan 09 '15 at 11:16
  • missing and drop(n) method here – Julio Apr 01 '15 at 19:41
  • `rdd.mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(1) else iter }` How can you say that if the index value is 0 then it will be a header? It won't help; it could be a header, some other CSV value, or a header followed by values. – Shubham Agrawal Jun 23 '18 at 12:52
  • I agree with @ShubhamAgrawal. You're not sure that index 0 refers to the first line; the data is already partitioned and you have no idea whether the header is the first line of the partition or not. – mahmoud mehdi Jan 14 '19 at 16:28
  • The first partition will contain the first part of the file; that much definitely happens. It wouldn't necessarily if it had been repartitioned with a shuffle. This is definitely a fragile approach and depends heavily on what files have what header. In the 4 years since that answer, Spark has much much better ways of reading CSV that will handle headers, infer schema, etc. – Sean Owen Jan 14 '19 at 18:35
63

In Spark 2.0 a CSV reader is built into Spark, so you can easily load a CSV file as follows:

spark.read.option("header","true").csv("filePath")
Sandeep Purohit
  • Are you sure this works from 2.0 onwards? I'm using v2.0.1 and getting "AttributeError: 'SparkContext' object has no attribute 'read'". – ciri Oct 23 '16 at 17:52
  • @ciri spark is not a SparkContext object, it's a SparkSession object, so if you want to use the CSV reader you need a SparkSession object. – Sandeep Purohit Oct 23 '16 at 18:36
14

From Spark 2.0 onwards, you can use SparkSession to get this done as a one-liner:

val spark = SparkSession.builder.config(conf).getOrCreate()

and then as @SandeepPurohit said:

val dataFrame = spark.read.format("CSV").option("header","true").load(csvfilePath)

I hope this solves your question!

P.S.: SparkSession is the new entry point introduced in Spark 2.0 and lives in the spark-sql module (the org.apache.spark.sql package).
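Putting the two lines together, a minimal sketch (the app name and path are placeholders; the master/config is omitted and would come from spark-submit or an explicit .master(...) call):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .appName("skip-csv-header-example")  // placeholder app name
  .getOrCreate()

val csvfilePath = "path/to/your.csv"   // placeholder path

val dataFrame = spark.read
  .option("header", "true")            // treat the first line of each file as a header
  .csv(csvfilePath)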

Shivansh
7

In PySpark, you can use a DataFrame and set header to True:

df = spark.read.csv(dataPath, header=True)
hayj
7

Working in 2018 (Spark 2.3)

Python

df = (spark.read
    .option("header", "true")
    .format("csv")
    .schema(myManualSchema)
    .load("mycsv.csv"))

Scala

val myDf = spark.read
  .option("header", "true")
  .format("csv")
  .schema(myManualSchema)
  .load("mycsv.csv")

PD1: myManualSchema is a predefined schema I wrote; you can skip that part of the code.

UPDATE 2021: The same code works for Spark 3.x:

df = (spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("mycsv.csv"))
Antonio Cachuan
6

You could load each file separately, filter each with file.zipWithIndex().filter(_._2 > 0), and then union all the file RDDs; a sketch follows below.

If the number of files is too large, the union could throw a StackOverflowError.
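A minimal sketch of that approach (assuming three hypothetical CSV paths and an existing SparkContext sc):

// Hypothetical input paths
val paths = Seq("file1.csv", "file2.csv", "file3.csv")

// Drop line 0 of each file via its per-file index, then union everything
val withoutHeaders = paths.map { path =>
  sc.textFile(path)
    .zipWithIndex()                       // (line, index) pairs, indexed per file
    .filter { case (_, idx) => idx > 0 }  // keep everything except the first line
    .map { case (line, _) => line }
}

val allData = withoutHeaders.reduce(_ union _)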

pzecevic
4

Use the filter() transformation in PySpark, filtering out lines that start with the first column name, to remove the header:

# Read file (change format for other file formats)
contentRDD = sc.textFile(<filepath>)

# Filter out the header line (the line starting with the first column name)
filterDD = contentRDD.filter(lambda l: not l.startswith(<first column name>))

# Check your result
for i in filterDD.take(5): print(i)
Peter Mortensen
  • How is this any different from [this given answer](http://stackoverflow.com/a/31202898/2308683)? Your answer would require you to know the first column name ahead of time. – OneCricketeer Aug 18 '16 at 23:18
  • @cricket_007 Because this will filter out multiple headers, as pointed out by other users. – Abdul Mannan Jun 30 '17 at 17:10
1

Alternatively, you can use the spark-csv package (or in Spark 2.0 this is more or less available natively as CSV). Note that this expects the header on each file (as you desire):

from pyspark.sql.types import StructType, StructField, DoubleType

schema = StructType([
        StructField('lat', DoubleType(), True),
        StructField('lng', DoubleType(), True)])

df = sqlContext.read.format('com.databricks.spark.csv'). \
     options(header='true',
             delimiter="\t",
             treatEmptyValuesAsNulls=True,
             mode="DROPMALFORMED").load(input_file, schema=schema)
Adrian Bridgett
0

It's an option that you set on the reader returned by read:

val context = new org.apache.spark.sql.SQLContext(sc)

val data = context.read.option("header", "true").csv("<path>")
Sahan Jayasumana
0

You can simply filter out the header row by using the filter() transformation in PySpark:

rdd = sc.textFile('StudentData.csv')
headers=rdd.first()
rdd=rdd.filter(lambda x: x!=headers) 
rdd.collect()
S.B
0

Steps to filter the header from an RDD in Spark:

# daily_show is assumed to be an RDD of split rows, so line[0] is the first field
def filter_header(line):
    # keep every row whose first field is not the header's first column name
    return line[0] != 'header_column_first_column_name'

filtered_daily_show = daily_show.filter(lambda line: filter_header(line))
filtered_daily_show.take(5)
  1. Load the data into an RDD
  2. Create another RDD from the first by filtering out the header (as RDDs in Spark are immutable)
  3. Execute the transformation by invoking an action
dataninsight
-3
import java.io.{BufferedReader, InputStreamReader}

// Find the header of each file lying in the directory
val fileNameHeader = sc.binaryFiles("E:\\sss\\*.txt", 1).map {
    case (fileName, stream) =>
        val header = new BufferedReader(new InputStreamReader(stream.open())).readLine()
        (fileName, header)
}.collect().toMap

val fileNameHeaderBr = sc.broadcast(fileNameHeader)

// Now let's skip the header. mapPartitions will ensure that the header
// can only be the first line of the partition
sc.textFile("E:\\sss\\*.txt",1).mapPartitions(iter =>
    if(iter.hasNext){
        val firstLine = iter.next()
        println(s"Comparing with firstLine $firstLine")
        if(firstLine == fileNameHeaderBr.value.head._2)
            new WrappedIterator(null, iter)
        else
            new WrappedIterator(firstLine, iter)
    }
    else {
        iter
    }
).collect().foreach(println)

class WrappedIterator(firstLine:String,iter:Iterator[String]) extends Iterator[String]{
    var isFirstIteration = true
    override def hasNext: Boolean = {
        if (isFirstIteration && firstLine != null){
            true
        }
        else{
            iter.hasNext
        }
    }

    override def next(): String = {
        if (isFirstIteration){
            println(s"For the first time $firstLine")
            isFirstIteration = false
            if (firstLine != null){
                firstLine
            }
            else{
                println(s"Every time $firstLine")
                iter.next()
            }
        }
        else {
          iter.next()
        }
    }
}
RockSolid
-3

For Python developers: I have tested this with Spark 2.0. Let's say you want to remove the first 14 rows:

sc = spark.sparkContext
lines = sc.textFile("s3://folder_location_of_csv/")
parts = lines.map(lambda l: l.split(","))
# zipWithIndex is 0-based, so keep indices 14 and above to drop the first 14 rows
parts = parts.zipWithIndex().filter(lambda tup: tup[1] >= 14).map(lambda x: x[0])

withColumn is a DataFrame function, so the following will not work in the RDD style used above:

parts.withColumn("index",monotonically_increasing_id()).filter(index > 14)
kartik