
dataFrame.coalesce(1).write().save("path") sometimes writes only _SUCCESS and ._SUCCESS.crc files, without the expected *.csv.gz, even for a non-empty input DataFrame

file save code:

private static void writeCsvToDirectory(Dataset<Row> dataFrame, Path directory) {
    dataFrame.coalesce(1)
            .write()
            .format("csv")
            .option("header", "true")
            .option("delimiter", "\t")
            .option("codec", "org.apache.hadoop.io.compress.GzipCodec")
            .mode(SaveMode.Overwrite)
            .save("file:///" + directory);
}

file get code:

static Path getTemporaryCsvFile(Path directory) throws IOException {
    String glob = "*.csv.gz";
    try (DirectoryStream<Path> stream = Files.newDirectoryStream(directory, glob)) {
        return stream.iterator().next();
    } catch (NoSuchElementException e) {
        throw new RuntimeException(getNoSuchElementExceptionMessage(directory, glob), e);
    }
}

file get error example:

java.lang.RuntimeException: directory /tmp/temp5889805853850415940 does not contain a file with glob *.csv.gz. Directory listing:
    /tmp/temp5889805853850415940/_SUCCESS, 
    /tmp/temp5889805853850415940/._SUCCESS.crc

I rely on this expectation; can someone explain why it works this way?

Makrushin Evgenii
  • Please feel free to offer exotic guesses =) I will be glad to read those too, not just complete solutions with a full explanation of my problem – Makrushin Evgenii Oct 16 '19 at 17:59
  • Do you know if the output file contains records? The movement from ._SUCCESS to _SUCCESS at least shows that the write action (logistically speaking) builds the temp files and moves them to their final resting place. – afeldman Oct 16 '19 at 18:16
  • The output file should (must, by the logic) contain at least the header line and some data lines. But it does not exist at all – Makrushin Evgenii Oct 17 '19 at 06:42
  • During your write statement, also include a println("#### Number of output records: " + inputDF.count + " ####"). Then run cat output.log | grep '####' on the output log. This will tell us whether the inputDF you are trying to write is empty or not (see the sketch after these comments). – afeldman Oct 17 '19 at 16:32
  • @afeldman inputDf can't be empty by design. I omitted some details for brevity. You can read the full class implementation at https://gist.github.com/banyrule/bc8b593de9fcb3fd911742970a890db2 – Makrushin Evgenii Oct 18 '19 at 09:08
  • The writeCsvToTemporaryDirectory method prohibits writing empty DataFrames via `.format("csv").save`. – Makrushin Evgenii Oct 18 '19 at 09:11
  • can we do `dataFrame.coalesce(1).limit(1).collect` to eliminate dataframe emptiness as a cause? – DaRkMaN Oct 22 '19 at 06:02
  • @BanyRule In what `deploy-mode` are you running your Spark app? – mazaneicha Oct 23 '19 at 22:22
  • In case you are running this in cluster mode, the local file output path would mean a local path on each and every executor node. Can this be the problem? – Vihit Shah Oct 24 '19 at 03:00
  • @VihitShah In the OP's case, it will be the local file system of **one executor** that the dataframe is being coalesced to. – mazaneicha Oct 24 '19 at 12:43
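
A minimal sketch of the count check afeldman suggests above (the marker string is from the comment; placing it right before the save call is illustrative, not from the original code):

// Log the row count right before the write to rule out an empty DataFrame.
// Note: count() forces a full evaluation and can be expensive on large inputs.
long count = dataFrame.count();
System.out.println("#### Number of output records: " + count + " ####");
writeCsvToDirectory(dataFrame, directory);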

3 Answers


The output file should (must, by the logic) contain at least the header line and some data lines. But it does not exist at all

This comment was a bit misleading. According to the code on GitHub, this path is taken only when the DataFrame is empty, in which case no SUCCESS files would be produced at all. Considering that those files are present, the DataFrame is not empty and the writeCsvToDirectory from your code was triggered.

I have a couple of questions:

  • Does your Spark job finish without errors?
  • Does the timestamp of the SUCCESS file get updated?

My two main suspects are:

  1. coalesce(1) - if you have a lot of data, this might fail
  2. SaveMode.Overwrite - I have a feeling that those SUCCESS files are left in that folder from previous runs (a quick check follows below)
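
A quick way to test suspect 2 (a sketch, not part of the original answer; assumes `directory` is the output path as a java.nio.file.Path):

// Hypothetical check: if _SUCCESS predates the current run, the listed
// files are leftovers from a previous write, not output of this one.
// (java.nio.file.Files.getLastModifiedTime throws IOException.)
java.nio.file.Path success = directory.resolve("_SUCCESS");
System.out.println("_SUCCESS last modified: " + java.nio.file.Files.getLastModifiedTime(success));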
Artem Vovsia
  • "if you have a lot of data, this might fail" - The result file weighs about 0.5 GB. "I have a feeling that those SUCCESS files are in that folder from previous runs" - This is obvious in hindsight, but I had not noticed it for so long. Thanks so much for this comment – Makrushin Evgenii Oct 24 '19 at 10:28
  • Glad to help. 0.5 GB should work fine with coalesce, unless you have very little RAM. – Artem Vovsia Oct 24 '19 at 11:25

It depends on the storage you choose for writing your CSV file. If you write to HDFS, everything is fine. But whenever you decide to write to the local file system, you must be aware that nothing will be written to the driver's local file system: your data ends up on the workers' file systems, and you have to look for it in the workers' storage.

Two solutions:

  1. Run Spark in Local Mode

Set the master to local[NUMBER_OF_CORES]; you can submit your job with the --master local[10] config.

  2. Write to a Distributed File System

Write your data to a distributed file system like S3, HDFS, ...
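
A minimal sketch of both options (the app name and the HDFS output path are placeholders, not from the answer):

// Option 1: local mode, so "file://" paths refer to the machine running the driver.
SparkSession spark = SparkSession.builder()
        .appName("csv-export")   // placeholder app name
        .master("local[10]")     // equivalent to submitting with --master local[10]
        .getOrCreate();

// Option 2: write to a distributed file system instead of a local path.
dataFrame.coalesce(1)
        .write()
        .option("header", "true")
        .mode(SaveMode.Overwrite)
        .csv("hdfs:///tmp/output"); // placeholder HDFS path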

Hamid

My own solution that solved this problem:

I replaced .save("file://...") with hadoopFileSystem.copyToLocalFile.

The thing is that .save("file://...") works as expected only with SparkSession.builder().master("local"), where hdfs:// is emulated by the master's file://.

I may be wrong in theory, but it works.

static Path writeCsvToTemporaryDirectory(Dataset<Row> dataFrame) throws IOException {
    String temporaryDirectoryName = getTemporaryDirectoryName();

    writeCsvToDirectory(dataFrame, temporaryDirectoryName);

    return Paths.get(temporaryDirectoryName);
}

static void writeCsvToDirectory(Dataset<Row> dataFrame, String directory) throws IOException {
    dataFrame.coalesce(1)
        .write()
        .option("header", "true")
        .option("delimiter", "\t")
        .option("codec", "org.apache.hadoop.io.compress.GzipCodec")
        .mode(SaveMode.Overwrite)
        .csv(directory); // written to the cluster's default file system

    FileSystem hadoopFileSystem = FileSystem.get(sparkContext.hadoopConfiguration());

    // Move (delSrc = true) the written directory from the default file system
    // to the same path on the local file system.
    hadoopFileSystem.copyToLocalFile(true,
        new org.apache.hadoop.fs.Path(directory),
        new org.apache.hadoop.fs.Path(directory));
}

static Path getTemporaryCsvFile(Path directory) throws IOException {
    String glob = "*.csv.gz";

    try (DirectoryStream<Path> stream = Files.newDirectoryStream(directory, glob)) {
        return stream.iterator().next();
    } catch (NoSuchElementException e) {
        throw new RuntimeException(getNoSuchElementExceptionMessage(directory, glob), e);
    }
}

Path temporaryDirectory = writeCsvToTemporaryDirectory(dataFrame);
Path temporaryFile = DataFrameIOUtils.getTemporaryCsvFile(temporaryDirectory);

try {
    return otherStorage.upload(temporaryFile, name, fields).get(); // get() (not join()) throws the checked exceptions caught below
} catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException(e);
} finally {
    removeTemporaryDirectory(temporaryDirectory);
}
Makrushin Evgenii