21

I'm using Python on Spark and would like to get a CSV file into a dataframe.

The documentation for Spark SQL strangely does not provide explanations for CSV as a source.

I have found Spark-CSV, however I have issues with two parts of the documentation:

  • "This package can be added to Spark using the --jars command line option. For example, to include it when starting the spark shell: $ bin/spark-shell --packages com.databricks:spark-csv_2.10:1.0.3" Do I really need to add this argument everytime I launch pyspark or spark-submit? It seems very inelegant. Isn't there a way to import it in python rather than redownloading it each time?

  • df = sqlContext.load(source="com.databricks.spark.csv", header="true", path = "cars.csv") Even if I do the above, this won't work. What does the "source" argument stand for in this line of code? And how do I simply load a local file on Linux, say "/Spark_Hadoop/spark-1.3.1-bin-cdh4/cars.csv"?

Alexis Eggermont

9 Answers

33

With more recent versions of Spark (as of, I believe, 1.4) this has become a lot easier. The expression sqlContext.read gives you a DataFrameReader instance, with a .csv() method:

df = sqlContext.read.csv("/path/to/your.csv")

Note that you can also indicate that the CSV file has a header by adding the keyword argument header=True to the .csv() call. A handful of other options are available, described in the DataFrameReader documentation.
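
For example, a minimal sketch assuming the file has a header row (the inferSchema option here is an extra assumption, not stated in the answer above):

df = sqlContext.read.csv("/path/to/your.csv", header=True, inferSchema=True)
df.printSchema()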

mattsilver
  • When I called this command I received the error, `AttributeError: 'property' object has no attribute 'csv'`. Actually, I missed the all important `sqlContext = SQLContext(sc)` - works fine now –  Oct 25 '19 at 15:11
22
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)  # needed so .toDF() works on RDDs; sc is an existing SparkContext

Employee_rdd = sc.textFile("../Employee.csv") \
               .map(lambda line: line.split(","))

Employee_df = Employee_rdd.toDF(['Employee_ID','Employee_name'])

Employee_df.show()
Aravind Krishnakumar
  • This answer has several upvotes but it's not exactly clear to me what's going on - you make SQLContext(sc) and call that sqlContext, then you don't do anything with it.. is that just extraneous code? When I try the same code with a simple csv file in a zeppelin notebook I get the error: ```Traceback (most recent call last): File "/tmp/zeppelin_pyspark-7664300769638364279.py", line 252, in eval(compiledCode) File "", line 1, in AttributeError: 'int' object has no attribute 'map'``` – tamale Oct 14 '16 at 22:35
  • Please share your code to get help. I have used this code in several instances and have not had issues – Aravind Krishnakumar Oct 16 '16 at 16:50
  • 4
    You did not show what `sc` IS. If it's a SparkContext() then you should show the assignment in your code example. – Jarad Apr 26 '18 at 20:12
16

For PySpark, assuming that the first row of the CSV file contains a header:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('chosenName').getOrCreate()
df = spark.read.csv('fileNameWithPath', mode="DROPMALFORMED", inferSchema=True, header=True)
Grant Shannon
15

1. Read the CSV file into an RDD and then generate a RowRDD from the original RDD.

2. Create the schema represented by a StructType matching the structure of rows in the RDD created in step 1.

3. Apply the schema to the RDD of rows via the createDataFrame method provided by SparkSession.

from pyspark.sql.types import StructType, StructField, StringType

# 'sc' is an existing SparkContext and 'spark' an existing SparkSession
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
# Each line is converted to a tuple.
people = parts.map(lambda p: (p[0], p[1].strip()))

# The schema is encoded in a string.
schemaString = "name age"

fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD.
schemaPeople = spark.createDataFrame(people, schema)

Source: Spark Programming Guide

None
  • 8
    this answer is old, new versions of spark have easier ways to achieve this. Refer to answers https://stackoverflow.com/a/41638342/187355 and https://stackoverflow.com/a/46539901/187355 – Luca Gibelli Jan 12 '18 at 16:56
11

If you do not mind the extra package dependency, you could use Pandas to parse the CSV file. It handles internal commas just fine.

Dependencies:

from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd

Read the whole file at once into a Spark DataFrame:

sc = SparkContext('local','example')  # if using locally
sql_sc = SQLContext(sc)

pandas_df = pd.read_csv('file.csv')  # assuming the file contains a header
# If no header:
# pandas_df = pd.read_csv('file.csv', names = ['column 1','column 2']) 
s_df = sql_sc.createDataFrame(pandas_df)

Or, to be more memory-conscious, you can chunk the data into a Spark RDD and then into a DataFrame:

chunk_100k = pd.read_csv('file.csv', chunksize=100000)

for chunky in chunk_100k:
    Spark_temp_rdd = sc.parallelize(chunky.values.tolist())
    try:
        # '+' on RDDs is a union, so this appends the new chunk
        Spark_full_rdd += Spark_temp_rdd
    except NameError:
        # first chunk: Spark_full_rdd does not exist yet
        Spark_full_rdd = Spark_temp_rdd
    del Spark_temp_rdd

Spark_DF = Spark_full_rdd.toDF(['column 1','column 2'])
abby sobh
  • createDataFrame often gives an error like this: IllegalArgumentException: "Error while instantiating 'org.apache.spark.sql.hive.HiveSessionState':" ... any experience hitting this? – mathtick Mar 16 '17 at 08:39
7

Since Spark 2.0, it is recommended to use a SparkSession:

from pyspark.sql import SparkSession
from pyspark.sql import Row

# Create a SparkSession
spark = SparkSession \
    .builder \
    .appName("basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

def mapper(line):
    fields = line.split(',')
    return Row(ID=int(fields[0]), field1=str(fields[1].encode("utf-8")), field2=int(fields[2]), field3=int(fields[3]))

lines = spark.sparkContext.textFile("file.csv")
df = lines.map(mapper)  # still an RDD of Row objects at this point, not a DataFrame

# Infer the schema, and register the DataFrame as a table.
schemaDf = spark.createDataFrame(df).cache()
schemaDf.createOrReplaceTempView("tablename")
Florent
0

I ran into a similar problem. The solution is to add an environment variable named "PYSPARK_SUBMIT_ARGS" and set its value to "--packages com.databricks:spark-csv_2.10:1.4.0 pyspark-shell". This works with Spark's Python interactive shell.

Make sure you match the version of spark-csv with the version of Scala installed. With Scala 2.11, it is spark-csv_2.11 and with Scala 2.10 or 2.10.5 it is spark-csv_2.10.
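
For example, a minimal sketch of setting it from Python before any SparkContext is created (the spark-csv_2.10:1.4.0 coordinate is taken from above; adjust it to your Scala version):

import os

# Must be set before the SparkContext / JVM gateway is started;
# alternatively, export the variable in your shell before launching pyspark.
os.environ["PYSPARK_SUBMIT_ARGS"] = "--packages com.databricks:spark-csv_2.10:1.4.0 pyspark-shell"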

Hope it works.

mahima
0

Based on the answer by Aravind, but much shorter, e.g.:

lines = sc.textFile("/path/to/file").map(lambda x: x.split(","))
df = lines.toDF(["year", "month", "day", "count"])
JARS
0

With the current implementation (Spark 2.x) you don't need to add the packages argument; you can use the built-in CSV reader.

Additionally, unlike the accepted answer, you don't need to create an RDD and then enforce a schema, which has one potential problem:

When you read the CSV that way, all the fields are marked as strings, so when you enforce a schema with an integer column you will get an exception.

A better way to do the above would be:

spark.read.format("csv").schema(schema).option("header", "true").load(input_path).show()
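
For completeness, a minimal self-contained sketch; the column names, types, and file path below are hypothetical placeholders:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.appName("csvExample").getOrCreate()

# Hypothetical schema; adjust column names and types to match your file.
schema = StructType([
    StructField("name", StringType(), True),
    StructField("price", IntegerType(), True),
])

input_path = "cars.csv"  # hypothetical path
spark.read.format("csv").schema(schema).option("header", "true").load(input_path).show()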
iec2011007