1

I am trying to save an pyspark.sql.dataframe.DataFrame in CSV format (could also be another format, as long as it is easily readable).

So far, I found a couple of examples to save the DataFrame. However, it is losing information everytime that I write it.

Dataset example:

# Create an example Pyspark DataFrame

from pyspark.sql import Row

Employee = Row("firstName", "lastName", "email", "salary")
employee1 = Employee('A', 'AA', 'mail1', 100000)
employee2 = Employee('B', 'BB', 'mail2', 120000 )
employee3 = Employee('C', None, 'mail3', 140000 )
employee4 = Employee('D', 'DD', 'mail4', 160000 )
employee5 = Employee('E', 'EE', 'mail5', 160000 )

department1 = Row(id='123', name='HR')
department2 = Row(id='456', name='OPS')
department3 = Row(id='789', name='FN')
department4 = Row(id='101112', name='DEV')

departmentWithEmployees1 = Row(department=department1, employees=[employee1, employee2, employee5])
departmentWithEmployees2 = Row(department=department2, employees=[employee3, employee4])
departmentWithEmployees3 = Row(department=department3, employees=[employee1, employee4, employee3])
departmentWithEmployees4 = Row(department=department4, employees=[employee2, employee3])

departmentsWithEmployees_Seq = [departmentWithEmployees1, departmentWithEmployees2]
dframe = spark.createDataFrame(departmentsWithEmployees_Seq)

In order to save this file as CSV, I firstly tried this solution:

type(dframe)
Out[]: pyspark.sql.dataframe.DataFrame
dframe.write.csv('junk_mycsv.csv')

Unfortunately, that result in this error:

org.apache.spark.sql.AnalysisException: CSV data source does not support struct<id:string,name:string> data type.; 

That is the reason why I tried another possibility, to convert the spark dataframe into a pandas dataframe, and save it then. As mentioned in this example.

pandas_df = dframe.toPandas()

Works good! However, If I show my data, it is missing data:

print(pandas_df.head())

department                                          employees
0   (123, HR)  [(A, AA, mail1, 100000), (B, BB, mail2, 120000...
1  (456, OPS)  [(C, None, mail3, 140000), (D, DD, mail4, 1600...

As you can see in the snapshot below, we are missing information. Because the data should be like this:

department              employees
0  id:123, name:HR      firstName: A, lastName: AA, email: mail1, salary: 100000

# Info is missing like 'id', 'name', 'firstName', 'lastName', 'email' etc. 
# For the complete expected example, see screenshow below. 

Expected Data format

Just for information: I am working in Databricks, with Python.

Therefore, how can I write my data (dframe from the example above) without losing information?

Many thanks in advance!

Edit Adding a picture for Pault, to show the format of the csv (and the headers).

Edit2 Replacing the picture for example csv output:

After running Pault's code:

from pyspark.sql.functions import to_json
dframe.select(*[to_json(c).alias(c) for c in dframe.columns])\
    .repartition(1).write.csv("junk_mycsv.csv", header= True)

The output is not tidy, since most column headers are empty (due the nested format?). Only copying the first row:

department           employees              (empty ColName)     (empty ColName)   (and so on)
{\id\":\"123\"       \"name\":\"HR\"}"     [{\firstName\":\"A\"  \"lastName\":\"AA\"    (...)
R overflow
  • 1,292
  • 2
  • 17
  • 37

1 Answers1

1

Your dataframe has the following schema:

dframe.printSchema()
#root
# |-- department: struct (nullable = true)
# |    |-- id: string (nullable = true)
# |    |-- name: string (nullable = true)
# |-- employees: array (nullable = true)
# |    |-- element: struct (containsNull = true)
# |    |    |-- firstName: string (nullable = true)
# |    |    |-- lastName: string (nullable = true)
# |    |    |-- email: string (nullable = true)
# |    |    |-- salary: long (nullable = true)

So the department column is a StructType with two named fields and the employees column is an array of structs with four named fields. It appears what you want is to write the data in a format that saves both the key and the value for each record.

One option is to write the file in JSON format instead of CSV:

dframe.write.json("junk.json")

Which produces the following output:

{"department":{"id":"123","name":"HR"},"employees":[{"firstName":"A","lastName":"AA","email":"mail1","salary":100000},{"firstName":"B","lastName":"BB","email":"mail2","salary":120000},{"firstName":"E","lastName":"EE","email":"mail5","salary":160000}]}
{"department":{"id":"456","name":"OPS"},"employees":[{"firstName":"C","email":"mail3","salary":140000},{"firstName":"D","lastName":"DD","email":"mail4","salary":160000}]}

Or if you wanted to keep it in CSV format, you can use to_json to convert each column to JSON before writing the CSV.

# looping over all columns
# but you can also just limit this to the columns you want to convert

from pyspark.sql.functions import to_json
dframe.select(*[to_json(c).alias(c) for c in dframe.columns])\
    .write.csv("junk_mycsv.csv")

This produces the following output:

"{\"id\":\"123\",\"name\":\"HR\"}","[{\"firstName\":\"A\",\"lastName\":\"AA\",\"email\":\"mail1\",\"salary\":100000},{\"firstName\":\"B\",\"lastName\":\"BB\",\"email\":\"mail2\",\"salary\":120000},{\"firstName\":\"E\",\"lastName\":\"EE\",\"email\":\"mail5\",\"salary\":160000}]"
"{\"id\":\"456\",\"name\":\"OPS\"}","[{\"firstName\":\"C\",\"email\":\"mail3\",\"salary\":140000},{\"firstName\":\"D\",\"lastName\":\"DD\",\"email\":\"mail4\",\"salary\":160000}]"

Note that the double-quotes are escaped.

pault
  • 41,343
  • 15
  • 107
  • 149
  • Appreciated! Some additional Questions: 1). Your JSON solution is resulting in multiple JSON files, can it also produce 1 JSON (containing everything). 2). For the CSV example I receive an error: name "f" not defined. So I changed f with dframe, but that leads to: "DataFrame" object has no attribute 'to_json' – R overflow Apr 02 '20 at 14:36
  • 1
    For 1) See [Write single CSV file using spark-csv](https://stackoverflow.com/questions/31674530/write-single-csv-file-using-spark-csv) for 2) I missed an import, updated. – pault Apr 02 '20 at 14:39
  • Great! For the CSV file, can we add headers too (the .option("header", "true") failed) to the csv file? If we can add headers, we see some strange things (it looks like the format is not proper). I added a picture in the question to show what I mean (it is not perse a value per column name). – R overflow Apr 02 '20 at 14:56
  • Try: `.write.csv("junk_mycsv.csv", header=True)`. I can't see the picture you posted. [Why not upload images of code on SO when asking a question?](https://meta.stackoverflow.com/questions/285551/why-not-upload-images-of-code-on-so-when-asking-a-question). Personally I prefer to use `write.csv.` without the header, then do run `hadoop fs -cat junk_mycsv.csv/part* path/to/local` and manually add the header on the local file. – pault Apr 02 '20 at 14:58
  • This worked. I edited my post, but I think that I cannot get it in a more proper form (due to the nested format). Thanks Pault! – R overflow Apr 02 '20 at 15:10