
My next question is not new, but I want to understand how to do it step by step.

In my Spark application I create a DataFrame; let's call it df. Spark version: 2.4.0.

val df: DataFrame = Seq(
    ("Alex", "2018-01-01 00:00:00", "2018-02-01 00:00:00", "OUT"),
    ("Bob", "2018-02-01 00:00:00", "2018-02-05 00:00:00", "IN"),
    ("Mark", "2018-02-01 00:00:00", "2018-03-01 00:00:00", "IN"),
    ("Mark", "2018-05-01 00:00:00", "2018-08-01 00:00:00", "OUT"),
    ("Meggy", "2018-02-01 00:00:00", "2018-02-01 00:00:00", "OUT")
).toDF("NAME", "START_DATE", "END_DATE", "STATUS")

How do I create a .csv file from this DataFrame and put the csv file into a specific folder on the server?

For example, is this code correct? I noticed that some people use coalesce or repartition for this task, but I don't understand which one would be better in my case.

df.write
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save("/home/reports/")

When I try to use this code it raises an error:

org.apache.hadoop.security.AccessControlException: Permission denied: user=root, access=WRITE, inode="/home/reports/_temporary/0":hdfs:hdfs:drwxr-xr-x 

I run the Spark application as the root user. The reports folder was created by the root user with the following command:

mkdir -m 777 reports

It seems like only the hdfs user can write the file.

  • Possible duplicate of [Write single CSV file using spark-csv](https://stackoverflow.com/questions/31674530/write-single-csv-file-using-spark-csv) – Raphael Roth Jan 19 '19 at 19:49
  • Hello @RaphaelRoth I also found that post, but there are a lot of answers. Which answer would you advise as best practice? – Nurzhan Nogerbek Jan 19 '19 at 19:58
  • Yes, it is correct. However, `repartition` or `coalesce` is commonly used in toy examples or in development to turn all the data into a single file _(this is usually a bad practice in production, because it breaks the distributed nature of **Spark**)_. Second, if you want to save your file to a local filesystem you need to prepend `file://` to your path. Third, be aware that your file will not be named `daily_report.csv`; instead there will be a `part-0001.csv` file inside a `daily_report.csv` folder. Finally, I hope this is only for testing/learning, as **Spark** is not intended for local work. – Luis Miguel Mejía Suárez Jan 19 '19 at 20:04
  • @LuisMiguelMejíaSuárez hello! :) Thank you for your answer. I run the Spark application on a remote CentOS server and want to create the csv file on the same server. As I understand it, you don't recommend creating the csv file with such code, right? Also, how do I set the name of the future csv file? – Nurzhan Nogerbek Jan 20 '19 at 05:32
  • @LuisMiguelMejíaSuárez when I tried to test this code it raised an error. I set just `.save("/home/reports")`. The owner of the `reports` folder is the `root` user, who has all rights (read, write, delete). *ERROR*: `org.apache.hadoop.security.AccessControlException: Permission denied: user=root, access=WRITE, inode="/home/reports/_temporary/0":hdfs:hdfs:drwxr-xr-x` – Nurzhan Nogerbek Jan 20 '19 at 06:33
  • @RaphaelRoth what does `.format("com.databricks.spark.csv")` actually mean? Do I need to load some dependencies in sbt? – Nurzhan Nogerbek Jan 20 '19 at 07:01
  • @NurzhanNogerbek this is old (from Spark 1.6); now you can just use `df.write.csv(filename)` – Raphael Roth Jan 20 '19 at 07:32
  • @RaphaelRoth I tried it but unfortunately it raises an error. *Code*: `df.coalesce(1).write.csv("/home/reports/" + fileName)`. *Error*: `org.apache.hadoop.security.AccessControlException: Permission denied: user=root, access=WRITE, inode="/home/reports/_temporary/0":hdfs:hdfs:drwxr-xr-x`. The owner of the `reports` folder is the `root` user, who has all rights (read, write, delete). It seems like only the `hdfs` user has write permission, right? What can you advise? – Nurzhan Nogerbek Jan 20 '19 at 07:43
  • In Spark 2.* you can use the syntax `dataFrame.write.option("header", "true").csv("/home/reports/")`. But you should not mix up the local file system and HDFS. The path you specified is a path on HDFS, and your user doesn't have permission to write to that location. If you want to write to a local directory on the server, you should collect the data first using `val dataToSave = dataFrame.collect()`. If you do that, all the data in the DataFrame will go to the Spark master node, so make sure you have enough memory for that. After that you will be able to save your data using the standard Scala/Java IO API. – Dmitry Y. Jan 20 '19 at 08:34
  • @DmitryY. hello! Thank you for your answer. I noticed that after the `.collect()` method the `dataToSave` variable has a value like `[Lorg.apache.spark.sql.Row;@31533eb1`. After that step I can't use `dataToSave.write.csv(path)`, right? So how can I create a csv file from that `dataToSave` variable? Can you show me a little code example, please? I am confused! – Nurzhan Nogerbek Jan 20 '19 at 09:14
  • No, that is not what I mean. You can use `dataFrame.write.csv(path)` for saving data to HDFS or S3. If you want to save your DataFrame to some location on the server, you should do `val dataToSave = dataFrame.collect()`. After that the data is contained in memory on your Spark master node. Then you can use `PrintWriter` or `FileWriter` to save the data (see the first sketch after these comments). – Dmitry Y. Jan 20 '19 at 09:31
  • Ok, I will try `FileWriter`. I also noticed that if I use `dataFrame.write.option("header", "true").csv("file:///home/reports/" + fileName)` it creates a file called `_SUCCESS` on the server. At the same time the file doesn't have any extension, and I noticed that its size is 0 KB. So confusing... – Nurzhan Nogerbek Jan 20 '19 at 10:17
  • If you decide to use the `collect` method on the DataFrame, you will get an `Array[Row]`. After that you can do `val linesToSave: Array[String] = dataToSave.map(_.toSeq.mkString(";"))`. Also, you can check `au.com.bytecode.opencsv.CSVWriter` for creating csv files from an `Array[Row]`. – Dmitry Y. Jan 20 '19 at 10:32
  • Actually, the simplest way is to use the approach described in the page from the first comment. You can write your file to some temporary location on HDFS (e.g. /tmp) and, after the job is complete, just copy/move it to a local directory (see the second sketch after these comments). – Dmitry Y. Jan 20 '19 at 10:36
  • @DmitryY. Thank you for the information! I put the csv file in the `/tmp` folder as you advised. Is it possible to change the name of the csv file? Also I noticed that all the records are in one column. That's not what I expected ;( Also I found this article: https://dzone.com/articles/csv-file-writer-using-scala. It looks nice, but the code described in step 10 looks strange to me. What do you think? – Nurzhan Nogerbek Jan 20 '19 at 12:43
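
A minimal sketch of the collect-then-write approach described in the comments above, assuming the DataFrame is small enough to fit in driver memory and that `/home/reports/` exists and is writable on the driver machine; the target file name and the naive comma-joining (no quoting or escaping of fields) are illustrative only:

import java.io.PrintWriter

// Bring the (small) DataFrame to the driver and write it with plain Java IO.
// collect() loads everything into driver memory, so only use this for small results.
val header = df.columns.mkString(",")
val lines  = df.collect().map(_.toSeq.mkString(","))  // illustrative: no CSV quoting/escaping

val writer = new PrintWriter("/home/reports/daily_report.csv")  // illustrative path
try {
  writer.println(header)
  lines.foreach(writer.println)
} finally {
  writer.close()
}

For fields that may contain commas or quotes, a proper CSV library such as the opencsv `CSVWriter` mentioned in the comments is safer than `mkString`.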
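And a sketch of the write-to-HDFS-then-copy approach, assuming a `spark` session is in scope and that `coalesce(1)` leaves exactly one part file; the `/tmp/daily_report` and `/home/reports/daily_report.csv` paths are illustrative:

import org.apache.hadoop.fs.{FileSystem, Path}

// Write to a temporary location on HDFS first.
df.coalesce(1)
  .write
  .option("header", "true")
  .mode("overwrite")
  .csv("/tmp/daily_report")  // illustrative HDFS path

// Then copy the single part file down to the local filesystem.
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val partFile = fs.globStatus(new Path("/tmp/daily_report/part-*.csv"))(0).getPath
fs.copyToLocalFile(partFile, new Path("/home/reports/daily_report.csv"))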

1 Answer


I believe you are confused about the way Spark behaves; I would recommend you read the official documentation and/or some tutorial first.
Nevertheless, I hope this answers your question.

This code will save a DataFrame as a SINGLE CSV file on a local filesystem.
It was tested with Spark 2.4.0 and Scala 2.12.8 on an Ubuntu 18.04 laptop.

import org.apache.spark.sql.SparkSession

val spark =
  SparkSession
    .builder
    .master("local[*]")
    .appName("CSV Writter Test")
    .getOrCreate()
import spark.implicits._

val df =
  Seq(
    ("Alex", "2018-01-01 00:00:00", "2018-02-01 00:00:00", "OUT"),
    ("Bob", "2018-02-01 00:00:00", "2018-02-05 00:00:00", "IN"),
    ("Mark", "2018-02-01 00:00:00", "2018-03-01 00:00:00", "IN"),
    ("Mark", "2018-05-01 00:00:00", "2018-08-01 00:00:00", "OUT"),
    ("Meggy", "2018-02-01 00:00:00", "2018-02-01 00:00:00", "OUT")
  ).toDF("NAME", "START_DATE", "END_DATE", "STATUS")

df.printSchema
// root
//  |-- NAME: string (nullable = true)
//  |-- START_DATE: string (nullable = true)
//  |-- END_DATE: string (nullable = true)
//  |-- STATUS: string (nullable = true)

df.coalesce(numPartitions = 1)
  .write
  .option(key = "header", value = "true")
  .option(key = "sep", value = ",")
  .option(key = "encoding", value = "UTF-8")
  .option(key = "compresion", value = "none")
  .mode(saveMode = "OVERWRITE")
  .csv(path = "file:///home/balmungsan/dailyReport/") // Change the path. Note there are 3 /, the first two are for the file protocol, the third one is for the root folder.

spark.stop()

Now, let's check the saved file.

balmungsan@BalmungSan:dailyReport $ pwd
/home/balmungsan/dailyReport

balmungsan@BalmungSan:dailyReport $ ls
part-00000-53a11fca-7112-497c-bee4-984d4ea8bbdd-c000.csv  _SUCCESS

balmungsan@BalmungSan:dailyReport $ cat part-00000-53a11fca-7112-497c-bee4-984d4ea8bbdd-c000.csv 
NAME,START_DATE,END_DATE,STATUS
Alex,2018-01-01 00:00:00,2018-02-01 00:00:00,OUT
Bob,2018-02-01 00:00:00,2018-02-05 00:00:00,IN
Mark,2018-02-01 00:00:00,2018-03-01 00:00:00,IN
Mark,2018-05-01 00:00:00,2018-08-01 00:00:00,OUT
Meggy,2018-02-01 00:00:00,2018-02-01 00:00:00,OUT

The _SUCCESS file exists to signal that the write succeeded.

Important notes:

  • You need to specify the `file://` protocol to save to a local filesystem instead of HDFS.
  • The path specifies the name of the folder where the partitions of the file are saved, not the name of the file; inside that folder there will be one file per partition. If you want to read such a file again with Spark, you only need to specify the folder and Spark will understand the partition files. If not, I would recommend renaming the file afterwards (see the sketch after these notes) - as far as I know, there is no way to control the name from Spark.
  • If the df is too big to fit in the memory of just one node, the job will fail.
  • If you run this in a distributed way (e.g. with master yarn), the file will not be saved on the master node but on one of the slave nodes. If you really need it to be on the master node, then you may collect it and write it with plain Scala as Dmitry suggested.
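
As a hedged illustration of the renaming step mentioned in the notes, here is one way to do it with plain Java IO, assuming `coalesce(1)` produced exactly one part file in the output folder; the target name `daily_report.csv` is illustrative:

import java.io.File

// Rename the single part file that coalesce(1) produced.
val dir  = new File("/home/balmungsan/dailyReport")  // the folder passed to .csv(...)
val part = dir.listFiles()
  .find(f => f.getName.startsWith("part-") && f.getName.endsWith(".csv"))
  .get  // assumes exactly one part file exists
part.renameTo(new File(dir, "daily_report.csv"))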
  • Hello! Thank you for your detailed answer. It's really a problem that Spark can't change the name of the csv file. In the future the DataFrame can be big. So you advise me to use more memory or more nodes, right? Could you describe your last step about collecting in more detail, please? Dmitry advised using the `au.com.bytecode.opencsv.CSVWriter` library to create a csv file from an `Array[Row]`, but I still can't find any good example. – Nurzhan Nogerbek Jan 20 '19 at 19:12
  • If the `DataFrame` is really big I would suggest using more **nodes** _(horizontal scaling is always better than vertical)_. Again, if the `DataFrame` is big, then `collect` may just crash the app, and it doesn't even make sense to write to a single file _(unless the final result is just a small report)_. The reason why Spark cannot change the name is that, as stated above, there will usually be multiple _part_ files, so the folder becomes the file identifier in the distributed filesystem. If you want, you can invite me to an SO chat to discuss your use case further. – Luis Miguel Mejía Suárez Jan 20 '19 at 19:21