
I'm trying to append data to my csv file using df.write.csv. This is what I did after following the Spark documentation http://spark.apache.org/docs/2.0.1/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter:

from pyspark.sql import DataFrameWriter
.....
df1 = sqlContext.createDataFrame(query1)
df1.write.csv("/opt/Output/sqlcsvA.csv", append) #also tried 'mode=append'

Executing the above code gives me the error:

NameError: name 'append' is not defined

Without append, the error is:

The path already exists.

kavya

3 Answers

df.write.save(path='csv', format='csv', mode='append', sep='\t')
Zhang Tong
  • This again splits the output into different files. It gets partitioned. – kavya Dec 19 '16 at 09:52
  • Include `.coalesce(1)` before the write; it will prevent partitioning, although I'm not sure if the result will be appended (sketched below). `df.coalesce(1).write.save(path='csv', format='csv', mode='append', sep='\t')` – Jarek Dec 19 '16 at 12:07
  • Thanks. That got everything to one file. – kavya Dec 20 '16 at 07:20
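
To spell out Jarek's suggestion, a minimal sketch (the output path and tab separator are assumptions carried over from the question and the answer above, not something confirmed in the comments):

# Coalesce to a single partition so each append adds one part file
# to the output directory instead of many.
df.coalesce(1).write.save(path='/opt/Output/sqlcsvA.csv', format='csv',
                          mode='append', sep='\t')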

From the docs: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter Since v1.4

csv(path, mode=None, compression=None, sep=None, quote=None, escape=None, header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None, timestampFormat=None)

e.g.

from pyspark.sql import DataFrameWriter
.....
df1 = sqlContext.createDataFrame(query1)
df1.write.csv(path="/opt/Output/sqlcsvA.csv", mode="append")

If you want to write a single file, you can use coalesce or repartition on either of those lines. It doesn't matter which line, because the dataframe is just a DAG of transformations; no execution happens until the write to csv. repartition and coalesce effectively use the same code, but coalesce can only reduce the number of partitions, whereas repartition can also increase them. I'd just stick to repartition for simplicity.

e.g.

df1 = sqlContext.createDataFrame(query1).repartition(1)

or

df1.repartition(1).write.csv(path="/opt/Output/sqlcsvA.csv", mode="append")

I think the examples in the docs aren't great; they don't show how to use any parameters other than the path.

Referring to the two things you tried:

(append)

For that to work, there would need to be a string variable named append containing the value "append". There's no string constant in the DataFrameWriter library called append, i.e. you could add append = "append" earlier in your code and it would then work.
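
For example, a hedged sketch of that workaround (only to show why the NameError happens; defining the variable like this is not something I'd actually recommend):

append = "append"  # now the name 'append' resolves to the string "append"
df1.write.csv("/opt/Output/sqlcsvA.csv", append)  # mode is the second positional parameter of csv()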

('mode=append')

For that to work, the csv method would have to parse the mode=append string to extract the value for the mode, which would be extra work when you can just pass a parameter whose value is exactly "append" or "overwrite". None is a special case: it's a Python built-in, not something specific to pyspark.

On another note, I recommend using named parameters where possible. e.g.

csv(path="/path/to/file.csv", mode="append")

instead of positional parameters

csv("/path/to/file.csv", "append")

It's clearer, and helps comprehension.

Davos

I do not know about Python, but in Scala and Java one can set the save mode in the following way:

df.write.mode("append").csv("pathToFile")

I assume that it should be similar in Python. This may be helpful.
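
For completeness, my assumption of the equivalent PySpark call, using the path from the question:

df1.write.mode("append").csv("/opt/Output/sqlcsvA.csv")  # DataFrameWriter.mode() exists in PySpark too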

Anton Okolnychyi
  • I tried what you said in Python, but each line of my output is copied into a separate csv file in one folder called `sqlcsvA.csv`. They are not copied into one single csv file. – kavya Dec 19 '16 at 09:00
  • @kaks, it seems like you will have to merge those files manually. Take a look at this [question](http://stackoverflow.com/questions/31674530/write-single-csv-file-using-spark-csv). For instance, people are using [FileUtil.copyMerge](https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/fs/FileUtil.html#copyMerge(org.apache.hadoop.fs.FileSystem,%20org.apache.hadoop.fs.Path,%20org.apache.hadoop.fs.FileSystem,%20org.apache.hadoop.fs.Path,%20boolean,%20org.apache.hadoop.conf.Configuration,%20java.lang.String)) in Java. – Anton Okolnychyi Dec 19 '16 at 09:19
  • @kaks, note that if you read the results back (in Spark), those files are merged and you have a DataFrame that contains the data from all files in that directory. – Anton Okolnychyi Dec 19 '16 at 09:24
  • Don't need to merge them manually, just use `.repartition(1)` when writing. When you read the files back into a dataframe, it doesn't technically merge them, because the dataframe is distributed in the cluster. Each file will be the basis of a dataframe partition. So in a sense you do have one dataframe, but it is still in many underlying pieces (sketched below). – Davos Sep 04 '17 at 00:15
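
To illustrate that read-back behaviour, a small sketch (the path and tab separator are assumptions carried over from earlier in the thread):

# Reading the output directory back: Spark picks up every part file under
# the directory and exposes them as a single DataFrame.
df_back = sqlContext.read.csv("/opt/Output/sqlcsvA.csv", sep='\t')
print(df_back.rdd.getNumPartitions())  # roughly one partition per part file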