
I am reading a CSV file through Spark using the following:

rdd=sc.textFile("emails.csv").map(lambda line: line.split(","))

I need to create a Spark DataFrame.

I converted this RDD to a Spark DataFrame using the following:

dataframe=rdd.toDF()

But I need to specify the schema of the DataFrame while converting the RDD. I have just 2 columns, file and message, so I tried this:

from pyspark import Row

email_schema=Row('file','message')

email_rdd=rdd.map(lambda r: email_schema(*r))

dataframe=sqlContext.createDataFrame(email_rdd)

However, I am getting the error: java.lang.IllegalStateException: Input row doesn't have expected number of values required by the schema. 2 fields are required while 1 values are provided.

I also tried reading my csv file using this:

rdd=sc.textFile("emails.csv").map(lambda line: line.split(",")).map(lambda line: line(line[0],line[1]))

I get the error: TypeError: 'list' object is not callable

I tried using pandas to read my CSV file into a pandas DataFrame and then converting it to a Spark DataFrame, but my file is too large for this.

I also started pyspark with the spark-csv package:

bin/pyspark --packages com.databricks:spark-csv_2.10:1.0.3

And read my file using the following:

df=sqlContext.read.format('com.databricks.spark.csv').options(header='true').load('emails.csv')

I am getting the error: java.io.IOException: (startline 1) EOF reached before encapsulated token finished

I have gone through several other related threads and tried the approaches above. Could anyone please explain where I am going wrong?

[Using Python 2.7, Spark 1.6.2 on MacOSX]

Edited:

The first 3 rows are shown below. I need to extract just the contents of the email. How do I go about it?

1 allen-p/_sent_mail/1. "Message-ID: <18782981.1075855378110.JavaMail.evans@thyme> Date: Mon, 14 May 2001 16:39:00 -0700 (PDT) From: phillip.allen@enron.com To: tim.belden@enron.com Subject: Mime-Version: 1.0 Content-Type: text/plain; charset=us-ascii Content-Transfer-Encoding: 7bit X-From: Phillip K Allen X-To: Tim Belden X-cc: X-bcc: X-Folder: \Phillip_Allen_Jan2002_1\Allen, Phillip K.\'Sent Mail X-Origin: Allen-P X-FileName: pallen (Non-Privileged).pst

Here is our forecast"

2 allen-p/_sent_mail/10. "Message-ID: <15464986.1075855378456.JavaMail.evans@thyme> Date: Fri, 4 May 2001 13:51:00 -0700 (PDT) From: phillip.allen@enron.com To: john.lavorato@enron.com Subject: Re: Mime-Version: 1.0 Content-Type: text/plain; charset=us-ascii Content-Transfer-Encoding: 7bit X-From: Phillip K Allen X-To: John J Lavorato X-cc: X-bcc: X-Folder: \Phillip_Allen_Jan2002_1\Allen, Phillip K.\'Sent Mail X-Origin: Allen-P X-FileName: pallen (Non-Privileged).pst

Traveling to have a business meeting takes the fun out of the trip. Especially if you have to prepare a presentation. I would suggest holding the business plan meetings here then take a trip without any formal business meetings. I would even try and get some honest opinions on whether a trip is even desired or necessary.

As far as the business meetings, I think it would be more productive to try and stimulate discussions across the different groups about what is working and what is not. Too often the presenter speaks and the others are quiet just waiting for their turn. The meetings might be better if held in a round table discussion format.

My suggestion for where to go is Austin. Play golf and rent a ski boat and jet ski's. Flying somewhere takes too much time."

3 allen-p/_sent_mail/100. "Message-ID: <24216240.1075855687451.JavaMail.evans@thyme> Date: Wed, 18 Oct 2000 03:00:00 -0700 (PDT) From: phillip.allen@enron.com To: leah.arsdall@enron.com Subject: Re: test Mime-Version: 1.0 Content-Type: text/plain; charset=us-ascii Content-Transfer-Encoding: 7bit X-From: Phillip K Allen X-To: Leah Van Arsdall X-cc: X-bcc: X-Folder: \Phillip_Allen_Dec2000\Notes Folders\'sent mail X-Origin: Allen-P X-FileName: pallen.nsf

test successful. way to go!!!"

tg89

2 Answers

0

If the data will fit in memory, convert the DataFrame (not the RDD, which has no toPandas method) to pandas:

dataframe.toPandas().to_csv('emails.csv')

If not, use spark-csv for your version of Spark:

dataframe.write.format('com.databricks.spark.csv').save('emails.csv')

In your example above:

rdd=....map(lambda line: line.split(",")).map(lambda line: line(line[0],line[1]))

line is a list, so line(...) tries to call it. Don't you want a tuple instead:

rdd=....map(lambda line: line.split(",")).map(lambda line: (line[0], line[1]))
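Note that even the tuple version can still fail on this file, because sc.textFile hands you one physical line at a time while each message is a quoted field that spans several lines and contains commas. The sketch below uses a made-up one-record sample (not the real file) and plain Python 3, no Spark, to compare the field counts from line-by-line splitting with those from the standard csv module:

```python
import csv
import io

# Hypothetical sample mimicking the layout in the question:
# a header line plus one record whose quoted message spans several lines.
sample = (
    'file,message\n'
    '"allen-p/_sent_mail/1.","Message-ID: <1>\n'
    'Date: Mon, 14 May 2001\n'
    '\n'
    'Here is our forecast"\n'
)

# Line-by-line splitting, which is what
# sc.textFile(...).map(lambda line: line.split(",")) does.
naive_counts = [len(line.split(",")) for line in sample.splitlines()]

# CSV-aware parsing keeps the quoted, multi-line message in a single field.
records = list(csv.reader(io.StringIO(sample)))
csv_counts = [len(record) for record in records]

print(naive_counts)  # field count per physical line
print(csv_counts)    # field count per logical record
```

The naive counts vary from line to line, which would be consistent with the "2 fields are required while 1 values are provided" error, while the csv module returns two fields for every record.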
Alexander
  • How do I attach the com.databricks.spark.csv library to Spark? I used the format specified in my post. Do you know where I am going wrong? Also, I am trying to read the file. – tg89 Jul 28 '16 at 22:33
  • Thanks @Alexander. I fixed it. But now, while reading the file, I get java.lang.IllegalStateException: Input row doesn't have expected number of values required by the schema. 2 fields are required while 1 values are provided. I have edited my post to show my first 3 rows. The 1st column is file, which is just the 1, 2, 3 shown in bold, and the 2nd column is message, which contains the full message contents. Could you please tell me how I can correct this to keep the 2 columns as they are? – tg89 Jul 29 '16 at 00:52
0

If you have a huge file, why not read it with pandas in chunks rather than loading all of it at once? Something like:

import pandas as pd

# With chunksize set, read_csv returns an iterator over DataFrames of 10000 rows each
df_pd = pd.read_csv('myfilename.csv', chunksize=10000)

for i, chunk in enumerate(df_pd):
    if i == 0:
        df_spark = sqlContext.createDataFrame(chunk)
    else:
        # Append each later chunk to the DataFrame built so far
        df_spark = df_spark.unionAll(sqlContext.createDataFrame(chunk))

df_spark would be your required Spark DataFrame. This is inefficient, but it would work. For other ways of implementing the same thing, refer to the answers to this question.
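If pandas itself is the bottleneck, the same chunking idea can be sketched with only the standard library. This is a rough illustration, not a tested recipe for the real file: iter_batches is a hypothetical helper name, the in-memory sample stands in for emails.csv, and the code assumes Python 3.

```python
import csv
import io
from itertools import islice


def iter_batches(records, batch_size):
    """Yield lists of at most batch_size records from any iterable."""
    iterator = iter(records)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


# Tiny in-memory stand-in for the real file (made-up data).
sample = io.StringIO(
    'file,message\n'
    '"a/1.","first\nbody"\n'
    '"a/2.","second"\n'
    '"a/3.","third"\n'
)
reader = csv.reader(sample)
header = next(reader)                    # column names from the first record
batches = list(iter_batches(reader, 2))  # batches of up to 2 parsed records

# With Spark available, each batch could be passed to
# sqlContext.createDataFrame together with the header as column names,
# and the results combined with unionAll as in the loop above.
```

For a real file in Python 3, open it with newline='' before handing it to csv.reader so that line breaks inside quoted messages are preserved.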

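Another possible method is to let Spark infer the schema from an RDD of Row objects, but you need to supply the column names for this to work (for example, from the header line of your csv file); refer to this. In Spark 1.x, sqlContext.inferSchema does this, although it has been deprecated since Spark 1.3 in favor of createDataFrame. So you can do something like:

from pyspark.sql import Row

# Name the two fields so Spark can infer the schema from the Row objects
email_rdd = rdd.map(lambda r: Row(file=r[0], message=r[1]))

dataframe = sqlContext.createDataFrame(email_rdd)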
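Since the question asks for an explicit schema, here is a minimal sketch using StructType, assuming both columns are plain strings. The helper names to_pair and build_dataframe are made up for illustration, and the code only guards against short rows; it does not solve the problem of messages that span several lines.

```python
def to_pair(fields):
    """Return a (file, message) tuple, or None if the row is malformed."""
    if len(fields) < 2:
        return None
    # Re-join any extra pieces produced by commas inside the message.
    return (fields[0], ",".join(fields[1:]))


def build_dataframe(sqlContext, rdd):
    """Create a two-column DataFrame from an RDD of split lines."""
    # Imported here so to_pair can be tried without Spark installed.
    from pyspark.sql.types import StructType, StructField, StringType

    # Assumption: both columns are nullable strings.
    email_schema = StructType([
        StructField("file", StringType(), True),
        StructField("message", StringType(), True),
    ])
    pairs = rdd.map(to_pair).filter(lambda pair: pair is not None)
    return sqlContext.createDataFrame(pairs, email_schema)
```

Usage would be dataframe = build_dataframe(sqlContext, rdd), where rdd holds the lists produced by line.split(","). Rows with fewer than two fields are dropped rather than raising the IllegalStateException.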
Gaurav Dhama