
I am running a Spark job on a cluster with 2 worker nodes. I am using the code below (Spark Java) to save the computed DataFrame as CSV on the worker nodes.

dataframe.write().option("header","false").mode(SaveMode.Overwrite).csv(outputDirPath);

I am trying to understand how Spark writes multiple part files on each worker node.

Run 1) worker1 has the part files and _SUCCESS; worker2 has _temporary/task*/part*, i.e. each task directory still holds the part files it wrote.

Run 2) worker1 has part files and also a _temporary directory; worker2 has multiple part files.

Can anyone help me understand this behavior?

1) Should I consider the records in outputDir/_temporary as part of the output, along with the part files in outputDir?

2) Is the _temporary dir supposed to be deleted after the job runs, with the part files moved to outputDir?

3) Why can't Spark create the part files directly under the output dir?

coalesce(1) and repartition(1) are not an option, since the output under outputDir will be around 500 GB.

Spark 2.0.2, 2.1.3 and Java 8, no HDFS

Omkar Puttagunta
  • Are you saving to a distributed filesystem? Or directly on each worker node? You can take a look at this question, could be related: https://stackoverflow.com/questions/51603404/saving-dataframe-to-local-file-system-results-in-empty-results – Shaido Sep 07 '18 at 07:39
  • This is just Q 51603404; Spark needs a common filesystem across all workers, otherwise things are broken. Set up a shared store somehow, even if just NFS – stevel Sep 10 '18 at 14:11

3 Answers


After some analysis, I observed that my Spark job was using FileOutputCommitter version 1, which is the default. I then added config to use FileOutputCommitter version 2 instead and tested on a 10-node Spark standalone cluster in AWS. All part-* files are now generated directly under the outputDirPath specified in dataframe.write().option("header","false").mode(SaveMode.Overwrite).csv(outputDirPath).

The property can be set in either of two ways:

  1. By passing --conf 'spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2' to the spark-submit command

  2. Or by setting it on the Hadoop configuration of the Spark context: javaSparkContext.hadoopConfiguration().set("mapreduce.fileoutputcommitter.algorithm.version","2")

I understand the consequences in case of failures, as outlined in the Spark docs, but I achieved the desired result!

spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version, defaultValue is 1
The file output committer algorithm version, valid algorithm version number: 1 or 2. Version 2 may have better performance, but version 1 may handle failures better in certain situations, as per MAPREDUCE-4815.

Omkar Puttagunta

TL;DR To properly write (or read, for that matter) data using a file system based source, you'll need shared storage.

The _temporary directory is part of the basic commit mechanism used by Spark: data is first written to a temporary directory and, once all tasks have finished, atomically moved to the final destination. You can read more about this process in "Spark _temporary creation reason".
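To make the write-then-move idea concrete, here is a minimal sketch in plain Java (java.nio only, no Spark). It is not Spark's or Hadoop's actual committer code; the class name CommitSketch, the helper names, and the task ids are all made up for illustration. Tasks write under outputDir/_temporary/<taskId>/, and the "driver" then moves whatever part files it can see into outputDir, removes _temporary, and writes an empty _SUCCESS marker.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical illustration of a "write to _temporary, then move" commit.
final class CommitSketch {

    /** Task side: write one part file under outputDir/_temporary/<taskId>/. */
    static Path writeTaskOutput(Path outputDir, String taskId, String partName, List<String> rows) {
        try {
            Path taskDir = outputDir.resolve("_temporary").resolve(taskId);
            Files.createDirectories(taskDir);
            return Files.write(taskDir.resolve(partName), rows, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Driver side: promote every visible part file, drop _temporary, write _SUCCESS. */
    static void commitJob(Path outputDir) {
        Path temp = outputDir.resolve("_temporary");
        try {
            Files.createDirectories(outputDir);
            if (Files.isDirectory(temp)) {
                List<Path> parts;
                try (Stream<Path> walk = Files.walk(temp)) {
                    parts = walk.filter(Files::isRegularFile).collect(Collectors.toList());
                }
                for (Path part : parts) {
                    Files.move(part, outputDir.resolve(part.getFileName()),
                            StandardCopyOption.REPLACE_EXISTING);
                }
                // Delete the now-empty directories, deepest paths first.
                List<Path> leftovers;
                try (Stream<Path> walk = Files.walk(temp)) {
                    leftovers = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
                }
                for (Path p : leftovers) {
                    Files.delete(p);
                }
            }
            Files.write(outputDir.resolve("_SUCCESS"), new byte[0]);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path out = Files.createTempDirectory("commit-sketch");
        writeTaskOutput(out, "task_0", "part-00000.csv", Arrays.asList("a,1", "b,2"));
        writeTaskOutput(out, "task_1", "part-00001.csv", Arrays.asList("c,3"));
        commitJob(out);
        try (Stream<Path> files = Files.list(out)) {
            files.forEach(System.out::println);
        }
    }
}
```

The point of the sketch is that commitJob can only promote files on the file system it runs on. If the tasks had written to a different machine's local disk, their _temporary directories would simply stay where they are.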

For this process to be successful you need a shared file system (HDFS, NFS, and so on) or equivalent distributed storage (like S3). Since you don't have one, failure to clean up the temporary state is expected; see "Saving dataframe to local file system results in empty results".

The behavior you observed (data partially committed and partially not) can occur when some executors are co-located with the driver and share a file system with it, enabling a full commit for that subset of the data.
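If you want to check which state a given node ended up in, a small stand-alone checker along these lines might help. It is only a sketch in plain Java: the class name OutputDirCheck and its Report type are hypothetical, and it assumes the usual layout of part-* files at the top level of the output directory, an optional _SUCCESS marker, and uncommitted files somewhere under _temporary. Without shared storage you would have to run it on each node separately.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

// Hypothetical helper: summarizes the commit state of one local output directory.
final class OutputDirCheck {

    static final class Report {
        final boolean successMarker; // _SUCCESS present at the top level
        final long committedParts;   // part-* files directly under outputDir
        final long leftoverParts;    // part-* files still under _temporary

        Report(boolean successMarker, long committedParts, long leftoverParts) {
            this.successMarker = successMarker;
            this.committedParts = committedParts;
            this.leftoverParts = leftoverParts;
        }

        @Override
        public String toString() {
            return "successMarker=" + successMarker
                    + ", committedParts=" + committedParts
                    + ", leftoverParts=" + leftoverParts;
        }
    }

    private static boolean isPartFile(Path p) {
        return Files.isRegularFile(p) && p.getFileName().toString().startsWith("part-");
    }

    static Report inspect(Path outputDir) {
        Path temp = outputDir.resolve("_temporary");
        try {
            long committed;
            try (Stream<Path> top = Files.list(outputDir)) {
                committed = top.filter(OutputDirCheck::isPartFile).count();
            }
            long leftover = 0;
            if (Files.isDirectory(temp)) {
                try (Stream<Path> walk = Files.walk(temp)) {
                    leftover = walk.filter(OutputDirCheck::isPartFile).count();
                }
            }
            return new Report(Files.exists(outputDir.resolve("_SUCCESS")), committed, leftover);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Pass the output directory as the first argument; defaults to the current directory.
        Path dir = Paths.get(args.length > 0 ? args[0] : ".");
        System.out.println(inspect(dir));
    }
}
```

A non-zero leftoverParts on a node means some task output there was never promoted by the job commit, so those files should not be assumed to be part of a complete, committed result.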

zero323

The number of part files is determined by your DataFrame's partitioning: the number of files written depends on the number of partitions the DataFrame has at the time you write it out. By default, one file is written per partition of the data.

You can control this with coalesce or repartition, which reduce or increase the number of partitions.

If you use coalesce(1), you won't see multiple part files, but this gives up writing the data in parallel.

[outputDirPath = /tmp/multiple.csv]

dataframe
 .coalesce(1)
 .write()
 .option("header","false")
 .mode(SaveMode.Overwrite)
 .csv(outputDirPath);

As for your question about how to refer to the output:

Use /tmp/multiple.csv as the path; it covers all of the part files below.

/tmp/multiple.csv/part-00000.csv
/tmp/multiple.csv/part-00001.csv
/tmp/multiple.csv/part-00002.csv
/tmp/multiple.csv/part-00003.csv
Karthick