126

I'm new to Spark and I'm trying to read CSV data from a file with Spark. Here's what I am doing:

sc.textFile('file.csv')
    .map(lambda line: (line.split(',')[0], line.split(',')[1]))
    .collect()

I would expect this call to give me a list of the two first columns of my file but I'm getting this error :

File "", line 1, in IndexError: list index out of range

although my CSV file has more than one column.

jdhao
Kernael

13 Answers

209

Spark 2.0.0+

You can use the built-in csv data source directly:

spark.read.csv(
    "some_input_file.csv", 
    header=True, 
    mode="DROPMALFORMED", 
    schema=schema
)

or

(
    spark.read
    .schema(schema)
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .csv("some_input_file.csv")
)

without including any external dependencies.
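The schema referenced in both snippets is a regular StructType, built the same way as in the pre-2.0 example further down; a minimal sketch, assuming two string columns with illustrative names:

from pyspark.sql.types import StructType, StructField, StringType

# illustrative two-column schema; adjust names and types to your file
schema = StructType([
    StructField("a", StringType()),
    StructField("b", StringType())
])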

Spark < 2.0.0:

Instead of manual parsing, which is far from trivial in a general case, I would recommend spark-csv:

Make sure that Spark CSV is included in the path (--packages, --jars, --driver-class-path)
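For example, when launching the PySpark shell (the package coordinate below is taken from the comments under this answer; match the Scala and spark-csv versions to your installation):

pyspark --packages com.databricks:spark-csv_2.11:1.4.0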

And load your data as follows:

df = (
    sqlContext
    .read.format("com.databricks.spark.csv")
    .option("header", "true")
    .option("inferschema", "true")
    .option("mode", "DROPMALFORMED")
    .load("some_input_file.csv")
)

It can handle loading, schema inference, dropping malformed lines and doesn't require passing data from Python to the JVM.

Note:

If you know the schema, it is better to avoid schema inference and pass it to DataFrameReader. Assuming you have three columns - integer, double and string:

from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType

schema = StructType([
    StructField("A", IntegerType()),
    StructField("B", DoubleType()),
    StructField("C", StringType())
])

(
    sqlContext
    .read
    .format("com.databricks.spark.csv")
    .schema(schema)
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .load("some_input_file.csv")
)
Jaroslav Bezděk
zero323
  • 6
    If you do this, don't forget to include the databricks csv package when you open the pyspark shell or use spark-submit. For example, `pyspark --packages com.databricks:spark-csv_2.11:1.4.0` (make sure to change the databricks/spark versions to the ones you have installed). – Galen Long Apr 22 '16 at 19:54
  • Is it csvContext or sqlContext in pyspark? Because in scala you need csvContext – Geoffrey Anderson May 24 '18 at 14:40
  • @zero323 I spent four plus hours trying to get spark to read csv coumns in numeric type but they would all be null. Until I tried your suggestion - .option("inferschema", "true"). Thank you! Not sure why spark is not able to reach explicit schema, even when it looks correct. – user2441441 Nov 06 '20 at 16:28
  • @GalenLong I cant find a `spar-csv` package which supports `scala - 2.12` we are upgrading our code with `Scala -2.12.12 Spark - 3.0.1` and we are facing issue with `_corrupt_record` not being there when the time of trying to get count, but actually it's there I can see that column in the DataFrame, – Rookie007 Nov 12 '21 at 04:33
70

Are you sure that all the lines have at least 2 columns? Can you try something like this, just to check?

sc.textFile("file.csv") \
    .map(lambda line: line.split(",")) \
    .filter(lambda line: len(line)>1) \
    .map(lambda line: (line[0],line[1])) \
    .collect()

Alternatively, you could print the culprit (if any):

sc.textFile("file.csv") \
    .map(lambda line: line.split(",")) \
    .filter(lambda line: len(line)<=1) \
    .collect()
030
G Quintana
40
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

df = spark.read.csv("/home/stp/test1.csv",header=True,sep="|")

print(df.collect())
Simon
y durga prasad
22

And yet another option, which consists of reading the CSV file using Pandas and then importing the Pandas DataFrame into Spark.

For example:

from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd

sc = SparkContext('local','example')  # if using locally
sql_sc = SQLContext(sc)

pandas_df = pd.read_csv('file.csv')  # assuming the file contains a header
# pandas_df = pd.read_csv('file.csv', names = ['column 1','column 2']) # if no header
s_df = sql_sc.createDataFrame(pandas_df)
JP Mercier
    Why would OP would like to do on spark if he is able to load data in pandas – WoodChopper Nov 14 '15 at 14:57
  • Not wanting to install or specify dependencies on every spark cluster.... – SummerEla Jun 15 '16 at 00:59
  • Panda allows file chunking when reading so there is still a use-case here for having Pandas handle initial file parsing. See my answer below for code. – abby sobh Oct 06 '16 at 18:07
  • Caution: Pandas also handles column schema way differently than spark especially when there are blanks involved. Safer to just load csv in as strings for each column. – AntiPawn79 Aug 02 '17 at 22:17
  • @WoodChopper You can use Pandas as a UDF in Spark, no? – flow2k Jun 22 '19 at 18:50
19

Simply splitting by comma will also split commas that are within fields (e.g. a,b,"1,2,3",c), so it's not recommended. zero323's answer is good if you want to use the DataFrames API, but if you want to stick to base Spark, you can parse csvs in base Python with the csv module:

# works for both python 2 and 3
import csv
rdd = sc.textFile("file.csv")
rdd = rdd.mapPartitions(lambda x: csv.reader(x))

EDIT: As @muon mentioned in the comments, this will treat the header like any other row so you'll need to extract it manually. For example, header = rdd.first(); rdd = rdd.filter(lambda x: x != header) (make sure not to modify header before the filter evaluates). But at this point, you're probably better off using a built-in csv parser.
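For reference, a minimal sketch of the header-removal approach just described (the file name is illustrative):

import csv

rdd = sc.textFile("file.csv").mapPartitions(lambda part: csv.reader(part))
header = rdd.first()                          # the first parsed row is the header
rdd = rdd.filter(lambda row: row != header)   # keep only the data rows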

Galen Long
  • 1
    You don't need Hive to use DataFrames. Regarding your solution: a) There is no need for `StringIO`. `csv` can use any iterable b) `__next__` shouldn't be used directly and will fail on empty line. Take a look at flatMap c) It would be much more efficient to use `mapPartitions` instead of initializing reader on each line :) – zero323 Apr 22 '16 at 05:21
  • Thanks so much for the corrections! Before I edit my answer, I want to make sure I understand fully. 1) Why does `rdd.mapPartitions(lambda x: csv.reader(x))` work while `rdd.map(lambda x: csv.reader(x))` throws an error? I expected both to throw the same `TypeError: can't pickle _csv.reader objects`. It also seems like `mapPartitions` automatically calls some equivalent to "readlines" on the `csv.reader` object, where with `map`, I needed to call `__next__` explicitly to get the lists out of the `csv.reader`. 2) Where does `flatMap` come in? Just calling `mapPartitions` alone worked for me. – Galen Long Apr 22 '16 at 19:11
  • 1
    `rdd.mapPartitions(lambda x: csv.reader(x))` works because `mapPartitions` expects an `Iterable` object. If you want to be explicit you could you comprehension or generator expression. `map` alone doesn't work because it doesn't iterate over object. Hence my suggestion to use `flatMap(lambda x: csv.reader([x]))` which will iterate over the reader. But `mapPartitions` is much better here. – zero323 Apr 23 '16 at 04:30
  • 1
    note that this will read header as a row of data, not as header – muon Feb 13 '17 at 19:30
10

This is in PySpark:

path="Your file path with file name"

df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load(path)

Then you can check

df.show(5)
df.count()
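If you only want the two first columns that the question asked about, a quick follow-up sketch (selecting by position, since the column names depend on your header):

first_two = df.select(df.columns[0], df.columns[1])
first_two.show(5)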
Reins
amarnath pimple
7

If you want to load csv as a dataframe then you can do the following:

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

df = sqlContext.read.format('com.databricks.spark.csv') \
    .options(header='true', inferschema='true') \
    .load('sampleFile.csv') # this is your csv file

It worked fine for me.

Jeril
6

Now, there's also another option for any general csv file: https://github.com/seahboonsiew/pyspark-csv. Use it as follows:

Assume we have the following context

sc = SparkContext
sqlCtx = SQLContext or HiveContext

First, distribute pyspark-csv.py to executors using SparkContext

import pyspark_csv as pycsv
sc.addPyFile('pyspark_csv.py')

Read csv data via SparkContext and convert it to DataFrame

plaintext_rdd = sc.textFile('hdfs://x.x.x.x/blah.csv')
dataframe = pycsv.csvToDataFrame(sqlCtx, plaintext_rdd)
optimist
6

This is in-line with what JP Mercier initially suggested about using Pandas, but with a major modification: if you read data into Pandas in chunks, it should be more malleable. Meaning that you can parse a much larger file than Pandas could actually handle as a single piece, and pass it to Spark in smaller sizes. (This also answers the comment about why one would want to use Spark if they can load everything into Pandas anyway.)

from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd

sc = SparkContext('local','example')  # if using locally
sql_sc = SQLContext(sc)

Spark_Full = sc.emptyRDD()
chunk_100k = pd.read_csv("Your_Data_File.csv", chunksize=100000)
# if you have headers in your csv file:
headers = list(pd.read_csv("Your_Data_File.csv", nrows=0).columns)

for chunky in chunk_100k:
    Spark_Full +=  sc.parallelize(chunky.values.tolist())

YourSparkDataFrame = Spark_Full.toDF(headers)
# if you do not have headers, leave empty instead:
# YourSparkDataFrame = Spark_Full.toDF()
YourSparkDataFrame.show()
abby sobh
4

If your csv data happens to not contain newlines in any of the fields, you can load your data with textFile() and parse it:

import csv
import io

def loadRecord(line):
    # parse a single CSV line into a dict keyed by the given field names
    input = io.StringIO(line)
    reader = csv.DictReader(input, fieldnames=["name1", "name2"])
    return next(reader)

input = sc.textFile(inputFile).map(loadRecord)
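A quick usage sketch tying this back to the original question, using the illustrative field names from loadRecord:

pairs = input.map(lambda rec: (rec["name1"], rec["name2"])).collect()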
iec2011007
4

This error may arise if one or more rows in the dataset have fewer (or more) than two columns.

I am also new to PySpark and was trying to read a CSV file. The following code worked for me.

In this code I am using a dataset from Kaggle: https://www.kaggle.com/carrie1/ecommerce-data

1. Without mentioning the schema:

from pyspark.sql import SparkSession  
scSpark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example: Reading CSV file without mentioning schema") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

sdfData = scSpark.read.csv("data.csv", header=True, sep=",")
sdfData.show()

Now check the columns: sdfData.columns

Output will be:

['InvoiceNo', 'StockCode','Description','Quantity', 'InvoiceDate', 'CustomerID', 'Country']

Check the datatype for each column:

sdfData.schema
StructType(List(StructField(InvoiceNo,StringType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,StringType,true),StructField(InvoiceDate,StringType,true),StructField(UnitPrice,StringType,true),StructField(CustomerID,StringType,true),StructField(Country,StringType,true)))

This will give the data frame with all the columns with datatype as StringType

2. With schema: If you know the schema, or want to change the data type of any column in the above table, use this (let's say I have the following columns and want each of them in a particular data type):

from pyspark.sql import SparkSession  
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType
schema = StructType([
    StructField("InvoiceNo", IntegerType()),
    StructField("StockCode", StringType()),
    StructField("Description", StringType()),
    StructField("Quantity", IntegerType()),
    StructField("InvoiceDate", StringType()),
    StructField("CustomerID", DoubleType()),
    StructField("Country", StringType())
])

scSpark = SparkSession \
    .builder \
    .appName("Python Spark SQL example: Reading CSV file with schema") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

sdfData = scSpark.read.csv("data.csv", header=True, sep=",", schema=schema)

Now check the schema for datatype of each column:

sdfData.schema

StructType(List(StructField(InvoiceNo,IntegerType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,IntegerType,true),StructField(InvoiceDate,StringType,true),StructField(CustomerID,DoubleType,true),StructField(Country,StringType,true)))

Edit: We can also use the following line of code without mentioning the schema explicitly:

sdfData = scSpark.read.csv("data.csv", header=True, inferSchema = True)
sdfData.schema

The output is:

StructType(List(StructField(InvoiceNo,StringType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,IntegerType,true),StructField(InvoiceDate,StringType,true),StructField(UnitPrice,DoubleType,true),StructField(CustomerID,IntegerType,true),StructField(Country,StringType,true)))

The output will look like this:

sdfData.show()

+---------+---------+--------------------+--------+--------------+----------+-------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|CustomerID|Country|
+---------+---------+--------------------+--------+--------------+----------+-------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|      2.55|  17850|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|      3.39|  17850|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|      2.75|  17850|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|      3.39|  17850|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26|      3.39|  17850|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/2010 8:26|      7.65|  17850|
|   536365|    21730|GLASS STAR FROSTE...|       6|12/1/2010 8:26|      4.25|  17850|
|   536366|    22633|HAND WARMER UNION...|       6|12/1/2010 8:28|      1.85|  17850|
|   536366|    22632|HAND WARMER RED P...|       6|12/1/2010 8:28|      1.85|  17850|
|   536367|    84879|ASSORTED COLOUR B...|      32|12/1/2010 8:34|      1.69|  13047|
|   536367|    22745|POPPY'S PLAYHOUSE...|       6|12/1/2010 8:34|       2.1|  13047|
|   536367|    22748|POPPY'S PLAYHOUSE...|       6|12/1/2010 8:34|       2.1|  13047|
|   536367|    22749|FELTCRAFT PRINCES...|       8|12/1/2010 8:34|      3.75|  13047|
|   536367|    22310|IVORY KNITTED MUG...|       6|12/1/2010 8:34|      1.65|  13047|
|   536367|    84969|BOX OF 6 ASSORTED...|       6|12/1/2010 8:34|      4.25|  13047|
|   536367|    22623|BOX OF VINTAGE JI...|       3|12/1/2010 8:34|      4.95|  13047|
|   536367|    22622|BOX OF VINTAGE AL...|       2|12/1/2010 8:34|      9.95|  13047|
|   536367|    21754|HOME BUILDING BLO...|       3|12/1/2010 8:34|      5.95|  13047|
|   536367|    21755|LOVE BUILDING BLO...|       3|12/1/2010 8:34|      5.95|  13047|
|   536367|    21777|RECIPE BOX WITH M...|       4|12/1/2010 8:34|      7.95|  13047|
+---------+---------+--------------------+--------+--------------+----------+-------+
only showing top 20 rows
Yogesh Awdhut Gadade
4

When using spark.read.csv, I find that the options escape='"' and multiLine=True provide the most consistent handling of the CSV standard, and in my experience they work best with CSV files exported from Google Sheets.

That is,

#set inferSchema=False to read everything as string
df = spark.read.csv("myData.csv", escape='"', multiLine=True,
     inferSchema=False, header=True)
flow2k
  • where is the spark come from? is it `import pyspark as spark`? – Luk Aron Nov 20 '19 at 13:22
  • @LukAron In a pyspark shell, `spark` is already initialized. In a script submitted by `spark-submit`, you can instantiate it as `from pyspark.sql import SparkSession; spark = SparkSession.builder.getOrCreate()`. – flow2k Nov 21 '19 at 01:02
  • Yes, escape is very important. There is another related option quote='"' which makes things confusing but don't forget escape – demongolem Nov 18 '20 at 14:35
-1

Read your csv file in the following way:

df = spark.read.format("csv") \
    .option("multiline", True) \
    .option("quote", "\"") \
    .option("escape", "\"") \
    .option("header", True) \
    .load(df_path)

The Spark version is 3.0.1.

Ray