
I have a CSV file that I want to read into an RDD or DataFrame. This works so far, but when I collect the data and convert it into a pandas DataFrame for plotting, the table is "malformed".

Here is how I read the CSV file:

import csv
import os

NUMERIC_DATA_FILE = os.path.join(DATA_DIR, "train_numeric.csv")
numeric_rdd = sc.textFile(NUMERIC_DATA_FILE)
numeric_rdd = numeric_rdd.mapPartitions(lambda x: csv.reader(x, delimiter=","))
numeric_df = sqlContext.createDataFrame(numeric_rdd)
numeric_df.registerTempTable("numeric")

The result looks like this:

[screenshot of the resulting table: the columns have auto-generated names like _1, _2, ... and the CSV header row appears as an ordinary data row]

Is there an easy way to use the first row of the CSV data as the column names and the first column as the index?


The problem goes further when I try to select data from the DataFrame:

numeric_df.select("SELECT Id FROM numeric")

which gives me:

AnalysisException: u"cannot resolve 'SELECT Id FROM numeric' given input columns _799, _640, _963, _70, _364, _143, _167, 
_156, _553, _835, _780, _235, ...
  • Try to read your CSV file [properly](http://stackoverflow.com/a/34528938/5741205) (see the sketch after these comments). Alternatively you can read your CSV file directly into a pandas DataFrame using `pd.read_csv(...)` if it fits in your RAM – MaxU - stand with Ukraine Oct 12 '16 at 15:56
  • This looks like data from the **Bosch Production Line Performance** (https://www.kaggle.com/c/bosch-production-line-performance/data) kaggle competition. You need lots of RAM for the data to fit in... – Shivam Gaur Oct 12 '16 at 16:57
  • @ShivamGaur 1.8TB should do it :P – Stefan Falk Oct 12 '16 at 17:33
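
For reference, a minimal sketch of the "read it properly" approach from the linked answer, assuming the external com.databricks:spark-csv package (the CSV reader for Spark 1.x) is on the classpath and reusing the NUMERIC_DATA_FILE path from the question; the option names come from that package's documentation:

# Sketch only: assumes the com.databricks:spark-csv package is available
numeric_df = (sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "true")       # use the first CSV row as column names
    .option("inferSchema", "true")  # infer numeric types instead of strings
    .load(NUMERIC_DATA_FILE))

With the reader handling the header and schema, the manual steps in the answer below are only needed when the package is not available.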

1 Answer


Setting a schema for your PySpark DataFrame

Your PySpark DataFrame has no schema assigned to it, which is why Spark generates column names such as _799 for you. Replace your code with the snippet below:

import csv
import os

from pyspark.sql.types import *

numeric_lines = sc.textFile(os.path.join(DATA_DIR, "train_numeric.csv"))

# Extract the header line and split it into column names
header = numeric_lines.first()
column_names = header.split(",")

# Assuming all the columns are numeric, create a nullable FloatType StructField per column
fields = [StructField(field_name, FloatType(), True) for field_name in column_names]

Now we can construct our schema:

schema = StructType(fields)

# We have to remove the header from the text-file RDD

# Extract the header line from the RDD
dataHeader = numeric_lines.filter(lambda x: x == header)

# Extract the data without the header; we can make use of the `subtract` function
dataNoHeader = numeric_lines.subtract(dataHeader)

# Parse each line as CSV and cast the string values to floats
# (empty fields become None) so the rows match the FloatType schema
numeric_temp_rdd = (dataNoHeader
    .mapPartitions(lambda x: csv.reader(x, delimiter=","))
    .map(lambda row: [float(v) if v != "" else None for v in row]))

The schema is passed as a parameter to the createDataFrame() function:

numeric_df = sqlContext.createDataFrame(numeric_temp_rdd, schema)
numeric_df.registerTempTable("numeric")
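
As a quick sanity check, printSchema() should now list the column names taken from the CSV header instead of the auto-generated ones:

numeric_df.printSchema()
# root
#  |-- Id: float (nullable = true)
#  ... (one float column per CSV header field)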

Now, if you wish to convert this DataFrame to a pandas DataFrame, use the toPandas() function:

pandas_df = numeric_df.limit(5).toPandas()
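
To address the "first column as index" part of the question: the index is a pandas concept, so set it after the conversion. A minimal sketch, assuming the header contains an Id column as the error message in the question suggests:

# Use the Id column as the pandas index
pandas_df = numeric_df.limit(5).toPandas().set_index("Id")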

The following statement will work as well:

numeric_df.select("Id")
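
Note that select() is lazy and returns another DataFrame; to actually display rows, chain an action such as show():

numeric_df.select("Id").show(5)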

If you want to use pure SQL, run the query through the SQLContext, specifically the same sqlContext that registered the temp table above:

# Reuse the sqlContext from above (the one that called registerTempTable)
sqlContext.sql("SELECT Id FROM numeric")
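
sqlContext.sql() also returns a DataFrame, so the same actions apply, for example:

sqlContext.sql("SELECT COUNT(*) AS row_count FROM numeric").show()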
  • Hm, I can't make it run. Still getting the error for the `select()` call and for `numeric_df.take(5)` it's throwing a `TypeError` somewhere "*unhashable type: 'list'*" - I don't know where this is happening. – Stefan Falk Oct 13 '16 at 09:08
  • `numeric_df.select("Id")` should work. Corrected the syntax – Shivam Gaur Oct 13 '16 at 12:07
  • ^ This style is similar to SQLAlchemy. If you wish to use pure SQL statements, then you need to use the SQL context – Shivam Gaur Oct 13 '16 at 12:08